Commit 2011b7f2 authored by NILANJAN DAW's avatar NILANJAN DAW

Added support for data replication on Follower nodes.

Added Storage model
Added IO handler wrapper class
parent 34204c32
...@@ -7,11 +7,13 @@ import hpdos.grpc.HeartbeatRequest; ...@@ -7,11 +7,13 @@ import hpdos.grpc.HeartbeatRequest;
import hpdos.grpc.HeartbeatResponse; import hpdos.grpc.HeartbeatResponse;
import hpdos.grpc.HeartbeatServiceGrpc; import hpdos.grpc.HeartbeatServiceGrpc;
import hpdos.handler.HeartbeatHandler; import hpdos.handler.HeartbeatHandler;
import hpdos.handler.IOHandler;
import hpdos.handler.NetworkHandler; import hpdos.handler.NetworkHandler;
import hpdos.handler.ReplicateHandler;
import hpdos.lib.MasterFollower; import hpdos.lib.MasterFollower;
import hpdos.lib.MemoryStorage; import hpdos.lib.MemoryStorage;
import hpdos.lib.StorageService; import hpdos.lib.StorageService;
import hpdos.messageformat.MessageConstants; import hpdos.message.MessageConstants;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.Server; import io.grpc.Server;
...@@ -30,7 +32,7 @@ public class MetadataServer { ...@@ -30,7 +32,7 @@ public class MetadataServer {
private boolean isMaster = false; private boolean isMaster = false;
private int port; private int port;
private final String host; private final String host;
private StorageService storageService; private IOHandler ioHandler;
public MetadataServer() { public MetadataServer() {
this.followers = new HashMap<>(); this.followers = new HashMap<>();
...@@ -43,13 +45,13 @@ public class MetadataServer { ...@@ -43,13 +45,13 @@ public class MetadataServer {
return "Hello World!"; return "Hello World!";
} }
public boolean startServer() { public boolean startMasterServices() {
server = ServerBuilder.forPort(ConfigConstants.PORT) this.server = ServerBuilder.forPort(ConfigConstants.PORT)
.addService(new NetworkHandler()) .addService(new NetworkHandler(this.ioHandler))
.addService(new HeartbeatHandler(followers, serverID)) .addService(new HeartbeatHandler(followers, serverID))
.build(); .build();
try { try {
server.start(); this.server.start();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
return false; return false;
...@@ -57,7 +59,22 @@ public class MetadataServer { ...@@ -57,7 +59,22 @@ public class MetadataServer {
return true; return true;
} }
public void blockForIO() { public boolean startFollowerServices() {
// In case of followers NetworkHandler will only serve read requests for private metadata
// Other network handler services will fail
this.server = ServerBuilder.forPort(ConfigConstants.PORT)
.addService(new NetworkHandler(this.ioHandler))
.addService(new ReplicateHandler(this.ioHandler))
.build();
try {
this.server.start();
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public void blockForIO(Server server) {
if (server == null) if (server == null)
return; return;
try { try {
...@@ -87,7 +104,6 @@ public class MetadataServer { ...@@ -87,7 +104,6 @@ public class MetadataServer {
this.port = ConfigConstants.PORT; this.port = ConfigConstants.PORT;
} }
channel.shutdown(); channel.shutdown();
} }
private void startHeartbeatService() { private void startHeartbeatService() {
...@@ -100,15 +116,18 @@ public class MetadataServer { ...@@ -100,15 +116,18 @@ public class MetadataServer {
}, ConfigConstants.HEARTBEAT_INTERVAL, ConfigConstants.HEARTBEAT_INTERVAL); }, ConfigConstants.HEARTBEAT_INTERVAL, ConfigConstants.HEARTBEAT_INTERVAL);
} }
private boolean initStorage(int backend) { private IOHandler initStorage(int backend) {
StorageService storageService;
switch (backend) { switch (backend) {
case ConfigConstants.BACKEND_IN_MEMORY: case ConfigConstants.BACKEND_IN_MEMORY:
this.storageService = new MemoryStorage(); storageService = new MemoryStorage();
return true; break;
case ConfigConstants.BINARY_TREE_BACKEND: case ConfigConstants.BINARY_TREE_BACKEND:
case ConfigConstants.LSM_BACKEND: case ConfigConstants.LSM_BACKEND:
default: return false; default: return null;
} }
IOHandler ioHandler = new IOHandler(storageService);
return ioHandler;
} }
public static void main(String[] args) { public static void main(String[] args) {
...@@ -117,25 +136,35 @@ public class MetadataServer { ...@@ -117,25 +136,35 @@ public class MetadataServer {
System.out.println(metaDataServer.getGreeting()); System.out.println(metaDataServer.getGreeting());
System.out.println("Starting Metadata service"); System.out.println("Starting Metadata service");
System.out.println("Initialising storage service"); System.out.println("Initialising storage service");
boolean status = metaDataServer.initStorage(ConfigConstants.BACKEND_IN_MEMORY);
if (!status) { // Check ConfigConstants for available storage options
metaDataServer.ioHandler = metaDataServer.initStorage(ConfigConstants.BACKEND_IN_MEMORY);
if (metaDataServer.ioHandler == null) {
System.out.println("Storage server initialisation error"); System.out.println("Storage server initialisation error");
return; return;
} }
System.out.println("Searching for MetadataMaster"); System.out.println("Searching for MetadataMaster");
metaDataServer.announceToMaster(); metaDataServer.announceToMaster();
if (metaDataServer.isMaster) { if (metaDataServer.isMaster) {
status = metaDataServer.startServer(); boolean status = metaDataServer.startMasterServices();
System.out.println("Master ID: " + metaDataServer.serverID); System.out.println("Master ID: " + metaDataServer.serverID);
if (status) { if (status) {
System.out.println("Starting RPC server at: " + ConfigConstants.PORT); System.out.println("Starting Master MetadataServer at: " + ConfigConstants.PORT);
metaDataServer.blockForIO(); metaDataServer.blockForIO(metaDataServer.server);
} }
else else
System.out.println("Failed to create server"); System.out.println("Failed to create server");
} else { } else {
System.out.println("Master Node detected.\nStarting heartbeat service"); System.out.println("Master Node detected.\nStarting heartbeat service");
metaDataServer.startHeartbeatService(); metaDataServer.startHeartbeatService();
System.out.println("Starting replication service");
boolean status = metaDataServer.startFollowerServices();
if (status) {
System.out.println("Starting Follower MetadataServer at: " + metaDataServer.port);
metaDataServer.blockForIO(metaDataServer.server);
}
else
System.out.println("Failed to create server");
} }
} }
......
...@@ -4,14 +4,10 @@ import hpdos.grpc.HeartbeatRequest; ...@@ -4,14 +4,10 @@ import hpdos.grpc.HeartbeatRequest;
import hpdos.grpc.HeartbeatResponse; import hpdos.grpc.HeartbeatResponse;
import hpdos.grpc.HeartbeatServiceGrpc; import hpdos.grpc.HeartbeatServiceGrpc;
import hpdos.lib.MasterFollower; import hpdos.lib.MasterFollower;
import hpdos.messageformat.MessageConstants; import hpdos.message.MessageConstants;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;
import java.security.Timestamp;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
public class HeartbeatHandler extends HeartbeatServiceGrpc.HeartbeatServiceImplBase { public class HeartbeatHandler extends HeartbeatServiceGrpc.HeartbeatServiceImplBase {
......
package hpdos.handler;
import hpdos.grpc.Ack;
import hpdos.grpc.Nack;
import hpdos.grpc.Request;
import hpdos.grpc.Response;
import hpdos.lib.StorageModel;
import hpdos.lib.StorageService;
import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder;
public class IOHandler {
StorageService storageService;
public IOHandler(StorageService storageService) {
this.storageService = storageService;
}
public Response create(Request request) {
Ack ack = null; Nack nack = null;
StorageModel blob = new StorageModel(
MessageConstants.INIT_VERSION,
request.getDataSize(),
request.getKey(),
request.getAccessType(),
request.getClientID(),
request.getValue());
boolean status = storageService.create(request.getKey(), blob);
if (status) {
ack = ResponseBuilder.buildAck(MessageConstants.INIT_VERSION,
blob.getDataSize(),
blob.getKey(),
blob.getCrc(),
blob.getValue());
}
else {
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
MessageConstants.STATUS_REPLICATE_FAILED,
request.getKey());
}
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE,
ack, nack);
}
public Response update(Request request) {
return null;
}
public Response delete(Request request) {
return null;
}
}
package hpdos.handler; package hpdos.handler;
import hpdos.grpc.*; import hpdos.grpc.*;
import hpdos.messageformat.*; import hpdos.message.*;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import java.util.ArrayList; import java.util.ArrayList;
public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
private IOHandler ioHandler;
public NetworkHandler(IOHandler ioHandler) {
this.ioHandler = ioHandler;
}
@Override @Override
public void getMetadata(Packet request, public void readMetadata(Packet request,
StreamObserver<Packet> responseObserver) { StreamObserver<Packet> responseObserver) {
System.out.println("Data received" + request.toString()); System.out.println("Data received" + request.toString());
String key = "dummy"; String key = "dummy";
...@@ -18,11 +22,25 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -18,11 +22,25 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
ArrayList<Response> responses = new ArrayList<>(); ArrayList<Response> responses = new ArrayList<>();
responses.add(ResponseBuilder. responses.add(ResponseBuilder.
buildResponsePacket(MessageConstants.METADATA_LOOKUP_RESPONSE, acks, null)); buildResponsePacket(MessageConstants.METADATA_READ, acks, null));
Packet packet = ResponseBuilder.buildPacket(responses); Packet packet = ResponseBuilder.buildPacket(responses);
System.out.println(packet); System.out.println(packet);
responseObserver.onNext(packet); responseObserver.onNext(packet);
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
@Override
public void createMetadata(Packet request, StreamObserver<Packet> responseObserver) {
super.createMetadata(request, responseObserver);
}
@Override
public void updateMetadata(Packet request, StreamObserver<Packet> responseObserver) {
super.updateMetadata(request, responseObserver);
}
@Override
public void deleteMetadata(Packet request, StreamObserver<Packet> responseObserver) {
super.deleteMetadata(request, responseObserver);
}
} }
package hpdos.handler;
import hpdos.grpc.*;
import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder;
import io.grpc.stub.StreamObserver;
public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceImplBase {
private final IOHandler ioHandler;
public ReplicateHandler(IOHandler ioHandler) {
this.ioHandler = ioHandler;
}
@Override
public void replicateMetadata(ReplicationRequest replicationRequest, StreamObserver<ReplicationResponse> responseObserver) {
ReplicationResponse responsePacket = null;
System.out.println("Replication request " + replicationRequest);
if (replicationRequest.getPacketType() == MessageConstants.PACKET_METADATA_REQUEST) {
for (Request request: replicationRequest.getRequestList()) {
if (request.getOperationType() == MessageConstants.METADATA_CREATE) {
Response response = ioHandler.create(request);
responsePacket = ResponseBuilder.buildReplicationResponse(response);
} else if (request.getOperationType() == MessageConstants.METADATA_UPDATE) {
Response response = ioHandler.update(request);
responsePacket = ResponseBuilder.buildReplicationResponse(response);
} else if (request.getOperationType() == MessageConstants.METADATA_DELETE) {
Response response = ioHandler.delete(request);
responsePacket = ResponseBuilder.buildReplicationResponse(response);
}
responseObserver.onNext(responsePacket);
}
}
}
}
...@@ -3,21 +3,42 @@ package hpdos.lib; ...@@ -3,21 +3,42 @@ package hpdos.lib;
import java.util.HashMap; import java.util.HashMap;
public class MemoryStorage implements StorageService{ public class MemoryStorage implements StorageService{
private final HashMap<String, String> memoryKVStore; private final HashMap<String, StorageModel> memoryKVStore;
public MemoryStorage() { public MemoryStorage() {
this.memoryKVStore = new HashMap<>(); this.memoryKVStore = new HashMap<>();
} }
@Override @Override
public String put(String key, String value) { public boolean create(String key, StorageModel value) {
try { try {
return memoryKVStore.put(key, value); if (memoryKVStore.containsKey(key))
return false;
memoryKVStore.put(key, value);
} catch (Exception e) { } catch (Exception e) {
return null; return false;
} }
return true;
} }
@Override @Override
public String getByKey(String key) { public StorageModel readByKey(String key) {
return memoryKVStore.get(key); return memoryKVStore.get(key);
} }
@Override
public boolean update(String key, StorageModel value) {
if (!memoryKVStore.containsKey(key))
return false;
memoryKVStore.put(key, value);
return true;
}
@Override
public boolean delete(String key) {
try {
memoryKVStore.remove(key);
} catch (Exception e) {
return false;
}
return true;
}
} }
package hpdos.lib;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
public class StorageModel {
private int version;
private int dataSize;
private String key;
private long crc;
private int accessType;
private String value;
private String owner;
public StorageModel(int version, int dataSize, String key, int accessType, String owner, String value) {
this.version = version;
this.dataSize = dataSize;
this.key = key;
this.accessType = accessType;
this.value = value;
this.owner = owner;
// calculate CRC32 based on the value field
byte[] bytes = value.getBytes();
Checksum checksum = new CRC32();
checksum.update(bytes, 0, bytes.length);
this.crc = checksum.getValue();
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getDataSize() {
return dataSize;
}
public void setDataSize(int dataSize) {
this.dataSize = dataSize;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public long getCrc() {
return crc;
}
public void setCrc(long crc) {
this.crc = crc;
}
public int getAccessType() {
return accessType;
}
public void setAccessType(int accessType) {
this.accessType = accessType;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getOwner() {
return owner;
}
public void setOwner(String owner) {
this.owner = owner;
}
}
package hpdos.lib; package hpdos.lib;
public interface StorageService { public interface StorageService {
String put(String key, String value); boolean create(String key, StorageModel value);
String getByKey(String key); StorageModel readByKey(String key);
boolean update(String key, StorageModel value);
boolean delete(String key);
} }
package hpdos.messageformat; package hpdos.message;
public class MessageConstants { public class MessageConstants {
public static final int INIT_VERSION = 0;
public static final int INVALID_VERSION = -1;
// 00 to 99 - Client Server Interaction operations // 00 to 99 - Client Server Interaction operations
public static final int PACKET_METADATA_REQUEST = 0; public static final int PACKET_METADATA_REQUEST = 0;
public static final int PACKET_METADATA_RESPONSE = 1; public static final int PACKET_METADATA_RESPONSE = 1;
// Distinguishing ACK and NACK packets (might be redundant) // Distinguishing ACK and NACK packets
public static final boolean STATUS_OK = true; public static final int STATUS_OK = 200;
public static final boolean STATUS_FAIL = false; public static final int STATUS_OWNER_MISMATCH = 401;
public static final int STATUS_REPLICATE_FAILED = 402;
// 100 to 199 - HPDOS System internal operations // 100 to 199 - HPDOS System internal operations
public static final int MASTER_HEARTBEAT = 100; public static final int MASTER_HEARTBEAT = 100;
public static final int METADATA_LOOKUP_REQUEST = 101; public static final int METADATA_CREATE = 101;
public static final int METADATA_LOOKUP_RESPONSE = 102; public static final int METADATA_READ = 102;
public static final int METADATA_UPDATE = 103;
public static final int METADATA_DELETE = 104;
} }
package hpdos.messageformat; package hpdos.message;
import hpdos.grpc.Packet; import hpdos.grpc.Packet;
import hpdos.grpc.PacketFormatProto;
import hpdos.grpc.Request; import hpdos.grpc.Request;
import java.util.ArrayList; import java.util.ArrayList;
......
package hpdos.messageformat; package hpdos.message;
import java.util.ArrayList;
import hpdos.grpc.*; import hpdos.grpc.*;
import java.util.ArrayList;
public class ResponseBuilder { public class ResponseBuilder {
public static Ack buildAck(int version, int dataSize, String key, int crc, String value) { public static Ack buildAck(int version, int dataSize, String key, long crc, String value) {
Ack.Builder ack = Ack.newBuilder(); Ack.Builder ack = Ack.newBuilder();
ack.setStatus(MessageConstants.STATUS_OK); ack.setStatus(MessageConstants.STATUS_OK);
ack.setKey(key); ack.setKey(key);
...@@ -16,18 +16,17 @@ public class ResponseBuilder { ...@@ -16,18 +16,17 @@ public class ResponseBuilder {
return ack.build(); return ack.build();
} }
public static Nack buildNack(int version, int reason, String key) { public static Nack buildNack(int version, int status, String key) {
Nack.Builder nack = Nack.newBuilder(); Nack.Builder nack = Nack.newBuilder();
nack.setStatus(MessageConstants.STATUS_FAIL); nack.setStatus(status);
nack.setKey(key); nack.setKey(key);
nack.setVersion(version); nack.setVersion(version);
nack.setReason(reason);
return nack.build(); return nack.build();
} }
public static Response buildResponsePacket(int operationType, public static Response buildResponsePacket(int operationType,
ArrayList<Ack> acks, ArrayList<Ack> acks,
ArrayList<Nack> nacks) { ArrayList<Nack> nacks) {
Response.Builder response = Response.newBuilder(); Response.Builder response = Response.newBuilder();
response.setOperationType(operationType); response.setOperationType(operationType);
if (acks != null) if (acks != null)
...@@ -41,10 +40,40 @@ public class ResponseBuilder { ...@@ -41,10 +40,40 @@ public class ResponseBuilder {
return response.build(); return response.build();
} }
public static Response buildResponsePacket(int operationType,
Ack ack,
Nack nack) {
Response.Builder response = Response.newBuilder();
response.setOperationType(operationType);
if (ack != null)
response.addAck(ack);
else
response.clearAck();
if (nack != null)
response.addNack(nack);
else
response.clearNack();
return response.build();
}
public static Packet buildPacket(ArrayList<Response> responses) { public static Packet buildPacket(ArrayList<Response> responses) {
Packet.Builder packet = Packet.newBuilder(); Packet.Builder packet = Packet.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE); packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addAllResponse(responses); packet.addAllResponse(responses);
return packet.build(); return packet.build();
} }
public static Packet buildPacket(Response response) {
Packet.Builder packet = Packet.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addResponse(response);
return packet.build();
}
public static ReplicationResponse buildReplicationResponse(Response response) {
ReplicationResponse.Builder packet = ReplicationResponse.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addResponse(response);
return packet.build();
}
} }
...@@ -10,7 +10,10 @@ service NetworkService { ...@@ -10,7 +10,10 @@ service NetworkService {
/** /**
Receive Packet from client Receive Packet from client
*/ */
rpc getMetadata(Packet) returns (Packet) {} rpc readMetadata(Packet) returns (Packet) {}
rpc createMetadata(Packet) returns (Packet) {}
rpc updateMetadata(Packet) returns (Packet) {}
rpc deleteMetadata(Packet) returns (Packet) {}
} }
...@@ -25,8 +28,10 @@ message Request { ...@@ -25,8 +28,10 @@ message Request {
int32 version = 2; int32 version = 2;
int32 dataSize = 3; int32 dataSize = 3;
string key = 4; string key = 4;
int32 crc = 5; int64 crc = 5;
string value = 6; int32 accessType = 6;
string clientID = 7;
string value = 8;
} }
message Response { message Response {
...@@ -36,17 +41,16 @@ message Response { ...@@ -36,17 +41,16 @@ message Response {
} }
message Ack { message Ack {
bool status = 1; int32 status = 1;
int32 version = 2; int32 version = 2;
int32 dataSize = 3; int32 dataSize = 3;
string key = 4; string key = 4;
int32 crc = 5; int64 crc = 5;
string value = 6; string value = 6;
} }
message Nack { message Nack {
bool status = 1; int32 status = 1;
int32 reason = 2; string key = 2;
string key = 3; int32 version = 3;
int32 version = 4;
} }
\ No newline at end of file
syntax = "proto3";
package hpdos.grpc;
import "PacketFormat.proto";
option java_multiple_files = true;
option java_package = "hpdos.grpc";
option java_outer_classname = "Replicate";
service ReplicationService {
/**
Receive Packet from client
*/
rpc replicateMetadata(ReplicationRequest) returns (ReplicationResponse) {}
}
message ReplicationRequest {
int32 packetType = 1;
repeated Request request = 2;
}
message ReplicationResponse {
int32 packetType = 1;
repeated Response response = 2;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment