Commit 3c470c7b authored by NILANJAN DAW's avatar NILANJAN DAW

Modified storage to avoid locking in concurrent access scenarios

parent 6621f187
......@@ -18,7 +18,7 @@ import java.util.*;
import java.util.concurrent.*;
public class ClientRunner {
public static final int parallelCount = 1;
public static final int parallelCount = 10;
private final String clientID;
public ClientRunner() {
......@@ -28,43 +28,91 @@ public class ClientRunner {
return "Hello World!";
}
public String initClient(String id) {
// create a metadata block
public long create(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key, String value) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
0, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
Packet response = stub.createMetadata(packet);
return timestampCreateStart;
}
// read back the metadata
public Map.Entry<Packet, Long> read(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
long timestampReadStart = System.currentTimeMillis();
Packet response = stub.readMetadata(packet);
// System.out.println(response);
return new AbstractMap.SimpleEntry<>(response, timestampReadStart);
}
public long update(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key, String value, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE,
version, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
Packet response = stub.updateMetadata(packet);
return timestampCreateStart;
}
public long delete(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE,
version, 0, key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
Packet response = stub.deleteMetadata(packet);
return timestampCreateStart;
}
public double initClient(String id) {
final ManagedChannel channel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
double averageReadTime = 0, averageWriteTime = 0;
for (int i = 1; i <= 10; i++) {
double averageReadTime = 0, averageWriteTime = 0, averageUpdateTime = 0, averageDeleteTime = 0;
long startTime = System.currentTimeMillis();
for (int i = 1; i <= 1000; i++) {
String key = Integer.toString((int) (Math.random() * Integer.MAX_VALUE)), value = "dummy";
String key = id + (int) (Math.random() * Integer.MAX_VALUE),
value = "dummy",
updatedValue = "dummyUpdated";
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
ArrayList<Request> request = new ArrayList<>();
// create a metadata block
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
0, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
Packet response = stub.createMetadata(packet);
long timestampCreateStart = create(stub, key, value);
averageWriteTime = (averageWriteTime * (i - 1) + (System.currentTimeMillis() - timestampCreateStart)) / i;
// read back the metadata
request.clear();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
packet = RequestBuilder.buildPacket(request);
long timestampReadStart = System.currentTimeMillis();
response = stub.readMetadata(packet);
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - timestampReadStart)) / i;
AbstractMap.Entry<Packet, Long> data = read(stub, key);
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - data.getValue())) / i;
long timestampUpdateStart = update(stub, key, updatedValue, data.getKey().getResponse(0).getAck().getVersion());
averageUpdateTime = (averageUpdateTime * (i - 1) + (System.currentTimeMillis() - timestampUpdateStart)) / i;
data = read(stub, key);
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - data.getValue())) / i;
long timestampDeleteStart = delete(stub, key, data.getKey().getResponse(0).getAck().getVersion());
averageDeleteTime = (averageDeleteTime * (i - 1) + (System.currentTimeMillis() - timestampDeleteStart)) / i;
data = read(stub, key);
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - data.getValue())) / i;
// if (i % 100 == 0)
System.out.println("Average MDS read time: "
if (i % 1000 == 0)
System.out.println(id + "Avg MDS time read: "
+ averageReadTime
+ " ms average create time: " + averageWriteTime + "ms");
+ " ms\t create: " + averageWriteTime
+ " ms\t update: " + averageUpdateTime
+ " ms\t delete: " + averageDeleteTime + "ms"
);
}
double qps = (1000 * 1000 / (System.currentTimeMillis() * 1.0 - startTime));
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) +
// "ms qps " + qps);
channel.shutdown();
return id;
return qps;
}
......@@ -73,16 +121,22 @@ public class ClientRunner {
System.out.println(clientRunner.getGreeting());
Thread.sleep(1000);
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount);
Set<Callable<String>> callables = new HashSet<>();
Set<Callable<Double>> callables = new HashSet<>();
for (int i = 0; i < parallelCount; i++) {
int finalI = i;
callables.add(() -> clientRunner.initClient(Integer.toString(finalI)));
}
List<Future<String>> futures = executorService.invokeAll(callables);
for (Future<String> future: futures) {
System.out.println(future.get());
long startTime = System.currentTimeMillis();
List<Future<Double>> futures = executorService.invokeAll(callables);
double qps = 0;
for (Future<Double> future: futures) {
qps += future.get();
}
long endTime = System.currentTimeMillis();
System.out.println();
System.out.println("QPS Read: " + 3 * qps + " Write: " + qps + " Update: " + qps + " Delete: " + qps);
System.out.println("QPS Total: " + 6 * qps + "avg query time: " + ((6 * qps) / (endTime - startTime)));
// clientRunner.initClient();
}
}
......@@ -10,4 +10,5 @@ public class ConfigConstants {
public static final int BACKEND_IN_MEMORY = 300;
public static final int LSM_BACKEND = 301;
public static final int BINARY_TREE_BACKEND = 302;
public static final int REPLICATOR_THREAD_POOL_SIZE = 12;
}
......@@ -12,7 +12,7 @@ import java.util.HashMap;
public class HeartbeatHandler extends HeartbeatServiceGrpc.HeartbeatServiceImplBase {
private final HashMap<String, MasterFollower> followers;
private String serverID;
private final String serverID;
public HeartbeatHandler(HashMap<String, MasterFollower> followers, String serverID) {
this.followers = followers;
this.serverID = serverID;
......
......@@ -79,10 +79,21 @@ public class IOHandler {
}
}
/**
* Removes the object from the KV store if the request version matches with the current version of data
* @param request Models to be deleted. Only the key, clientID and version needs to be valid
* @return previous value if successful else null with status
*/
public Response delete(Request request) {
Ack ack = null; Nack nack = null;
StoredModel storedData = storageService.delete(request.getKey(), request.getVersion());
StorageModel blob = new StorageModel(
request.getVersion(),
request.getDataSize(),
request.getKey(),
request.getAccessType(),
request.getClientID(),
request.getValue());
StoredModel storedData = storageService.delete(request.getKey(), blob);
if (storedData.getStatus() == MessageConstants.STATUS_OK) {
ack = ResponseBuilder.buildAck(storedData.getData().getVersion(),
storedData.getData().getDataSize(),
......
......@@ -21,11 +21,11 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
@Override
public void readMetadata(Packet requestPacket,
StreamObserver<Packet> responseObserver) {
System.out.println("Data received" + requestPacket.toString());
// System.out.println("Data received" + requestPacket.toString());
for (Request request: requestPacket.getRequestList()) {
Response response = ioHandler.read(request);
Packet packet = ResponseBuilder.buildPacket(response);
System.out.println(packet);
// System.out.println(packet);
responseObserver.onNext(packet);
}
......@@ -34,7 +34,6 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
@Override
public void createMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) {
System.out.println("new create request " + requestPacket);
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
for (Request request: requestPacket.getRequestList()) {
......@@ -82,13 +81,17 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
ReplicationResponse replicationResponse = null;
try {
System.out.println("starting replication");
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
replicationResponse = replicationService.replicateMetadata(replicationRequest);
stopwatch.stop();
System.out.println("Network handler replicate" + stopwatch);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("replication complete");
Packet packet = ResponseBuilder.buildPacket(new ArrayList<>(replicationResponse.getResponseList()));
System.out.println(packet);
// System.out.println(packet);
return packet;
}
}
package hpdos.handler;
import com.google.common.base.Stopwatch;
import hpdos.grpc.*;
import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder;
......@@ -13,7 +14,9 @@ public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceI
@Override
public void replicateMetadata(ReplicationRequest replicationRequest, StreamObserver<ReplicationResponse> responseObserver) {
ReplicationResponse responsePacket = null;
System.out.println("Replication request " + replicationRequest);
// System.out.println("Replication request " + replicationRequest);
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
if (replicationRequest.getPacketType() == MessageConstants.PACKET_METADATA_REQUEST) {
for (Request request: replicationRequest.getRequestList()) {
if (request.getOperationType() == MessageConstants.METADATA_CREATE) {
......@@ -30,6 +33,8 @@ public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceI
}
responseObserver.onCompleted();
}
stopwatch.stop();
// System.out.println("Replicate Handler " + stopwatch);
}
}
package hpdos.lib;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ListenableFuture;
import hpdos.ConfigConstants;
import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.ReplicationResponse;
import hpdos.grpc.ReplicationServiceGrpc;
......@@ -16,6 +19,7 @@ public class InlineReplicationService implements ReplicationService {
private final HashMap<String, MasterFollower> followers;
private final HashMap<String, ManagedChannel> channels;
private final ExecutorService executorService;
public InlineReplicationService(HashMap<String, MasterFollower> followers) {
this.followers = followers;
this.channels = new HashMap<>();
......@@ -26,7 +30,7 @@ public class InlineReplicationService implements ReplicationService {
.build();
channels.put(follower.getFollowerID(), channel);
}
this.executorService = Executors.newFixedThreadPool(ConfigConstants.REPLICATOR_THREAD_POOL_SIZE);
}
@Override
......@@ -49,8 +53,9 @@ public class InlineReplicationService implements ReplicationService {
}
@Override
public ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(followers.size());
public ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest)
throws InterruptedException, ExecutionException {
Set<Callable<ReplicationResponse>> callables = new HashSet<>();
// new followers have joined or left.
// TODO: Handle follower leaving scenario
......@@ -67,6 +72,8 @@ public class InlineReplicationService implements ReplicationService {
}
List<Future<ReplicationResponse>> futures = executorService.invokeAll(callables);
HashMap<String, Response> responseHashMap = new HashMap<>();
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
for (Future<ReplicationResponse> future: futures) {
ReplicationResponse replicationResponse;
replicationResponse = future.get(); //TODO: Add and handle get timeout. Timeout related constants already added
......@@ -81,7 +88,20 @@ public class InlineReplicationService implements ReplicationService {
}
}
}
stopwatch.stop();
System.out.println("replicateMetadata ReplicationService " + stopwatch);
return ResponseBuilder.
buildReplicationResponse(new ArrayList<>(responseHashMap.values()));
}
@Override
public ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) {
for (ManagedChannel channel: channels.values()) {
ReplicationServiceGrpc.ReplicationServiceFutureStub stub =
ReplicationServiceGrpc.newFutureStub(channel);
ListenableFuture<ReplicationResponse> res = stub.replicateMetadata(replicationRequest);
}
return null;
}
}
......@@ -2,19 +2,18 @@ package hpdos.lib;
import hpdos.message.MessageConstants;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
public class MemoryStorageService implements StorageService {
private final HashMap<String, StorageModel> memoryKVStore;
private final ConcurrentHashMap<String, StorageModel> memoryKVStore;
public MemoryStorageService() {
this.memoryKVStore = new HashMap<>();
this.memoryKVStore = new ConcurrentHashMap<>();
}
@Override
public StoredModel create(String key, StorageModel value) {
try {
if (memoryKVStore.containsKey(key))
if (memoryKVStore.putIfAbsent(key, value) != null)
return new StoredModel(null, MessageConstants.STATUS_KEY_EXISTS);
memoryKVStore.put(key, value);
return new StoredModel(value, MessageConstants.STATUS_OK);
} catch (Exception e) {
return null;
......@@ -30,23 +29,37 @@ public class MemoryStorageService implements StorageService {
@Override
public StoredModel update(String key, StorageModel value) {
if (!memoryKVStore.containsKey(key))
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
StorageModel previousValue = memoryKVStore.get(key);
if (previousValue.getVersion() != value.getVersion())
return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
value.setVersion((previousValue.getVersion() + 1) % Integer.MAX_VALUE); // version wraps around
memoryKVStore.put(key, value);
return new StoredModel(value, MessageConstants.STATUS_OK);
if (previousValue == null)
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
else if (previousValue.getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE
&& !previousValue.getOwner().equals(value.getOwner()))
return new StoredModel(null, MessageConstants.STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS);
// the request will have the old version number of the data to be inserted, we only update the data
// with a new version number if at the time of update the two versions match
// else we reject the update
StorageModel newValue = value.createVersionUpdatedModel();
boolean status = memoryKVStore.replace(key, value, newValue); // the equals method is overridden in Storage model
// to equate two objects based on their version numbers
if (status)
return new StoredModel(value, MessageConstants.STATUS_OK);
else
return new StoredModel(null, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
}
@Override
public StoredModel delete(String key, int version) {
if (!memoryKVStore.containsKey(key))
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
public StoredModel delete(String key, StorageModel value) {
StorageModel previousValue = memoryKVStore.get(key);
if (previousValue.getVersion() != version)
return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
return new StoredModel(memoryKVStore.remove(key), MessageConstants.STATUS_OK);
if (previousValue == null)
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
else if (previousValue.getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE
&& !previousValue.getOwner().equals(value.getOwner()))
return new StoredModel(null, MessageConstants.STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS);
boolean status = memoryKVStore.remove(key, value);
if (status)
return new StoredModel(previousValue, MessageConstants.STATUS_OK);
else
return new StoredModel(null, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
}
}
......@@ -7,5 +7,6 @@ import java.util.concurrent.ExecutionException;
public interface ReplicationService {
abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException;
abstract ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest);
abstract void cleanup();
}
......@@ -6,11 +6,11 @@ import java.util.zip.Checksum;
public class StorageModel {
private int version;
private int dataSize;
private String key;
private final String key; // key is immutable
private long crc;
private int accessType;
private String value;
private String owner;
private final String owner; // ownership is immutable
public StorageModel(int version, int dataSize, String key, int accessType, String owner, String value) {
this.version = version;
......@@ -26,6 +26,12 @@ public class StorageModel {
this.crc = checksum.getValue();
}
public StorageModel createVersionUpdatedModel() {
int updatedVersion = (this.getVersion() + 1) % Integer.MAX_VALUE; // version wraps around
return new StorageModel(updatedVersion, this.getDataSize(),
this.getKey(), this.getAccessType(), this.getOwner(), this.getValue());
}
public int getVersion() {
return version;
}
......@@ -46,10 +52,6 @@ public class StorageModel {
return key;
}
public void setKey(String key) {
this.key = key;
}
public long getCrc() {
return crc;
}
......@@ -78,8 +80,14 @@ public class StorageModel {
return owner;
}
public void setOwner(String owner) {
this.owner = owner;
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (obj.getClass() != this.getClass())
return false;
StorageModel model = (StorageModel) obj;
return this.getVersion() == model.getVersion();
}
}
......@@ -4,5 +4,5 @@ public interface StorageService {
StoredModel create(String key, StorageModel value);
StoredModel readByKey(String key);
StoredModel update(String key, StorageModel value);
StoredModel delete(String key, int version);
StoredModel delete(String key, StorageModel version);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment