Commit 66bddc63 authored by NILANJAN DAW's avatar NILANJAN DAW

Added asynchronous future based replication service. Need to perform correctness test.

PS. Observing an extra weird null response intermittently, which is currently being handled by a hack

Added some HPDOS architecture diagram
parent 6c1e29ed
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="libraries-with-intellij-classes">
<option name="intellijApiContainingLibraries">
<list>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains.intellij.clion" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains.intellij.rider" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains.intellij.goland" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
</list>
</option>
</component>
</project>
\ No newline at end of file
app.name="HPDOS-Client" app.name="HPDOS-Client"
app.version="0.1.4" app.version="0.1.4"
app.thread_count=12 app.thread_count=4
app.runtime=30 app.runtime=30
app.data_size=10
app.data_conversion_factor="B"
app.private_ratio=0.8
app.cycle_create=1 app.cycle_create=1
app.cycle_read=4 app.cycle_read=4
app.cycle_update=3 app.cycle_update=3
......
...@@ -25,10 +25,10 @@ public class ClientRunner { ...@@ -25,10 +25,10 @@ public class ClientRunner {
public boolean experimentEnded = false; public boolean experimentEnded = false;
private List<Follower> replicaSet; private List<Follower> replicaSet;
private Queue<Long> createTime, updateTime, readTime, deleteTime; private Queue<Long> createTime, updateTime, readTime, deleteTime;
private final Properties properties;
public ClientRunner() { public ClientRunner() {
clientID = UUID.randomUUID().toString(); clientID = UUID.randomUUID().toString();
Properties properties = new Properties(); properties = new Properties();
try { try {
InputStream inputStream = new FileInputStream(propertiesFile); InputStream inputStream = new FileInputStream(propertiesFile);
properties.load(inputStream); properties.load(inputStream);
...@@ -99,6 +99,19 @@ public class ClientRunner { ...@@ -99,6 +99,19 @@ public class ClientRunner {
return timestampCreateStart; return timestampCreateStart;
} }
private String createString() {
int dataSize = Integer.parseInt((String) properties.get("app.data_size"));
dataSize /= 2; // Java strings are 2B long
String conversionFactor = (String) properties.get("app.data_conversion_factor");
int multiplier = 1;
switch (conversionFactor) {
case "G": multiplier *= 1000;
case "M": multiplier *= 1000;
case "K": multiplier *= 1000;
}
char[] data = new char[dataSize * multiplier];
return new String(data);
}
public double runExperiment(String id, long experimentStartTime) { public double runExperiment(String id, long experimentStartTime) {
final ManagedChannel masterChannel = ManagedChannelBuilder. final ManagedChannel masterChannel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT) forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
...@@ -113,12 +126,10 @@ public class ClientRunner { ...@@ -113,12 +126,10 @@ public class ClientRunner {
.build(); .build();
channels.add(channel); channels.add(channel);
} }
String value = createString(), updatedValue = createString();
for (;;) { for (;;) {
String key = id + (int) (Math.random() * Integer.MAX_VALUE), String key = id + (int) (Math.random() * Integer.MAX_VALUE);
value = "dummy",
updatedValue = "dummyUpdated";
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(masterChannel); NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(masterChannel);
for (int j = 0; j < cCreate; j++) { for (int j = 0; j < cCreate; j++) {
long timestampCreateStart = create(stub, key, value); long timestampCreateStart = create(stub, key, value);
...@@ -161,9 +172,9 @@ public class ClientRunner { ...@@ -161,9 +172,9 @@ public class ClientRunner {
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel); NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
ResponseList responseList = stub.getReadReplicaList(null); ResponseList responseList = stub.getReadReplicaList(null);
this.replicaSet = responseList.getFollowerList(); this.replicaSet = responseList.getFollowerList();
for (Follower follower: this.replicaSet) { // for (Follower follower: this.replicaSet) {
System.out.println(follower); // System.out.println(follower);
} // }
channel.shutdown(); channel.shutdown();
} }
......
<?xml version="1.0" encoding="UTF-8"?>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="libraries-with-intellij-classes">
<option name="intellijApiContainingLibraries">
<list>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains.intellij.clion" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains.intellij.rider" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains.intellij.goland" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
</list>
</option>
</component>
</project>
\ No newline at end of file
...@@ -10,7 +10,6 @@ import io.grpc.stub.StreamObserver; ...@@ -10,7 +10,6 @@ import io.grpc.stub.StreamObserver;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutionException;
public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
private final IOHandler ioHandler; private final IOHandler ioHandler;
...@@ -103,10 +102,10 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -103,10 +102,10 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
// System.out.println("starting replication"); // System.out.println("starting replication");
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start(); stopwatch.start();
replicationResponse = replicationService.replicateMetadata(replicationRequest); replicationResponse = replicationService.replicateMetadataAsync(replicationRequest);
stopwatch.stop(); stopwatch.stop();
// System.out.println("Network handler replicate" + stopwatch); // System.out.println("Network handler replicate" + stopwatch);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
// System.out.println("replication complete"); // System.out.println("replication complete");
......
...@@ -18,6 +18,8 @@ public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceI ...@@ -18,6 +18,8 @@ public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceI
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start(); stopwatch.start();
if (replicationRequest.getPacketType() == MessageConstants.PACKET_METADATA_REQUEST) { if (replicationRequest.getPacketType() == MessageConstants.PACKET_METADATA_REQUEST) {
if (replicationRequest.getRequestList().size() > 1)
System.out.println("REQUEST BATCHING");
for (Request request: replicationRequest.getRequestList()) { for (Request request: replicationRequest.getRequestList()) {
if (request.getOperationType() == MessageConstants.METADATA_CREATE) { if (request.getOperationType() == MessageConstants.METADATA_CREATE) {
Response response = ioHandler.create(request); Response response = ioHandler.create(request);
...@@ -29,6 +31,7 @@ public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceI ...@@ -29,6 +31,7 @@ public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceI
Response response = ioHandler.delete(request); Response response = ioHandler.delete(request);
responsePacket = ResponseBuilder.buildReplicationResponse(response); responsePacket = ResponseBuilder.buildReplicationResponse(response);
} }
// System.out.println(responsePacket);
responseObserver.onNext(responsePacket); responseObserver.onNext(responsePacket);
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
......
package hpdos.lib; package hpdos.lib;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import hpdos.ConfigConstants; import hpdos.ConfigConstants;
import hpdos.grpc.ReplicationRequest; import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.ReplicationResponse; import hpdos.grpc.ReplicationResponse;
...@@ -96,15 +99,53 @@ public class InlineReplicationService implements ReplicationService { ...@@ -96,15 +99,53 @@ public class InlineReplicationService implements ReplicationService {
buildReplicationResponse(new ArrayList<>(responseHashMap.values())); buildReplicationResponse(new ArrayList<>(responseHashMap.values()));
} }
/**
* Incomplete implementation do not use
* @param replicationRequest replication request sent
* @return replication response
*/
@Override @Override
public ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) { public ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) throws InterruptedException {
CountDownLatch replicationWaiter = new CountDownLatch(this.followers.size());
HashMap<String, Response> responseHashMap = new HashMap<>();
if (channels.size() != followers.size()) {
establishChannels();
}
for (ManagedChannel channel: channels.values()) { for (ManagedChannel channel: channels.values()) {
ReplicationServiceGrpc.ReplicationServiceFutureStub stub = ReplicationServiceGrpc.ReplicationServiceFutureStub stub =
ReplicationServiceGrpc.newFutureStub(channel); ReplicationServiceGrpc.newFutureStub(channel);
ListenableFuture<ReplicationResponse> res = stub.replicateMetadata(replicationRequest); ListenableFuture<ReplicationResponse> res = stub.replicateMetadata(replicationRequest);
Futures.addCallback(res, new FutureCallback<>() {
@Override
public void onSuccess(ReplicationResponse result) {
for (Response receivedResponse: result.getResponseList()) {
int status = receivedResponse.getStatus();
if (status == MessageConstants.STATUS_OK) {
if (!responseHashMap.containsKey(receivedResponse.getAck().getKey()))
responseHashMap.put(receivedResponse.getAck().getKey(), receivedResponse);
} else {
responseHashMap.put(receivedResponse.getNack().getKey(), receivedResponse);
}
}
replicationWaiter.countDown();
}
@Override
public void onFailure(Throwable t) {
replicationWaiter.countDown();
}
}, MoreExecutors.directExecutor());
} }
throw new UnsupportedOperationException("Implementation not complete");
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
replicationWaiter.await();
stopwatch.stop();
// System.out.println("replication latency" + stopwatch);
return ResponseBuilder.
buildReplicationResponse(new ArrayList<>(responseHashMap.values()));
} }
@Override @Override
......
...@@ -8,7 +8,7 @@ import java.util.concurrent.ExecutionException; ...@@ -8,7 +8,7 @@ import java.util.concurrent.ExecutionException;
public interface ReplicationService { public interface ReplicationService {
abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException; abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException;
abstract ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest); abstract ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) throws InterruptedException;
abstract void cleanup() throws InterruptedException; abstract void cleanup() throws InterruptedException;
abstract HashMap<String, MasterFollower> getFollowers(); abstract HashMap<String, MasterFollower> getFollowers();
} }
...@@ -63,7 +63,17 @@ public class ResponseBuilder { ...@@ -63,7 +63,17 @@ public class ResponseBuilder {
public static ReplicationResponse buildReplicationResponse(ArrayList<Response> response) { public static ReplicationResponse buildReplicationResponse(ArrayList<Response> response) {
ReplicationResponse.Builder packet = ReplicationResponse.newBuilder(); ReplicationResponse.Builder packet = ReplicationResponse.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE); packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addAllResponse(response); if (response.size() > 1)
System.out.println("RESPONSE BATCHING" + response.size());
try {
for (Response subResponse: response)
if (subResponse != null)
packet.addResponse(subResponse);
} catch (Exception e) {
System.out.println(response);
System.exit(0);
}
return packet.build(); return packet.build();
} }
} }
<mxfile host="Electron" modified="2021-04-08T09:27:52.622Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/13.7.9 Chrome/85.0.4183.121 Electron/10.1.3 Safari/537.36" etag="F6-fK2rqGVaaWcVVMznQ" version="13.7.9" type="device"><diagram id="cKZVD0N--tm-yNMOju0u" name="Page-1">7Vxbc6M2FP41ftwMICTwY65Np003s+7MNk8dGWRDg5ErlDjeX19hhLkIO2yMLXD6kkEHcfvOdy46OvEIXC/efmF4GTxQn0Qjy/DfRuBmZFkucsXfVLDOBNCwM8GchX4mMgvBJPxBpNCQ0pfQJ0llIqc04uGyKvRoHBOPV2SYMbqqTpvRqPrUJZ4TRTDxcKRKv4c+D6QUQbs4cU/CeSAfbQGAsjMLnM+Wn5IE2KerkgjcjsA1o5RnR4u3axKl4OXAZNfd7Ti7fTNGYt7mgrvZ02Myuf8XJ3d///YVobunS/bFkS/3iqMX+cnybfk6x4DRl9gn6V2MEbhaBSEnkyX20rMroXQhC/giEiNTHPo4CTZz04G8N2GcvO18a3OLhSARoQvC2VpMkReAsXxDSSAHyPGqUIedzwlKmnClDEsGzLe3LjASBxKmn4DMcnoOmWlXITPNBsicBshscCzM4PuQEV8YnhxSxgM6pzGObgvpVRXUYs7vlC4lev8QztfSi+AXTqtAZ89MH7QfWfFe9IV5ZM8H5b4Isznhe+bZzZpiJMI8fK2+R+eotyDqCVAX2LL1X/L6zeApHVzAfHjzVj55s5YjDdpCB2prc+klY3hdmrCkYcyT0p0fU0FhrtsAIc0VuEZN89kdCx5sX+3j1DAVavxB+IqyZyG8x7EfEbbHqZnvO7VZGEXXNKJscy3wMXFnnpAnnNFnUjqDPJdMZx15PlSF0rJVz2daDZ4PHS1YoF7YYIe2ZLe0JQvqdH22AvuvX8V4QthrKD5vN8nfoXXVBmokn81mltdIch9NEUQdZURG30g+PjeOo5Ycd3VS3FR9+CeB3TS04q76lk+aVrXWF9CpLzUCfyPLKPTEo2lcjgkdRoLTpDv1SACg7khgur2wjQ457rbluKOT402wo4inBFxiQXI0T4+rvP8zYAT7S0qjfK54dHn6AC3CdvtmEVaLAtuwLCKvJr1vEmOtYboJ+P9tQowbyoEnjhJntyjOMpw2NqF1VZy/5s5cqMz/wRPdGusmuu6VwYVdXht8MS4MkabsXx5sRo+EhQIBwqSwYYeiG7OBLc1ml+aPW5l1avm1CWo7bPVKruHum3+cSi6wzs6ZtmUFOJQVhzlTdVMrdaYeFjJTUcFBdUXXI811xakLbWh04z5Rbdtwuy+hL09QN7BkuBJCleXDg1j/8sRUa7cFxOAMIN4mBfqSALVOe08w41OC09hxlN02SFzfboLataYAdbQRUd9tM7Wz2VJd8oRThufkCnvPJPa7JbSW7Z4eoKxWmx4Ixz7mqdN4wAlvoLMAgFexraIW05jUIJYiHIXzWAw9AViakV6lcAoHFV3KE4vQ9zdJTJMyq4lN5602NROATa02DbqxjqWbvIOnpJv5t0dxL0MAFiUt1NIebkaS8Aeebm6VgivTaXFfeDWCN+m9RJqYZBljR4ArzRIN7WBNrU3HA7wfu0EfdU2dJOygbX8N0LoRBNRAfIymgN2Q71JSB3bhjPuWvAO11iRjsRB2Eo57AbSpvfsC9KPNr0t/Mm7pT2ytBQCgrp3OoYlu2wWcd1xrZ7itJjUDZ7jdNmJCrS1G9mkippZ1FXR6R/N+pJI9aCyyD90la67P1zeIXLOmyswc5VU1bXZQqodqSjRwPwbbdgMjS6cfg6plDbpUXw/RrvadTti8GTLcUr0CsfYFFdzZyDgaZqm+DjHSXtmEamVz8PVjx+pbnoP60biutWSGrLaBU2/vtKWo6mxKZrCeDWoPouhMS2ZW74B2dDicLt1H23+9AIf24H6occqq/9M+eqdxav/84zROOWrm/zlW163JAztZhv80ecyau4CnIEM/fqFgSOw42LUc5sLV1dB1FBJZuPlwxb2DcLdt0OhPwq0ua4Rq5eJxQlTIhtYVUO+RaYK86TdijtYV4Kh5c9pYn34UC18xTxO6h5vB427BZlddobpxUuBVtyCBTwLxWf6Z4F5bujjO8XAXw+I3qrJwW/zSF7j9Dw==</diagram></mxfile>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment