Commit a5d9a657 authored by NILANJAN DAW's avatar NILANJAN DAW

Better shutdown cleanup for both server and client.

Added better metrics to client.
Client now configurable through config file
parent 3c470c7b
app.name="HPDOS-Client"
app.version="0.1.4"
app.thread_count=12
app.runtime=15
app.cycle_create=1
app.cycle_read=4
app.cycle_update=3
app.cycle_delete=1
...@@ -5,6 +5,7 @@ package HpdosClient; ...@@ -5,6 +5,7 @@ package HpdosClient;
import HpdosClient.MessageFormat.MessageConstants; import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.MessageFormat.RequestBuilder; import HpdosClient.MessageFormat.RequestBuilder;
import com.google.common.base.Stopwatch;
import hpdos.grpc.NetworkServiceGrpc; import hpdos.grpc.NetworkServiceGrpc;
import hpdos.grpc.Packet; import hpdos.grpc.Packet;
import hpdos.grpc.PacketFormatProto; import hpdos.grpc.PacketFormatProto;
...@@ -12,17 +13,44 @@ import hpdos.grpc.Request; ...@@ -12,17 +13,44 @@ import hpdos.grpc.Request;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.Instant; import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
public class ClientRunner { public class ClientRunner {
public static final int parallelCount = 10; public static int parallelCount = 10;
public static int runtime = 0;
private final String clientID; private final String clientID;
public static String propertiesFile;
private int cCreate, cRead, cUpdate, cDelete;
private Queue<Long> createTime, updateTime, readTime, deleteTime;
public ClientRunner() { public ClientRunner() {
clientID = UUID.randomUUID().toString(); clientID = UUID.randomUUID().toString();
Properties properties = new Properties();
try {
InputStream inputStream = new FileInputStream(propertiesFile);
properties.load(inputStream);
parallelCount = Integer.parseInt((String) properties.get("app.thread_count"));
runtime = Integer.parseInt((String) properties.get("app.runtime"));
cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
cRead = Integer.parseInt((String) properties.get("app.cycle_read"));
cUpdate = Integer.parseInt((String) properties.get("app.cycle_update"));
cDelete = Integer.parseInt((String) properties.get("app.cycle_delete"));
createTime = new ConcurrentLinkedQueue<>();
updateTime = new ConcurrentLinkedQueue<>();
readTime = new ConcurrentLinkedQueue<>();
deleteTime = new ConcurrentLinkedQueue<>();
} catch (IOException e) {
e.printStackTrace();
}
} }
public String getGreeting() { public String getGreeting() {
return "Hello World!"; return "Hello World!";
...@@ -74,69 +102,116 @@ public class ClientRunner { ...@@ -74,69 +102,116 @@ public class ClientRunner {
return timestampCreateStart; return timestampCreateStart;
} }
public double initClient(String id) { public double initClient(String id, long experimentStartTime) {
final ManagedChannel channel = ManagedChannelBuilder. final ManagedChannel channel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT) forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext() .usePlaintext()
.build(); .build();
double averageReadTime = 0, averageWriteTime = 0, averageUpdateTime = 0, averageDeleteTime = 0;
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
for (int i = 1; i <= 1000; i++) { double i = 0;
for (;;i++) {
String key = id + (int) (Math.random() * Integer.MAX_VALUE), String key = id + (int) (Math.random() * Integer.MAX_VALUE),
value = "dummy", value = "dummy",
updatedValue = "dummyUpdated"; updatedValue = "dummyUpdated";
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel); NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
long timestampCreateStart = create(stub, key, value); for (int j = 0; j < cCreate; j++) {
averageWriteTime = (averageWriteTime * (i - 1) + (System.currentTimeMillis() - timestampCreateStart)) / i; long timestampCreateStart = create(stub, key, value);
AbstractMap.Entry<Packet, Long> data = read(stub, key); createTime.add(System.currentTimeMillis() - timestampCreateStart);
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - data.getValue())) / i; }
long timestampUpdateStart = update(stub, key, updatedValue, data.getKey().getResponse(0).getAck().getVersion()); for (int j = 0; j < cRead; j++) {
averageUpdateTime = (averageUpdateTime * (i - 1) + (System.currentTimeMillis() - timestampUpdateStart)) / i; AbstractMap.Entry<Packet, Long> data = read(stub, key);
data = read(stub, key); readTime.add(System.currentTimeMillis() - data.getValue());
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - data.getValue())) / i; }
long timestampDeleteStart = delete(stub, key, data.getKey().getResponse(0).getAck().getVersion());
averageDeleteTime = (averageDeleteTime * (i - 1) + (System.currentTimeMillis() - timestampDeleteStart)) / i; for (int j = 0; j < cUpdate; j++) {
data = read(stub, key); AbstractMap.Entry<Packet, Long> data = read(stub, key);
averageReadTime = (averageReadTime * (i - 1) + (System.currentTimeMillis() - data.getValue())) / i; long timestampUpdateStart = update(stub, key, updatedValue,
data.getKey().getResponse(0).getAck().getVersion());
if (i % 1000 == 0) readTime.add(System.currentTimeMillis() - data.getValue());
System.out.println(id + "Avg MDS time read: " updateTime.add(System.currentTimeMillis() - timestampUpdateStart);
+ averageReadTime }
+ " ms\t create: " + averageWriteTime
+ " ms\t update: " + averageUpdateTime for (int j = 0; j < cDelete; j++) {
+ " ms\t delete: " + averageDeleteTime + "ms" AbstractMap.Entry<Packet, Long> data = read(stub, key);
); long timestampDeleteStart = delete(stub, key, data.getKey().getResponse(0).getAck().getVersion());
readTime.add(System.currentTimeMillis() - data.getValue());
deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
}
long currentTime = System.currentTimeMillis();
if ((currentTime - experimentStartTime) >= runtime * 1000L)
break;
} }
double qps = (1000 * 1000 / (System.currentTimeMillis() * 1.0 - startTime));
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) + // System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) +
// "ms qps " + qps); // "ms qps " + qps);
channel.shutdown(); channel.shutdown();
return qps; return 0;
} }
public static void main(String[] args) throws InterruptedException, ExecutionException { public static void main(String[] args) throws InterruptedException, ExecutionException {
propertiesFile = args[0];
ClientRunner clientRunner = new ClientRunner(); ClientRunner clientRunner = new ClientRunner();
System.out.println(clientRunner.getGreeting()); System.out.println(clientRunner.getGreeting());
System.out.println("Thread count: " + parallelCount + " runtime: " + runtime + "s");
Thread.sleep(1000); Thread.sleep(1000);
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount); ExecutorService executorService = Executors.newFixedThreadPool(parallelCount);
Set<Callable<Double>> callables = new HashSet<>(); Set<Callable<Double>> callables = new HashSet<>();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < parallelCount; i++) { for (int i = 0; i < parallelCount; i++) {
int finalI = i; int finalI = i;
callables.add(() -> clientRunner.initClient(Integer.toString(finalI))); callables.add(() -> clientRunner.initClient(Integer.toString(finalI), startTime));
} }
long startTime = System.currentTimeMillis(); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor() ;
Stopwatch stopwatch = Stopwatch.createUnstarted();
Runnable task = () -> {
System.out.println( "Experiment running: " + stopwatch );
};
scheduledExecutorService.schedule( task, 5, TimeUnit.SECONDS) ;
stopwatch.start();
List<Future<Double>> futures = executorService.invokeAll(callables); List<Future<Double>> futures = executorService.invokeAll(callables);
double qps = 0; double qps = 0;
for (Future<Double> future: futures) { for (Future<Double> future: futures) {
qps += future.get(); future.get();
} }
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
System.out.println(); double totalRuntime = endTime - startTime;
System.out.println("QPS Read: " + 3 * qps + " Write: " + qps + " Update: " + qps + " Delete: " + qps); long readQps = 0, createQps = 0, updateQps = 0, deleteQps = 0;
System.out.println("QPS Total: " + 6 * qps + "avg query time: " + ((6 * qps) / (endTime - startTime))); double avgRead = 0, avgCreate = 0, avgUpdate = 0, avgDelete = 0;
for (Long time: clientRunner.readTime) {
readQps++;
avgRead += time;
}
avgRead /= readQps * 1.0;
for (Long time: clientRunner.createTime) {
createQps++;
avgCreate += time;
}
avgCreate /= createQps * 1.0;
for (Long time: clientRunner.updateTime) {
updateQps++;
avgUpdate += time;
}
avgUpdate /= updateQps * 1.0;
for (Long time: clientRunner.deleteTime) {
deleteQps++;
avgDelete += time;
}
avgDelete /= deleteQps * 1.0;
double totalQps = readQps + createQps + updateQps + deleteQps;
System.out.println("Total runtime: " + totalRuntime);
System.out.println("QPS Read: " + readQps + " Create: " + createQps + " Update: " + updateQps + " Delete: " + deleteQps);
System.out.println("QPS Total: " + totalQps + " avg query time[thread amortized]: " + (totalQps / totalRuntime));
System.out.println("Read Total: " + readQps + " avg query time: " + avgRead);
System.out.println("Create Total: " + createQps + " avg query time: " + avgCreate);
System.out.println("Update Total: " + updateQps + " avg query time: " + avgUpdate);
System.out.println("Delete Total: " + deleteQps + " avg query time: " + avgDelete);
// clientRunner.initClient(); // clientRunner.initClient();
executorService.shutdown();
scheduledExecutorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS);
scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS);
} }
} }
...@@ -130,8 +130,13 @@ public class MetadataServer { ...@@ -130,8 +130,13 @@ public class MetadataServer {
} }
private void cleanup () { private void cleanup () {
if (this.replicationService != null) if (this.replicationService != null) {
this.replicationService.cleanup(); try {
this.replicationService.cleanup();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
public static void main(String[] args) { public static void main(String[] args) {
MetadataServer metaDataServer = new MetadataServer(); MetadataServer metaDataServer = new MetadataServer();
......
...@@ -41,23 +41,23 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -41,23 +41,23 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
} }
stopwatch.stop(); stopwatch.stop();
System.out.println("Added to local memory " + stopwatch); // System.out.println("Added to local memory " + stopwatch);
stopwatch.reset(); stopwatch.reset();
stopwatch.start(); stopwatch.start();
Packet packet = replicate(requestPacket); Packet packet = replicate(requestPacket);
stopwatch.stop(); stopwatch.stop();
System.out.println("Replication time " + stopwatch); // System.out.println("Replication time " + stopwatch);
responseObserver.onNext(packet); responseObserver.onNext(packet);
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
@Override @Override
public void updateMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) { public void updateMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) {
System.out.println("new create request " + requestPacket); // System.out.println("new create request " + requestPacket);
for (Request request: requestPacket.getRequestList()) { for (Request request: requestPacket.getRequestList()) {
ioHandler.update(request); ioHandler.update(request);
} }
System.out.println("Added to local memory"); // System.out.println("Added to local memory");
Packet packet = replicate(requestPacket); Packet packet = replicate(requestPacket);
responseObserver.onNext(packet); responseObserver.onNext(packet);
responseObserver.onCompleted(); responseObserver.onCompleted();
...@@ -65,11 +65,11 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -65,11 +65,11 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
@Override @Override
public void deleteMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) { public void deleteMetadata(Packet requestPacket, StreamObserver<Packet> responseObserver) {
System.out.println("new create request " + requestPacket); // System.out.println("new create request " + requestPacket);
for (Request request: requestPacket.getRequestList()) { for (Request request: requestPacket.getRequestList()) {
ioHandler.delete(request); ioHandler.delete(request);
} }
System.out.println("Added to local memory"); // System.out.println("Added to local memory");
Packet packet = replicate(requestPacket); Packet packet = replicate(requestPacket);
responseObserver.onNext(packet); responseObserver.onNext(packet);
responseObserver.onCompleted(); responseObserver.onCompleted();
...@@ -80,16 +80,16 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -80,16 +80,16 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
buildReplicationRequest(new ArrayList<>(requestPacket.getRequestList())); buildReplicationRequest(new ArrayList<>(requestPacket.getRequestList()));
ReplicationResponse replicationResponse = null; ReplicationResponse replicationResponse = null;
try { try {
System.out.println("starting replication"); // System.out.println("starting replication");
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start(); stopwatch.start();
replicationResponse = replicationService.replicateMetadata(replicationRequest); replicationResponse = replicationService.replicateMetadata(replicationRequest);
stopwatch.stop(); stopwatch.stop();
System.out.println("Network handler replicate" + stopwatch); // System.out.println("Network handler replicate" + stopwatch);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("replication complete"); // System.out.println("replication complete");
Packet packet = ResponseBuilder.buildPacket(new ArrayList<>(replicationResponse.getResponseList())); Packet packet = ResponseBuilder.buildPacket(new ArrayList<>(replicationResponse.getResponseList()));
// System.out.println(packet); // System.out.println(packet);
return packet; return packet;
......
...@@ -34,9 +34,11 @@ public class InlineReplicationService implements ReplicationService { ...@@ -34,9 +34,11 @@ public class InlineReplicationService implements ReplicationService {
} }
@Override @Override
public void cleanup() { public void cleanup() throws InterruptedException {
for (ManagedChannel channel: channels.values()) for (ManagedChannel channel: channels.values())
channel.shutdown(); channel.shutdown();
executorService.shutdown();
executorService.awaitTermination(MessageConstants.STATUS_REPLICATION_TIMEOUT, TimeUnit.MILLISECONDS);
} }
private void establishChannels() { private void establishChannels() {
...@@ -89,7 +91,7 @@ public class InlineReplicationService implements ReplicationService { ...@@ -89,7 +91,7 @@ public class InlineReplicationService implements ReplicationService {
} }
} }
stopwatch.stop(); stopwatch.stop();
System.out.println("replicateMetadata ReplicationService " + stopwatch); // System.out.println("replicateMetadata ReplicationService " + stopwatch);
return ResponseBuilder. return ResponseBuilder.
buildReplicationResponse(new ArrayList<>(responseHashMap.values())); buildReplicationResponse(new ArrayList<>(responseHashMap.values()));
} }
......
...@@ -8,5 +8,5 @@ import java.util.concurrent.ExecutionException; ...@@ -8,5 +8,5 @@ import java.util.concurrent.ExecutionException;
public interface ReplicationService { public interface ReplicationService {
abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException; abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException;
abstract ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest); abstract ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest);
abstract void cleanup(); abstract void cleanup() throws InterruptedException;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment