Commit 484668c3 authored by NILANJAN DAW's avatar NILANJAN DAW

Added Async HPDOS client library support

parent 6852c7fd
app.name="HPDOS-Client" app.name="HPDOS-Client"
app.version="0.1.4" app.version="0.1.4"
app.thread_count=4 app.concurrency=5
app.runtime=1 app.runtime=10
app.data_size=10 app.data_size=10
app.data_conversion_factor=B app.data_conversion_factor=B
app.private_ratio=0.8 app.private_ratio=0.8
......
...@@ -4,11 +4,15 @@ ...@@ -4,11 +4,15 @@
package HpdosClient; package HpdosClient;
import HpdosClient.MessageFormat.MessageConstants; import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.MessageFormat.RequestBuilder; import HpdosClient.lib.StorageModel;
import HpdosClient.lib.StorageService;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import hpdos.grpc.*; import hpdos.grpc.*;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.netty.util.concurrent.FutureListener;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
...@@ -17,7 +21,7 @@ import java.util.*; ...@@ -17,7 +21,7 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
public class ClientRunner { public class ClientRunner {
public static int parallelCount; public static int concurrency;
public static int runtime; public static int runtime;
private final String clientID; private final String clientID;
public static String propertiesFile; public static String propertiesFile;
...@@ -26,24 +30,29 @@ public class ClientRunner { ...@@ -26,24 +30,29 @@ public class ClientRunner {
private List<Follower> replicaSet; private List<Follower> replicaSet;
private Queue<Long> createTime, updateTime, readTime, deleteTime; private Queue<Long> createTime, updateTime, readTime, deleteTime;
private final Properties properties; private final Properties properties;
private final List<StorageModel> generatedPacket;
private Semaphore limiter = null;
public ClientRunner() { public ClientRunner() {
clientID = UUID.randomUUID().toString(); clientID = UUID.randomUUID().toString();
generatedPacket = Collections.synchronizedList(new ArrayList<>());
properties = new Properties(); properties = new Properties();
try { try {
InputStream inputStream = new FileInputStream(propertiesFile); InputStream inputStream = new FileInputStream(propertiesFile);
properties.load(inputStream); this.properties.load(inputStream);
parallelCount = Integer.parseInt((String) properties.get("app.thread_count")); System.out.println((String) properties.get("app.concurrency"));
concurrency = Integer.parseInt((String) properties.get("app.concurrency"));
this.limiter = new Semaphore(concurrency);
runtime = Integer.parseInt((String) properties.get("app.runtime")); runtime = Integer.parseInt((String) properties.get("app.runtime"));
cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
cRead = Integer.parseInt((String) properties.get("app.cycle_read"));
cUpdate = Integer.parseInt((String) properties.get("app.cycle_update"));
cDelete = Integer.parseInt((String) properties.get("app.cycle_delete"));
createTime = new ConcurrentLinkedQueue<>(); this.cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
updateTime = new ConcurrentLinkedQueue<>(); this.cRead = Integer.parseInt((String) properties.get("app.cycle_read"));
readTime = new ConcurrentLinkedQueue<>(); this.cUpdate = Integer.parseInt((String) properties.get("app.cycle_update"));
deleteTime = new ConcurrentLinkedQueue<>(); this.cDelete = Integer.parseInt((String) properties.get("app.cycle_delete"));
this.createTime = new ConcurrentLinkedQueue<>();
this.updateTime = new ConcurrentLinkedQueue<>();
this.readTime = new ConcurrentLinkedQueue<>();
this.deleteTime = new ConcurrentLinkedQueue<>();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -52,54 +61,6 @@ public class ClientRunner { ...@@ -52,54 +61,6 @@ public class ClientRunner {
return "Hello World!"; return "Hello World!";
} }
// create a metadata block
public long create(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key, String value) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
0, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
Packet response = stub.createMetadata(packet);
return timestampCreateStart;
}
// read back the metadata
public Map.Entry<Packet, Long> read(ArrayList<ManagedChannel> channels, String key) {
int rnd = new Random().nextInt(channels.size());
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channels.get(rnd));
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
long timestampReadStart = System.currentTimeMillis();
Packet response = stub.readMetadata(packet);
// System.out.println(response);
return new AbstractMap.SimpleEntry<>(response, timestampReadStart);
}
public long update(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key, String value, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE,
version, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
Packet response = stub.updateMetadata(packet);
return timestampCreateStart;
}
public long delete(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE,
version, 0, key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
Packet response = stub.deleteMetadata(packet);
return timestampCreateStart;
}
private String createString() { private String createString() {
double dataSize = Double.parseDouble((String) properties.get("app.data_size")); double dataSize = Double.parseDouble((String) properties.get("app.data_size"));
dataSize /= 2.0; // Java strings are 2B long dataSize /= 2.0; // Java strings are 2B long
...@@ -113,72 +74,157 @@ public class ClientRunner { ...@@ -113,72 +74,157 @@ public class ClientRunner {
char[] data = new char[(int)(dataSize * multiplier)]; char[] data = new char[(int)(dataSize * multiplier)];
return new String(data); return new String(data);
} }
public double runExperiment(String id, long experimentStartTime) {
final ManagedChannel masterChannel = ManagedChannelBuilder. private void seedServer(StorageService storageService, String value) throws InterruptedException {
forAddress(ConfigConstants.HOST, ConfigConstants.PORT) CountDownLatch latch = new CountDownLatch(10);
.usePlaintext() for (int i = 0; i < 10; i++) {
.build(); String key = Integer.toString((int) (Math.random() * Integer.MAX_VALUE));
ArrayList<ManagedChannel> channels = new ArrayList<>(); ListenableFuture<Packet> res = storageService.create(key, value, MessageConstants.METADATA_ACCESS_PRIVATE);
channels.add(masterChannel); StorageModel model = new StorageModel(0, value.length(), key,
for (Follower follower: replicaSet) { MessageConstants.METADATA_ACCESS_PRIVATE, clientID, 0, value);
ManagedChannel channel = ManagedChannelBuilder. res.addListener(() -> {
forAddress(follower.getIp(), follower.getPort()) try {
.usePlaintext() Packet packet = res.get();
.build(); for (Response response: packet.getResponseList()) {
channels.add(channel); // System.out.println("seed: " + response);
if (response.getStatus() == MessageConstants.STATUS_OK) {
model.updateData(response.getAck());
generatedPacket.add(model);
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
latch.countDown();
}, MoreExecutors.directExecutor());
} }
String value = createString(), updatedValue = createString(); latch.await();
for (;;) { }
String key = id + (int) (Math.random() * Integer.MAX_VALUE); public double runExperiment(StorageService storageService) throws InterruptedException {
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(masterChannel);
for (int j = 0; j < cCreate; j++) {
long timestampCreateStart = create(stub, key, value);
createTime.add(System.currentTimeMillis() - timestampCreateStart);
}
for (int j = 0; j < cRead; j++) {
AbstractMap.Entry<Packet, Long> data = read(channels, key);
readTime.add(System.currentTimeMillis() - data.getValue());
}
for (int j = 0; j < cUpdate; j++) { String value = createString(), updatedValue = createString();
AbstractMap.Entry<Packet, Long> data = read(channels, key); double totalBracket = this.cCreate + this.cRead + this.cUpdate + this.cDelete;
long timestampUpdateStart = update(stub, key, updatedValue, double createBracket, readBracket, updateBracket, deleteBracket;
data.getKey().getResponse(0).getAck().getVersion());
readTime.add(System.currentTimeMillis() - data.getValue());
updateTime.add(System.currentTimeMillis() - timestampUpdateStart);
}
for (int j = 0; j < cDelete; j++) { createBracket = this.cCreate * 1.0 / totalBracket;
AbstractMap.Entry<Packet, Long> data = read(channels, key); readBracket = createBracket + this.cRead / totalBracket;
long timestampDeleteStart = delete(stub, key, data.getKey().getResponse(0).getAck().getVersion()); updateBracket = readBracket + this.cUpdate / totalBracket;
readTime.add(System.currentTimeMillis() - data.getValue()); deleteBracket = updateBracket + this.cDelete / totalBracket;
deleteTime.add(System.currentTimeMillis() - timestampDeleteStart); System.out.println("Starting experiment");
do {
limiter.acquire();
double toss = Math.random();
if (toss < createBracket) {
// System.out.println("create");
String key = Integer.toString((int) (Math.random() * Integer.MAX_VALUE));
StorageModel model = new StorageModel(0, value.length(), key,
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, 0, value);
ListenableFuture<Packet> res = storageService.create(model.getKey(), model.getValue(), model.getAccessType());
res.addListener(() -> {
try {
Packet packet = res.get();
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) {
model.updateData(response.getAck());
generatedPacket.add(model);
} else
System.out.println("error packet " + response);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
limiter.release();
}, MoreExecutors.directExecutor());
} else if (toss < readBracket) {
// System.out.println("read");
int index = (int) (Math.random() * generatedPacket.size());
StorageModel sendPacket = this.generatedPacket.get(index);
ListenableFuture<Packet> res = storageService.read(sendPacket.getKey());
res.addListener(() -> {
try {
Packet packet = res.get();
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK)
if (!sendPacket.getKey().equals(response.getAck().getKey()))
System.out.println("read kand ho gaya");
else {
synchronized (sendPacket) {
sendPacket.updateData(response.getAck());
}
}
else
System.out.println("error packet " + response);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
limiter.release();
}, MoreExecutors.directExecutor());
} else if (toss < updateBracket) {
// System.out.println("update");
int index = (int) (Math.random() * generatedPacket.size());
StorageModel sendPacket = this.generatedPacket.get(index);
ListenableFuture<Packet> res = storageService.update(sendPacket.getKey(), updatedValue,
sendPacket.getVersion());
res.addListener(() -> {
try {
Packet packet = res.get();
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK)
if (!sendPacket.getKey().equals(response.getAck().getKey()))
System.out.println("update kand ho gaya");
else {
synchronized (sendPacket) {
sendPacket.updateData(response.getAck());
}
}
else
System.out.println("error packet " + response);
}
limiter.release();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, MoreExecutors.directExecutor());
} else {
// System.out.println("delete");
int index = (int) (Math.random() * generatedPacket.size());
StorageModel sendPacket = this.generatedPacket.get(index);
ListenableFuture<Packet> res = storageService.delete(sendPacket.getKey(), sendPacket.getVersion());
res.addListener(() -> {
try {
Packet packet = res.get();
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK)
if (!sendPacket.getKey().equals(response.getAck().getKey()))
System.out.println("delete kand ho gaya");
else {
synchronized (sendPacket) {
generatedPacket.remove(sendPacket);
}
}
else
System.out.println("error packet " + response);
}
limiter.release();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, MoreExecutors.directExecutor());
// deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
} }
long currentTime = System.currentTimeMillis(); } while (!this.experimentEnded);
if ((currentTime - experimentStartTime) >= runtime * 1000L)
break;
}
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) + // System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) +
// "ms qps " + qps); // "ms qps " + qps);
masterChannel.shutdown();
return 0; return 0;
} }
public void retrieveFollowerList() {
final ManagedChannel channel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
ResponseList responseList = stub.getReadReplicaList(null);
this.replicaSet = responseList.getFollowerList();
// for (Follower follower: this.replicaSet) {
// System.out.println(follower);
// }
channel.shutdown();
}
private void timerService() { private void timerService() {
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start(); stopwatch.start();
...@@ -224,38 +270,52 @@ public class ClientRunner { ...@@ -224,38 +270,52 @@ public class ClientRunner {
+ " Update: " + updateQps + " Delete: " + deleteQps + " Total: " + totalQps); + " Update: " + updateQps + " Delete: " + deleteQps + " Total: " + totalQps);
totalRuntime /= 1000; totalRuntime /= 1000;
System.out.println("Total QPS: " + totalQps / totalRuntime + " avg query time: " + System.out.println("Total QPS: " + totalQps / totalRuntime + " avg query time: " +
(totalQps * parallelCount / (totalRuntime))); (totalQps * concurrency / (totalRuntime)));
System.out.println("Read QPS: " + readQps / totalRuntime + " avg query time: " + avgRead); System.out.println("Read QPS: " + readQps / totalRuntime + " avg query time: " + avgRead);
System.out.println("Create QPS: " + createQps / totalRuntime + " avg query time: " + avgCreate); System.out.println("Create QPS: " + createQps / totalRuntime + " avg query time: " + avgCreate);
System.out.println("Update QPS: " + updateQps / totalRuntime + " avg query time: " + avgUpdate); System.out.println("Update QPS: " + updateQps / totalRuntime + " avg query time: " + avgUpdate);
System.out.println("Delete QPS: " + deleteQps / totalRuntime + " avg query time: " + avgDelete); System.out.println("Delete QPS: " + deleteQps / totalRuntime + " avg query time: " + avgDelete);
} }
public static void main(String[] args) throws InterruptedException, ExecutionException { private void cleanupExperiment(StorageService storageService) throws InterruptedException {
System.out.println("Cleaning up remnants");
CountDownLatch latch = new CountDownLatch(generatedPacket.size());
for (StorageModel data: generatedPacket) {
ListenableFuture<Packet> res = storageService.delete(data.getKey(), data.getVersion());
res.addListener(latch::countDown, MoreExecutors.directExecutor());
}
latch.await();
storageService.cleanup();
}
public static void main(String[] args) throws InterruptedException {
propertiesFile = args[0]; propertiesFile = args[0];
ClientRunner clientRunner = new ClientRunner(); ClientRunner clientRunner = new ClientRunner();
System.out.println(clientRunner.getGreeting()); System.out.println(clientRunner.getGreeting());
System.out.println("Thread count: " + parallelCount + " runtime: " + runtime + "s"); StorageService storageService = new StorageService(clientRunner.clientID);
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount); storageService.initStorage();
Thread.sleep(1000); // let things settle down a bit System.out.println("storage initialised");
clientRunner.retrieveFollowerList(); clientRunner.seedServer(storageService, clientRunner.createString());
Set<Callable<Double>> callables = new HashSet<>(); System.out.println("server seeded" + clientRunner.generatedPacket);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
for (int i = 0; i < parallelCount; i++) {
int finalI = i;
callables.add(() -> clientRunner.runExperiment(Integer.toString(finalI), startTime));
}
clientRunner.timerService(); clientRunner.timerService();
List<Future<Double>> futures = executorService.invokeAll(callables); ExecutorService executorService = Executors.newSingleThreadExecutor();
for (Future<Double> future: futures) { executorService.submit(() -> {
future.get(); try {
} Thread.sleep(runtime * 1000L);
clientRunner.experimentEnded = true; } catch (InterruptedException e) {
long endTime = System.currentTimeMillis(); e.printStackTrace();
double totalRuntime = endTime - startTime; }
clientRunner.printStatistics(totalRuntime); clientRunner.experimentEnded = true;
System.out.println("Experiment ended");
});
clientRunner.runExperiment(storageService);
// long endTime = System.currentTimeMillis();
// double totalRuntime = endTime - startTime;
//// clientRunner.printStatistics(totalRuntime);
clientRunner.cleanupExperiment(storageService);
executorService.shutdown(); executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS); boolean status = executorService.awaitTermination(1000, TimeUnit.MICROSECONDS);
if (!status)
executorService.shutdownNow();
} }
} }
...@@ -12,10 +12,14 @@ public class MessageConstants { ...@@ -12,10 +12,14 @@ public class MessageConstants {
// Distinguishing ACK and NACK packets // Distinguishing ACK and NACK packets
public static final int STATUS_OK = 200; public static final int STATUS_OK = 200;
public static final int STATUS_OWNER_MISMATCH = 401; public static final int STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS = 401;
public static final int STATUS_REPLICATE_FAILED = 402; public static final int STATUS_REPLICATION_FAILED = 402;
public static final int STATUS_SERVER_NOT_MASTER = 403; public static final int STATUS_SERVER_NOT_MASTER = 403;
public static final int STATUS_KEY_NOT_FOUND = 404; public static final int STATUS_KEY_NOT_FOUND = 404;
public static final int STATUS_REPLICATION_TIMEOUT = 405;
public static final int STATUS_IO_WRITE_FAILED = 406;
public static final int STATUS_KEY_EXISTS = 407;
public static final int STATUS_UPDATE_VERSION_MISMATCH = 408;
// 100 to 199 - HPDOS System internal operations // 100 to 199 - HPDOS System internal operations
public static final int MASTER_HEARTBEAT = 100; public static final int MASTER_HEARTBEAT = 100;
......
package HpdosClient.lib;
import hpdos.grpc.Ack;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
public class StorageModel {
private int version;
private int dataSize;
private final String key; // key is immutable
private long crc;
private int accessType;
private String value;
private final String owner; // ownership is immutable
public StorageModel(int version, int dataSize, String key, int accessType, String owner, long crc, String value) {
this.version = version;
this.dataSize = dataSize;
this.key = key;
this.accessType = accessType;
this.value = value;
this.owner = owner;
// calculate CRC32 based on the value field
this.crc = crc;
}
public void updateData(Ack ack) {
this.version = ack.getVersion();
this.dataSize = ack.getDataSize();
this.value = ack.getValue();
this.crc = ack.getCrc();
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getDataSize() {
return dataSize;
}
public void setDataSize(int dataSize) {
this.dataSize = dataSize;
}
public String getKey() {
return key;
}
public long getCrc() {
return crc;
}
public void setCrc(long crc) {
this.crc = crc;
}
public int getAccessType() {
return accessType;
}
public void setAccessType(int accessType) {
this.accessType = accessType;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getOwner() {
return owner;
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (obj.getClass() != this.getClass())
return false;
StorageModel model = (StorageModel) obj;
return this.getKey().equals(model.getKey());
}
@Override
public String toString() {
return "key: " + this.key + "\n" +
"dataSize: " + this.dataSize + "\n" +
"version: " + this.version + "\n" +
"owner: " + this.owner + "\n" +
"accessType: " + this.accessType + "\n" +
"crc: " + this.crc + "\n" +
"value: " + this.value;
}
}
package HpdosClient.lib;
import HpdosClient.ConfigConstants;
import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.MessageFormat.RequestBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import hpdos.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.*;
public class StorageService {
private final String clientID;
private ManagedChannel masterChannel;
private ArrayList<ManagedChannel> channels;
private NetworkServiceGrpc.NetworkServiceFutureStub masterStub;
private final ArrayList<NetworkServiceGrpc.NetworkServiceFutureStub> stubs;
private List<Follower> replicaSet;
public StorageService(String clientID) {
this.clientID = clientID;
this.stubs = new ArrayList<>();
}
public void retrieveFollowerList() {
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(this.masterChannel);
ResponseList responseList = stub.getReadReplicaList(null);
this.replicaSet = responseList.getFollowerList();
// for (Follower follower: this.replicaSet) {
// System.out.println(follower);
// }
}
public void cleanup() {
for (ManagedChannel channel: this.channels)
channel.shutdown();
}
public void initStorage() {
masterChannel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
channels = new ArrayList<>();
channels.add(masterChannel);
masterStub = NetworkServiceGrpc.newFutureStub(this.masterChannel);
retrieveFollowerList();
for (Follower follower: replicaSet) {
ManagedChannel channel = ManagedChannelBuilder.
forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.add(channel);
NetworkServiceGrpc.NetworkServiceFutureStub stub = NetworkServiceGrpc.newFutureStub(channel);
stubs.add(stub);
}
}
public ListenableFuture<Packet> create(String key, String value, int accessType) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
0, value.length(), key,
0, accessType, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
return this.masterStub.createMetadata(packet);
}
public ListenableFuture<Packet> read(String key) {
int rnd = new Random().nextInt(this.stubs.size());
NetworkServiceGrpc.NetworkServiceFutureStub stub = this.stubs.get(rnd);
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
return stub.readMetadata(packet);
}
public ListenableFuture<Packet> update(String key, String value, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE,
version, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
return this.masterStub.updateMetadata(packet);
}
public ListenableFuture<Packet> delete(String key, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE,
version, 0, key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
return this.masterStub.deleteMetadata(packet);
}
}
...@@ -66,7 +66,7 @@ public class IOHandler { ...@@ -66,7 +66,7 @@ public class IOHandler {
storedData.getData().getValue()); storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket( return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK, MessageConstants.METADATA_UPDATE, MessageConstants.STATUS_OK,
ack, null); ack, null);
} }
else { else {
...@@ -74,7 +74,7 @@ public class IOHandler { ...@@ -74,7 +74,7 @@ public class IOHandler {
storedData.getData().getVersion(): MessageConstants.INVALID_VERSION; storedData.getData().getVersion(): MessageConstants.INVALID_VERSION;
nack = ResponseBuilder.buildNack(version, request.getKey()); nack = ResponseBuilder.buildNack(version, request.getKey());
return ResponseBuilder.buildResponsePacket( return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, storedData.getStatus(), MessageConstants.METADATA_UPDATE, storedData.getStatus(),
null, nack); null, nack);
} }
} }
...@@ -102,7 +102,7 @@ public class IOHandler { ...@@ -102,7 +102,7 @@ public class IOHandler {
storedData.getData().getValue()); storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket( return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK, MessageConstants.METADATA_DELETE, MessageConstants.STATUS_OK,
ack, null); ack, null);
} }
else { else {
...@@ -110,7 +110,7 @@ public class IOHandler { ...@@ -110,7 +110,7 @@ public class IOHandler {
storedData.getData().getVersion(): MessageConstants.INVALID_VERSION; storedData.getData().getVersion(): MessageConstants.INVALID_VERSION;
nack = ResponseBuilder.buildNack(version, request.getKey()); nack = ResponseBuilder.buildNack(version, request.getKey());
return ResponseBuilder.buildResponsePacket( return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, storedData.getStatus(), MessageConstants.METADATA_DELETE, storedData.getStatus(),
null, nack); null, nack);
} }
} }
...@@ -145,7 +145,7 @@ public class IOHandler { ...@@ -145,7 +145,7 @@ public class IOHandler {
} }
} }
return ResponseBuilder.buildResponsePacket( return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, status, MessageConstants.METADATA_READ, status,
ack, nack); ack, nack);
} }
} }
...@@ -43,9 +43,9 @@ public class MemoryStorageService implements StorageService { ...@@ -43,9 +43,9 @@ public class MemoryStorageService implements StorageService {
boolean status = memoryKVStore.replace(key, value, newValue); // the equals method is overridden in Storage model boolean status = memoryKVStore.replace(key, value, newValue); // the equals method is overridden in Storage model
// to equate two objects based on their version numbers // to equate two objects based on their version numbers
if (status) if (status)
return new StoredModel(value, MessageConstants.STATUS_OK); return new StoredModel(newValue, MessageConstants.STATUS_OK);
else else
return new StoredModel(null, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH); return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
} }
@Override @Override
...@@ -60,6 +60,6 @@ public class MemoryStorageService implements StorageService { ...@@ -60,6 +60,6 @@ public class MemoryStorageService implements StorageService {
if (status) if (status)
return new StoredModel(previousValue, MessageConstants.STATUS_OK); return new StoredModel(previousValue, MessageConstants.STATUS_OK);
else else
return new StoredModel(null, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH); return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
} }
} }
...@@ -2,7 +2,7 @@ package hpdos.message; ...@@ -2,7 +2,7 @@ package hpdos.message;
public class MessageConstants { public class MessageConstants {
public static final int INIT_VERSION = 0; public static final int INIT_VERSION = 1;
public static final int INVALID_VERSION = -1; public static final int INVALID_VERSION = -1;
public static final int METADATA_ACCESS_PRIVATE = 700; public static final int METADATA_ACCESS_PRIVATE = 700;
public static final int METADATA_ACCESS_SHARED = 777; public static final int METADATA_ACCESS_SHARED = 777;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment