Commit be61c6e3 authored by NILANJAN DAW's avatar NILANJAN DAW

Added Asynchronous Replication service for replicating data

Preliminary tests down on key create and read
parent e42f7bf9
......@@ -14,14 +14,21 @@ import io.grpc.ManagedChannelBuilder;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.*;
import java.util.concurrent.*;
public class ClientRunner {
public static final int parallelCount = 1;
private final String clientID;
public ClientRunner() {
clientID = UUID.randomUUID().toString();
}
public String getGreeting() {
return "Hello World!";
}
public void initClient() {
public String initClient(String id) {
final ManagedChannel channel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
......@@ -29,20 +36,53 @@ public class ClientRunner {
String key = "dummy", value = "dummy";
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_LOOKUP_REQUEST,
0, 0, key, 0, value));
// create a metadata block
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
0, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
Timestamp timestamp = Timestamp.from(Instant.now());
Packet response = stub.getMetadata(packet);
Timestamp timestampCreateStart = Timestamp.from(Instant.now());
Packet response = stub.createMetadata(packet);
System.out.println(response);
System.out.println("MDS create time taken in ms: " + (Timestamp.from(Instant.now()).getTime()
- timestampCreateStart.getTime()));
// read back the metadata
request.clear();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
packet = RequestBuilder.buildPacket(request);
Timestamp timestampReadStart = Timestamp.from(Instant.now());
response = stub.readMetadata(packet);
System.out.println(response);
System.out.println("Time taken in ms: " + (Timestamp.from(Instant.now()).getTime() - timestamp.getTime()));
System.out.println("MDS create time taken in ms: " + (Timestamp.from(Instant.now()).getTime()
- timestampReadStart.getTime()));
channel.shutdown();
return id;
}
public static void main(String[] args) {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ClientRunner clientRunner = new ClientRunner();
System.out.println(clientRunner.getGreeting());
clientRunner.initClient();
Thread.sleep(1000);
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount);
Set<Callable<String>> callables = new HashSet<>();
for (int i = 0; i < parallelCount; i++) {
int finalI = i;
callables.add(new Callable<String>() {
@Override
public String call() throws Exception {
return clientRunner.initClient(Integer.toString(finalI));
}
});
}
List<Future<String>> futures = executorService.invokeAll(callables);
for (Future<String> future: futures) {
System.out.println(future.get());
}
// clientRunner.initClient();
}
}
package HpdosClient.MessageFormat;
public class MessageConstants {
public static final int INIT_VERSION = 0;
public static final int INVALID_VERSION = -1;
public static final int METADATA_ACCESS_PRIVATE = 700;
public static final int METADATA_ACCESS_SHARED = 777;
// 00 to 99 - Client Server Interaction operations
public static final int PACKET_METADATA_REQUEST = 0;
public static final int PACKET_METADATA_RESPONSE = 1;
// Distinguishing ACK and NACK packets (might be redundant)
public static final boolean STATUS_OK = true;
public static final boolean STATUS_FAIL = false;
// Distinguishing ACK and NACK packets
public static final int STATUS_OK = 200;
public static final int STATUS_OWNER_MISMATCH = 401;
public static final int STATUS_REPLICATE_FAILED = 402;
public static final int STATUS_SERVER_NOT_MASTER = 403;
public static final int STATUS_KEY_NOT_FOUND = 404;
// 101 to 199 - HPDOS System internal operations
public static final int METADATA_LOOKUP_REQUEST = 101;
public static final int METADATA_LOOKUP_RESPONSE = 102;
// 100 to 199 - HPDOS System internal operations
public static final int MASTER_HEARTBEAT = 100;
public static final int METADATA_CREATE = 101;
public static final int METADATA_READ = 102;
public static final int METADATA_UPDATE = 103;
public static final int METADATA_DELETE = 104;
}
......@@ -7,14 +7,18 @@ import java.util.ArrayList;
public class RequestBuilder {
public static Request buildRequest(int operationType, int version,
int dataSize, String key, int crc, String value) {
int dataSize, String key, int crc,
int accessType, String clientID, String value) {
Request.Builder request = Request.newBuilder();
request.setOperationType(operationType);
request.setVersion(version);
request.setDataSize(dataSize);
request.setKey(key);
request.setCrc(crc);
request.setAccessType(accessType);
request.setClientID(clientID);
request.setValue(value);
return request.build();
}
......
......@@ -5,9 +5,8 @@ import java.util.ArrayList;
public class ResponseBuilder {
public static Ack buildAck(int version, int dataSize, String key, int crc, String value) {
public static Ack buildAck(int version, int dataSize, String key, long crc, String value) {
Ack.Builder ack = Ack.newBuilder();
ack.setStatus(MessageConstants.STATUS_OK);
ack.setKey(key);
ack.setVersion(version);
ack.setDataSize(dataSize);
......@@ -16,26 +15,25 @@ public class ResponseBuilder {
return ack.build();
}
public static Nack buildNack(int version, int reason, String key) {
public static Nack buildNack(int version, String key) {
Nack.Builder nack = Nack.newBuilder();
nack.setStatus(MessageConstants.STATUS_FAIL);
nack.setKey(key);
nack.setVersion(version);
nack.setReason(reason);
return nack.build();
}
public static Response buildResponsePacket(int operationType,
ArrayList<Ack> acks,
ArrayList<Nack> nacks) {
public static Response buildResponsePacket(int operationType, int status,
Ack ack,
Nack nack) {
Response.Builder response = Response.newBuilder();
response.setOperationType(operationType);
if (acks != null)
response.addAllAck(acks);
response.setStatus(status);
if (ack != null)
response.setAck(ack);
else
response.clearAck();
if (nacks != null)
response.addAllNack(nacks);
if (nack != null)
response.setNack(nack);
else
response.clearNack();
return response.build();
......@@ -47,4 +45,11 @@ public class ResponseBuilder {
packet.addAllResponse(responses);
return packet.build();
}
public static Packet buildPacket(Response response) {
Packet.Builder packet = Packet.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addResponse(response);
return packet.build();
}
}
......@@ -10,7 +10,10 @@ service NetworkService {
/**
Receive Packet from client
*/
rpc getMetadata(Packet) returns (Packet) {}
rpc readMetadata(Packet) returns (Packet) {}
rpc createMetadata(Packet) returns (Packet) {}
rpc updateMetadata(Packet) returns (Packet) {}
rpc deleteMetadata(Packet) returns (Packet) {}
}
......@@ -25,28 +28,29 @@ message Request {
int32 version = 2;
int32 dataSize = 3;
string key = 4;
int32 crc = 5;
string value = 6;
int64 crc = 5;
int32 accessType = 6;
string clientID = 7;
string value = 8;
}
message Response {
int32 operationType = 1;
repeated Ack ack = 2;
repeated Nack nack = 3;
int32 status = 2;
Ack ack = 3;
Nack nack = 4;
}
message Ack {
bool status = 1;
int32 version = 2;
int32 dataSize = 3;
string key = 4;
int32 crc = 5;
int64 crc = 5;
string value = 6;
}
message Nack {
bool status = 1;
int32 reason = 2;
string key = 3;
int32 version = 4;
string key = 2;
int32 version = 3;
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ public class ConfigConstants {
public static final String HOST = "localhost";
public static final int PORT = 8080;
public static final int HEARTBEAT_INTERVAL = 500;
public static final int REPLICATION_TIMEOUT = 5000;
// Backend types 300-399
public static final int BACKEND_IN_MEMORY = 300;
......
......@@ -10,9 +10,7 @@ import hpdos.handler.HeartbeatHandler;
import hpdos.handler.IOHandler;
import hpdos.handler.NetworkHandler;
import hpdos.handler.ReplicateHandler;
import hpdos.lib.MasterFollower;
import hpdos.lib.MemoryStorage;
import hpdos.lib.StorageService;
import hpdos.lib.*;
import hpdos.message.MessageConstants;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
......@@ -33,12 +31,14 @@ public class MetadataServer {
private int port;
private final String host;
private IOHandler ioHandler;
private ReplicationService replicationService;
public MetadataServer() {
this.followers = new HashMap<>();
this.serverID = UUID.randomUUID().toString();
this.port = 10000 + (int)(Math.random() * 40000);
this.host = "localhost";
this.replicationService = null;
}
public String getGreeting() {
......@@ -47,7 +47,7 @@ public class MetadataServer {
public boolean startMasterServices() {
this.server = ServerBuilder.forPort(ConfigConstants.PORT)
.addService(new NetworkHandler(this.ioHandler))
.addService(new NetworkHandler(this.ioHandler, this.replicationService))
.addService(new HeartbeatHandler(followers, serverID))
.build();
try {
......@@ -63,7 +63,7 @@ public class MetadataServer {
// In case of followers NetworkHandler will only serve read requests for private metadata
// Other network handler services will fail
this.server = ServerBuilder.forPort(port)
.addService(new NetworkHandler(this.ioHandler))
.addService(new NetworkHandler(this.ioHandler, this.replicationService))
.addService(new ReplicateHandler(this.ioHandler))
.build();
try {
......@@ -120,16 +120,19 @@ public class MetadataServer {
StorageService storageService;
switch (backend) {
case ConfigConstants.BACKEND_IN_MEMORY:
storageService = new MemoryStorage();
storageService = new MemoryStorageService();
break;
case ConfigConstants.BINARY_TREE_BACKEND:
case ConfigConstants.LSM_BACKEND:
default: return null;
}
IOHandler ioHandler = new IOHandler(storageService);
return ioHandler;
return new IOHandler(storageService, this.isMaster);
}
private void cleanup () {
if (this.replicationService != null)
this.replicationService.cleanup();
}
public static void main(String[] args) {
MetadataServer metaDataServer = new MetadataServer();
......@@ -146,6 +149,8 @@ public class MetadataServer {
System.out.println("Searching for MetadataMaster");
metaDataServer.announceToMaster();
if (metaDataServer.isMaster) {
metaDataServer.replicationService = new InlineReplicationService(metaDataServer.followers);
System.out.println("Started master replication module");
boolean status = metaDataServer.startMasterServices();
System.out.println("Master ID: " + metaDataServer.serverID);
if (status) {
......@@ -167,5 +172,8 @@ public class MetadataServer {
System.out.println("Failed to create server");
}
// Adding a shutdown hook
Runtime current = Runtime.getRuntime();
current.addShutdownHook(new Thread(metaDataServer::cleanup));
}
}
......@@ -11,8 +11,10 @@ import hpdos.message.ResponseBuilder;
public class IOHandler {
StorageService storageService;
public IOHandler(StorageService storageService) {
private final boolean isMaster;
public IOHandler(StorageService storageService, boolean isMaster) {
this.storageService = storageService;
this.isMaster = isMaster;
}
public Response create(Request request) {
......@@ -34,13 +36,15 @@ public class IOHandler {
}
else {
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
MessageConstants.STATUS_REPLICATE_FAILED,
request.getKey());
}
if (status)
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE,
ack, nack);
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK,
ack, null);
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_IO_WRITE_FAILED,
null, nack);
}
public Response update(Request request) {
......@@ -49,5 +53,40 @@ public class IOHandler {
public Response delete(Request request) {
return null;
}
public Response read(Request request) {
Ack ack = null; Nack nack = null;
int status;
// Only Metadata Master can serve shared object lookup requests
if (!isMaster && request.getAccessType() == MessageConstants.METADATA_ACCESS_SHARED) {
status = MessageConstants.STATUS_SERVER_NOT_MASTER;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey());
} else {
StorageModel blob = storageService.readByKey(request.getKey());
if (blob == null) {
status = MessageConstants.STATUS_KEY_NOT_FOUND;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey());
} else if (request.getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE &&
!request.getClientID().equals(blob.getOwner())) {
status = MessageConstants.STATUS_OWNER_MISMATCH;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey());
} else {
status = MessageConstants.STATUS_OK;
ack = ResponseBuilder.buildAck(MessageConstants.INIT_VERSION,
blob.getDataSize(),
blob.getKey(),
blob.getCrc(),
blob.getValue());
}
}
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, status,
ack, nack);
}
}
package hpdos.handler;
import hpdos.grpc.*;
import hpdos.message.*;
import hpdos.lib.ReplicationService;
import hpdos.message.RequestBuilder;
import hpdos.message.ResponseBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
private IOHandler ioHandler;
public NetworkHandler(IOHandler ioHandler) {
private final IOHandler ioHandler;
private final ReplicationService replicationService;
public NetworkHandler(IOHandler ioHandler, ReplicationService replicationService) {
this.ioHandler = ioHandler;
this.replicationService = replicationService;
}
@Override
public void readMetadata(Packet request,
public void readMetadata(Packet requestPacket,
StreamObserver<Packet> responseObserver) {
System.out.println("Data received" + request.toString());
String key = "dummy";
String value = "dummy";
ArrayList<Ack> acks = new ArrayList<>();
acks.add(ResponseBuilder.buildAck(0, 100, key, 0, value));
ArrayList<Response> responses = new ArrayList<>();
responses.add(ResponseBuilder.
buildResponsePacket(MessageConstants.METADATA_READ, acks, null));
Packet packet = ResponseBuilder.buildPacket(responses);
System.out.println("Data received" + requestPacket.toString());
for (Request request: requestPacket.getRequestList()) {
Response response = ioHandler.read(request);
Packet packet = ResponseBuilder.buildPacket(response);
System.out.println(packet);
responseObserver.onNext(packet);
}
responseObserver.onCompleted();
}
@Override
public void createMetadata(Packet request, StreamObserver<Packet> responseObserver) {
super.createMetadata(request, responseObserver);
public void createMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) {
System.out.println("new create request " + requestPacket);
for (Request request: requestPacket.getRequestList()) {
ioHandler.create(request);
}
System.out.println("Added to local memory");
ReplicationRequest replicationRequest = RequestBuilder.
buildReplicationRequest(new ArrayList<>(requestPacket.getRequestList()));
ReplicationResponse replicationResponse = null;
try {
System.out.println("starting replication");
replicationResponse = replicationService.replicateMetadata(replicationRequest);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("replication complete");
Packet packet = ResponseBuilder.buildPacket(new ArrayList<>(replicationResponse.getResponseList()));
System.out.println(packet);
responseObserver.onNext(packet);
responseObserver.onCompleted();
}
@Override
......
......@@ -28,6 +28,7 @@ public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceI
}
responseObserver.onNext(responsePacket);
}
responseObserver.onCompleted();
}
}
......
package hpdos.lib;
import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.ReplicationResponse;
import hpdos.grpc.ReplicationServiceGrpc;
import hpdos.grpc.Response;
import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.*;
import java.util.concurrent.*;
public class InlineReplicationService implements ReplicationService {
private final HashMap<String, MasterFollower> followers;
private final HashMap<String, ManagedChannel> channels;
public InlineReplicationService(HashMap<String, MasterFollower> followers) {
this.followers = followers;
this.channels = new HashMap<>();
for (MasterFollower follower: this.followers.values()) {
ManagedChannel channel = ManagedChannelBuilder
.forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.put(follower.getFollowerID(), channel);
}
}
@Override
public void cleanup() {
for (ManagedChannel channel: channels.values())
channel.shutdown();
}
private void establishChannels() {
for (String followerID: followers.keySet()) {
if (!channels.containsKey(followerID)) {
MasterFollower follower = followers.get(followerID);
ManagedChannel channel = ManagedChannelBuilder
.forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.put(follower.getFollowerID(), channel);
}
}
}
@Override
public ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(followers.size());
Set<Callable<ReplicationResponse>> callables = new HashSet<>();
// new followers have joined or left.
// TODO: Handle follower leaving scenario
// FIXME: fix edge case where equal number of followers leaving and joining won't trigger connection reestablishment
if (channels.size() != followers.size()) {
establishChannels();
}
for (ManagedChannel channel: channels.values()) {
callables.add(() -> {
ReplicationServiceGrpc.ReplicationServiceBlockingStub stub =
ReplicationServiceGrpc.newBlockingStub(channel);
return stub.replicateMetadata(replicationRequest);
});
}
List<Future<ReplicationResponse>> futures = executorService.invokeAll(callables);
HashMap<String, Response> responseHashMap = new HashMap<>();
for (Future<ReplicationResponse> future: futures) {
ReplicationResponse replicationResponse;
replicationResponse = future.get(); //TODO: Add and handle get timeout. Timeout related constants already added
for (Response receivedResponse: replicationResponse.getResponseList()) {
int status = receivedResponse.getStatus();
if (status == MessageConstants.STATUS_OK) {
if (!responseHashMap.containsKey(receivedResponse.getAck().getKey()))
responseHashMap.put(receivedResponse.getAck().getKey(), receivedResponse);
} else {
responseHashMap.put(receivedResponse.getNack().getKey(), receivedResponse);
}
}
}
return ResponseBuilder.
buildReplicationResponse(new ArrayList<>(responseHashMap.values()));
}
}
......@@ -2,9 +2,9 @@ package hpdos.lib;
import java.util.HashMap;
public class MemoryStorage implements StorageService{
public class MemoryStorageService implements StorageService{
private final HashMap<String, StorageModel> memoryKVStore;
public MemoryStorage() {
public MemoryStorageService() {
this.memoryKVStore = new HashMap<>();
}
@Override
......
package hpdos.lib;
import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.ReplicationResponse;
import java.util.concurrent.ExecutionException;
public interface ReplicationService {
abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException;
abstract void cleanup();
}
......@@ -4,6 +4,9 @@ public class MessageConstants {
public static final int INIT_VERSION = 0;
public static final int INVALID_VERSION = -1;
public static final int METADATA_ACCESS_PRIVATE = 700;
public static final int METADATA_ACCESS_SHARED = 777;
// 00 to 99 - Client Server Interaction operations
public static final int PACKET_METADATA_REQUEST = 0;
public static final int PACKET_METADATA_RESPONSE = 1;
......@@ -12,6 +15,10 @@ public class MessageConstants {
public static final int STATUS_OK = 200;
public static final int STATUS_OWNER_MISMATCH = 401;
public static final int STATUS_REPLICATE_FAILED = 402;
public static final int STATUS_SERVER_NOT_MASTER = 403;
public static final int STATUS_KEY_NOT_FOUND = 404;
public static final int STATUS_REPLICATION_TIMEOUT = 405;
public static final int STATUS_IO_WRITE_FAILED = 406;
// 100 to 199 - HPDOS System internal operations
public static final int MASTER_HEARTBEAT = 100;
......
package hpdos.message;
import hpdos.grpc.Packet;
import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.Request;
import java.util.ArrayList;
......@@ -26,4 +27,11 @@ public class RequestBuilder {
return packet.build();
}
public static ReplicationRequest buildReplicationRequest(ArrayList<Request> requests) {
ReplicationRequest.Builder builder = ReplicationRequest.newBuilder();
builder.setPacketType(MessageConstants.PACKET_METADATA_REQUEST);
builder.addAllRequest(requests);
return builder.build();
}
}
......@@ -7,7 +7,6 @@ public class ResponseBuilder {
public static Ack buildAck(int version, int dataSize, String key, long crc, String value) {
Ack.Builder ack = Ack.newBuilder();
ack.setStatus(MessageConstants.STATUS_OK);
ack.setKey(key);
ack.setVersion(version);
ack.setDataSize(dataSize);
......@@ -16,41 +15,25 @@ public class ResponseBuilder {
return ack.build();
}
public static Nack buildNack(int version, int status, String key) {
public static Nack buildNack(int version, String key) {
Nack.Builder nack = Nack.newBuilder();
nack.setStatus(status);
nack.setKey(key);
nack.setVersion(version);
return nack.build();
}
public static Response buildResponsePacket(int operationType,
ArrayList<Ack> acks,
ArrayList<Nack> nacks) {
Response.Builder response = Response.newBuilder();
response.setOperationType(operationType);
if (acks != null)
response.addAllAck(acks);
else
response.clearAck();
if (nacks != null)
response.addAllNack(nacks);
else
response.clearNack();
return response.build();
}
public static Response buildResponsePacket(int operationType,
public static Response buildResponsePacket(int operationType, int status,
Ack ack,
Nack nack) {
Response.Builder response = Response.newBuilder();
response.setOperationType(operationType);
response.setStatus(status);
if (ack != null)
response.addAck(ack);
response.setAck(ack);
else
response.clearAck();
if (nack != null)
response.addNack(nack);
response.setNack(nack);
else
response.clearNack();
return response.build();
......@@ -76,4 +59,11 @@ public class ResponseBuilder {
packet.addResponse(response);
return packet.build();
}
public static ReplicationResponse buildReplicationResponse(ArrayList<Response> response) {
ReplicationResponse.Builder packet = ReplicationResponse.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addAllResponse(response);
return packet.build();
}
}
......@@ -36,12 +36,13 @@ message Request {
message Response {
int32 operationType = 1;
repeated Ack ack = 2;
repeated Nack nack = 3;
int32 status = 2;
Ack ack = 3;
Nack nack = 4;
}
message Ack {
int32 status = 1;
int32 version = 2;
int32 dataSize = 3;
string key = 4;
......@@ -50,7 +51,6 @@ message Ack {
}
message Nack {
int32 status = 1;
string key = 2;
int32 version = 3;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment