Commit 6621f187 authored by NILANJAN DAW's avatar NILANJAN DAW

Added support for full CRUD queries.

Needs more testing
parent be61c6e3
...@@ -33,31 +33,36 @@ public class ClientRunner { ...@@ -33,31 +33,36 @@ public class ClientRunner {
forAddress(ConfigConstants.HOST, ConfigConstants.PORT) forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext() .usePlaintext()
.build(); .build();
String key = "dummy", value = "dummy"; double averageReadTime = 0, averageWriteTime = 0;
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel); for (int i = 1; i <= 10; i++) {
ArrayList<Request> request = new ArrayList<>();
// create a metadata block String key = Integer.toString((int) (Math.random() * Integer.MAX_VALUE)), value = "dummy";
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE, NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
0, value.length(), key, ArrayList<Request> request = new ArrayList<>();
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
Timestamp timestampCreateStart = Timestamp.from(Instant.now());
Packet response = stub.createMetadata(packet);
System.out.println(response);
System.out.println("MDS create time taken in ms: " + (Timestamp.from(Instant.now()).getTime()
- timestampCreateStart.getTime()));
// read back the metadata // create a metadata block
request.clear(); request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ, 0, value.length(), key,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, "")); 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
packet = RequestBuilder.buildPacket(request); Packet packet = RequestBuilder.buildPacket(request);
Timestamp timestampReadStart = Timestamp.from(Instant.now()); long timestampCreateStart = System.currentTimeMillis();
response = stub.readMetadata(packet); Packet response = stub.createMetadata(packet);
System.out.println(response);
System.out.println("MDS create time taken in ms: " + (Timestamp.from(Instant.now()).getTime() averageWriteTime = (averageWriteTime * (i - 1) + (System.currentTimeMillis() - timestampCreateStart)) / i;
- timestampReadStart.getTime())); // read back the metadata
request.clear();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
packet = RequestBuilder.buildPacket(request);
long timestampReadStart = System.currentTimeMillis();
response = stub.readMetadata(packet);
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - timestampReadStart)) / i;
// if (i % 100 == 0)
System.out.println("Average MDS read time: "
+ averageReadTime
+ " ms average create time: " + averageWriteTime + "ms");
}
channel.shutdown(); channel.shutdown();
return id; return id;
} }
...@@ -71,12 +76,7 @@ public class ClientRunner { ...@@ -71,12 +76,7 @@ public class ClientRunner {
Set<Callable<String>> callables = new HashSet<>(); Set<Callable<String>> callables = new HashSet<>();
for (int i = 0; i < parallelCount; i++) { for (int i = 0; i < parallelCount; i++) {
int finalI = i; int finalI = i;
callables.add(new Callable<String>() { callables.add(() -> clientRunner.initClient(Integer.toString(finalI)));
@Override
public String call() throws Exception {
return clientRunner.initClient(Integer.toString(finalI));
}
});
} }
List<Future<String>> futures = executorService.invokeAll(callables); List<Future<String>> futures = executorService.invokeAll(callables);
......
...@@ -6,6 +6,7 @@ import hpdos.grpc.Request; ...@@ -6,6 +6,7 @@ import hpdos.grpc.Request;
import hpdos.grpc.Response; import hpdos.grpc.Response;
import hpdos.lib.StorageModel; import hpdos.lib.StorageModel;
import hpdos.lib.StorageService; import hpdos.lib.StorageService;
import hpdos.lib.StoredModel;
import hpdos.message.MessageConstants; import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder; import hpdos.message.ResponseBuilder;
...@@ -26,34 +27,81 @@ public class IOHandler { ...@@ -26,34 +27,81 @@ public class IOHandler {
request.getAccessType(), request.getAccessType(),
request.getClientID(), request.getClientID(),
request.getValue()); request.getValue());
boolean status = storageService.create(request.getKey(), blob); StoredModel storedData = storageService.create(request.getKey(), blob);
if (status) { if (storedData.getStatus() == MessageConstants.STATUS_OK) {
ack = ResponseBuilder.buildAck(MessageConstants.INIT_VERSION, ack = ResponseBuilder.buildAck(storedData.getData().getVersion(),
blob.getDataSize(), storedData.getData().getDataSize(),
blob.getKey(), storedData.getData().getKey(),
blob.getCrc(), storedData.getData().getCrc(),
blob.getValue()); storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK,
ack, null);
} }
else { else {
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION, nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey()); request.getKey());
}
if (status)
return ResponseBuilder.buildResponsePacket( return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK, MessageConstants.METADATA_CREATE, storedData.getStatus(),
ack, null); null, nack);
return ResponseBuilder.buildResponsePacket( }
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_IO_WRITE_FAILED,
null, nack);
} }
public Response update(Request request) { public Response update(Request request) {
return null; Ack ack = null; Nack nack = null;
StorageModel blob = new StorageModel(
request.getVersion(),
request.getDataSize(),
request.getKey(),
request.getAccessType(),
request.getClientID(),
request.getValue());
StoredModel storedData = storageService.update(request.getKey(), blob);
if (storedData.getStatus() == MessageConstants.STATUS_OK) {
ack = ResponseBuilder.buildAck(storedData.getData().getVersion(),
storedData.getData().getDataSize(),
storedData.getData().getKey(),
storedData.getData().getCrc(),
storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK,
ack, null);
}
else {
int version = (storedData.getStatus() == MessageConstants.STATUS_UPDATE_VERSION_MISMATCH)?
storedData.getData().getVersion(): MessageConstants.INVALID_VERSION;
nack = ResponseBuilder.buildNack(version, request.getKey());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, storedData.getStatus(),
null, nack);
}
} }
public Response delete(Request request) { public Response delete(Request request) {
return null; Ack ack = null; Nack nack = null;
StoredModel storedData = storageService.delete(request.getKey(), request.getVersion());
if (storedData.getStatus() == MessageConstants.STATUS_OK) {
ack = ResponseBuilder.buildAck(storedData.getData().getVersion(),
storedData.getData().getDataSize(),
storedData.getData().getKey(),
storedData.getData().getCrc(),
storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK,
ack, null);
}
else {
int version = (storedData.getStatus() == MessageConstants.STATUS_UPDATE_VERSION_MISMATCH)?
storedData.getData().getVersion(): MessageConstants.INVALID_VERSION;
nack = ResponseBuilder.buildNack(version, request.getKey());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, storedData.getStatus(),
null, nack);
}
} }
public Response read(Request request) { public Response read(Request request) {
...@@ -65,24 +113,24 @@ public class IOHandler { ...@@ -65,24 +113,24 @@ public class IOHandler {
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION, nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey()); request.getKey());
} else { } else {
StorageModel blob = storageService.readByKey(request.getKey()); StoredModel readData = storageService.readByKey(request.getKey());
if (blob == null) { if (readData.getStatus() == MessageConstants.STATUS_KEY_NOT_FOUND) {
status = MessageConstants.STATUS_KEY_NOT_FOUND; status = MessageConstants.STATUS_KEY_NOT_FOUND;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION, nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey()); request.getKey());
} else if (request.getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE && } else if (readData.getData().getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE &&
!request.getClientID().equals(blob.getOwner())) { !request.getClientID().equals(readData.getData().getOwner())) {
status = MessageConstants.STATUS_OWNER_MISMATCH; status = MessageConstants.STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION, nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey()); request.getKey());
} else { } else {
status = MessageConstants.STATUS_OK; status = MessageConstants.STATUS_OK;
ack = ResponseBuilder.buildAck(MessageConstants.INIT_VERSION, ack = ResponseBuilder.buildAck(readData.getData().getVersion(),
blob.getDataSize(), readData.getData().getDataSize(),
blob.getKey(), readData.getData().getKey(),
blob.getCrc(), readData.getData().getCrc(),
blob.getValue()); readData.getData().getValue());
} }
} }
return ResponseBuilder.buildResponsePacket( return ResponseBuilder.buildResponsePacket(
......
package hpdos.handler; package hpdos.handler;
import com.google.common.base.Stopwatch;
import hpdos.grpc.*; import hpdos.grpc.*;
import hpdos.lib.ReplicationService; import hpdos.lib.ReplicationService;
import hpdos.message.RequestBuilder; import hpdos.message.RequestBuilder;
...@@ -34,10 +35,48 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -34,10 +35,48 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
@Override @Override
public void createMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) { public void createMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) {
System.out.println("new create request " + requestPacket); System.out.println("new create request " + requestPacket);
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
for (Request request: requestPacket.getRequestList()) { for (Request request: requestPacket.getRequestList()) {
ioHandler.create(request); ioHandler.create(request);
} }
stopwatch.stop();
System.out.println("Added to local memory " + stopwatch);
stopwatch.reset();
stopwatch.start();
Packet packet = replicate(requestPacket);
stopwatch.stop();
System.out.println("Replication time " + stopwatch);
responseObserver.onNext(packet);
responseObserver.onCompleted();
}
@Override
public void updateMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) {
System.out.println("new create request " + requestPacket);
for (Request request: requestPacket.getRequestList()) {
ioHandler.update(request);
}
System.out.println("Added to local memory");
Packet packet = replicate(requestPacket);
responseObserver.onNext(packet);
responseObserver.onCompleted();
}
@Override
public void deleteMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) {
System.out.println("new create request " + requestPacket);
for (Request request: requestPacket.getRequestList()) {
ioHandler.delete(request);
}
System.out.println("Added to local memory"); System.out.println("Added to local memory");
Packet packet = replicate(requestPacket);
responseObserver.onNext(packet);
responseObserver.onCompleted();
}
private Packet replicate(Packet requestPacket) {
ReplicationRequest replicationRequest = RequestBuilder. ReplicationRequest replicationRequest = RequestBuilder.
buildReplicationRequest(new ArrayList<>(requestPacket.getRequestList())); buildReplicationRequest(new ArrayList<>(requestPacket.getRequestList()));
ReplicationResponse replicationResponse = null; ReplicationResponse replicationResponse = null;
...@@ -50,17 +89,6 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -50,17 +89,6 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
System.out.println("replication complete"); System.out.println("replication complete");
Packet packet = ResponseBuilder.buildPacket(new ArrayList<>(replicationResponse.getResponseList())); Packet packet = ResponseBuilder.buildPacket(new ArrayList<>(replicationResponse.getResponseList()));
System.out.println(packet); System.out.println(packet);
responseObserver.onNext(packet); return packet;
responseObserver.onCompleted();
}
@Override
public void updateMetadata(Packet request, StreamObserver<Packet> responseObserver) {
super.updateMetadata(request, responseObserver);
}
@Override
public void deleteMetadata(Packet request, StreamObserver<Packet> responseObserver) {
super.deleteMetadata(request, responseObserver);
} }
} }
package hpdos.lib; package hpdos.lib;
import hpdos.message.MessageConstants;
import java.util.HashMap; import java.util.HashMap;
public class MemoryStorageService implements StorageService{ public class MemoryStorageService implements StorageService {
private final HashMap<String, StorageModel> memoryKVStore; private final HashMap<String, StorageModel> memoryKVStore;
public MemoryStorageService() { public MemoryStorageService() {
this.memoryKVStore = new HashMap<>(); this.memoryKVStore = new HashMap<>();
} }
@Override @Override
public boolean create(String key, StorageModel value) { public StoredModel create(String key, StorageModel value) {
try { try {
if (memoryKVStore.containsKey(key)) if (memoryKVStore.containsKey(key))
return false; return new StoredModel(null, MessageConstants.STATUS_KEY_EXISTS);
memoryKVStore.put(key, value); memoryKVStore.put(key, value);
return new StoredModel(value, MessageConstants.STATUS_OK);
} catch (Exception e) { } catch (Exception e) {
return false; return null;
} }
return true;
} }
@Override @Override
public StorageModel readByKey(String key) { public StoredModel readByKey(String key) {
return memoryKVStore.get(key); if (!memoryKVStore.containsKey(key))
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
return new StoredModel(memoryKVStore.get(key), MessageConstants.STATUS_OK);
} }
@Override @Override
public boolean update(String key, StorageModel value) { public StoredModel update(String key, StorageModel value) {
if (!memoryKVStore.containsKey(key)) if (!memoryKVStore.containsKey(key))
return false; return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
StorageModel previousValue = memoryKVStore.get(key);
if (previousValue.getVersion() != value.getVersion())
return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
value.setVersion((previousValue.getVersion() + 1) % Integer.MAX_VALUE); // version wraps around
memoryKVStore.put(key, value); memoryKVStore.put(key, value);
return true; return new StoredModel(value, MessageConstants.STATUS_OK);
} }
@Override @Override
public boolean delete(String key) { public StoredModel delete(String key, int version) {
try { if (!memoryKVStore.containsKey(key))
memoryKVStore.remove(key); return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
} catch (Exception e) { StorageModel previousValue = memoryKVStore.get(key);
return false; if (previousValue.getVersion() != version)
} return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
return true; return new StoredModel(memoryKVStore.remove(key), MessageConstants.STATUS_OK);
} }
} }
...@@ -82,3 +82,4 @@ public class StorageModel { ...@@ -82,3 +82,4 @@ public class StorageModel {
this.owner = owner; this.owner = owner;
} }
} }
package hpdos.lib; package hpdos.lib;
public interface StorageService { public interface StorageService {
boolean create(String key, StorageModel value); StoredModel create(String key, StorageModel value);
StorageModel readByKey(String key); StoredModel readByKey(String key);
boolean update(String key, StorageModel value); StoredModel update(String key, StorageModel value);
boolean delete(String key); StoredModel delete(String key, int version);
} }
package hpdos.lib;
public class StoredModel {
private StorageModel data;
private int status;
public StoredModel(StorageModel data, int status) {
this.data = data;
this.status = status;
}
public StorageModel getData() {
return data;
}
public void setData(StorageModel data) {
this.data = data;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
}
...@@ -13,12 +13,14 @@ public class MessageConstants { ...@@ -13,12 +13,14 @@ public class MessageConstants {
// Distinguishing ACK and NACK packets // Distinguishing ACK and NACK packets
public static final int STATUS_OK = 200; public static final int STATUS_OK = 200;
public static final int STATUS_OWNER_MISMATCH = 401; public static final int STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS = 401;
public static final int STATUS_REPLICATE_FAILED = 402; public static final int STATUS_REPLICATION_FAILED = 402;
public static final int STATUS_SERVER_NOT_MASTER = 403; public static final int STATUS_SERVER_NOT_MASTER = 403;
public static final int STATUS_KEY_NOT_FOUND = 404; public static final int STATUS_KEY_NOT_FOUND = 404;
public static final int STATUS_REPLICATION_TIMEOUT = 405; public static final int STATUS_REPLICATION_TIMEOUT = 405;
public static final int STATUS_IO_WRITE_FAILED = 406; public static final int STATUS_IO_WRITE_FAILED = 406;
public static final int STATUS_KEY_EXISTS = 407;
public static final int STATUS_UPDATE_VERSION_MISMATCH = 408;
// 100 to 199 - HPDOS System internal operations // 100 to 199 - HPDOS System internal operations
public static final int MASTER_HEARTBEAT = 100; public static final int MASTER_HEARTBEAT = 100;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment