Commit b535b3ce authored by NILANJAN DAW's avatar NILANJAN DAW

Added random read from replicaset

parent a5d9a657
app.name="HPDOS-Client"
app.version="0.1.4"
app.thread_count=12
app.runtime=15
app.runtime=30
app.cycle_create=1
app.cycle_read=4
app.cycle_update=3
......
......@@ -6,19 +6,13 @@ package HpdosClient;
import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.MessageFormat.RequestBuilder;
import com.google.common.base.Stopwatch;
import hpdos.grpc.NetworkServiceGrpc;
import hpdos.grpc.Packet;
import hpdos.grpc.PacketFormatProto;
import hpdos.grpc.Request;
import hpdos.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
......@@ -28,7 +22,8 @@ public class ClientRunner {
private final String clientID;
public static String propertiesFile;
private int cCreate, cRead, cUpdate, cDelete;
public boolean experimentEnded = false;
private List<Follower> replicaSet;
private Queue<Long> createTime, updateTime, readTime, deleteTime;
public ClientRunner() {
......@@ -69,7 +64,9 @@ public class ClientRunner {
}
// read back the metadata
public Map.Entry<Packet, Long> read(NetworkServiceGrpc.NetworkServiceBlockingStub stub, String key) {
public Map.Entry<Packet, Long> read(ArrayList<ManagedChannel> channels, String key) {
int rnd = new Random().nextInt(channels.size());
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channels.get(rnd));
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
......@@ -102,31 +99,38 @@ public class ClientRunner {
return timestampCreateStart;
}
public double initClient(String id, long experimentStartTime) {
final ManagedChannel channel = ManagedChannelBuilder.
public double runExperiment(String id, long experimentStartTime) {
final ManagedChannel masterChannel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
ArrayList<ManagedChannel> channels = new ArrayList<>();
channels.add(masterChannel);
for (Follower follower: replicaSet) {
ManagedChannel channel = ManagedChannelBuilder.
forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.add(channel);
}
long startTime = System.currentTimeMillis();
double i = 0;
for (;;i++) {
for (;;) {
String key = id + (int) (Math.random() * Integer.MAX_VALUE),
value = "dummy",
updatedValue = "dummyUpdated";
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(masterChannel);
for (int j = 0; j < cCreate; j++) {
long timestampCreateStart = create(stub, key, value);
createTime.add(System.currentTimeMillis() - timestampCreateStart);
}
for (int j = 0; j < cRead; j++) {
AbstractMap.Entry<Packet, Long> data = read(stub, key);
AbstractMap.Entry<Packet, Long> data = read(channels, key);
readTime.add(System.currentTimeMillis() - data.getValue());
}
for (int j = 0; j < cUpdate; j++) {
AbstractMap.Entry<Packet, Long> data = read(stub, key);
AbstractMap.Entry<Packet, Long> data = read(channels, key);
long timestampUpdateStart = update(stub, key, updatedValue,
data.getKey().getResponse(0).getAck().getVersion());
readTime.add(System.currentTimeMillis() - data.getValue());
......@@ -134,7 +138,7 @@ public class ClientRunner {
}
for (int j = 0; j < cDelete; j++) {
AbstractMap.Entry<Packet, Long> data = read(stub, key);
AbstractMap.Entry<Packet, Long> data = read(channels, key);
long timestampDeleteStart = delete(stub, key, data.getKey().getResponse(0).getAck().getVersion());
readTime.add(System.currentTimeMillis() - data.getValue());
deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
......@@ -145,73 +149,101 @@ public class ClientRunner {
}
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) +
// "ms qps " + qps);
channel.shutdown();
masterChannel.shutdown();
return 0;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
propertiesFile = args[0];
ClientRunner clientRunner = new ClientRunner();
System.out.println(clientRunner.getGreeting());
System.out.println("Thread count: " + parallelCount + " runtime: " + runtime + "s");
Thread.sleep(1000);
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount);
Set<Callable<Double>> callables = new HashSet<>();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < parallelCount; i++) {
int finalI = i;
callables.add(() -> clientRunner.initClient(Integer.toString(finalI), startTime));
public void retrieveFollowerList() {
final ManagedChannel channel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channel);
ResponseList responseList = stub.getReadReplicaList(null);
this.replicaSet = responseList.getFollowerList();
for (Follower follower: this.replicaSet) {
System.out.println(follower);
}
channel.shutdown();
}
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor() ;
private void timerService() {
Stopwatch stopwatch = Stopwatch.createUnstarted();
Runnable task = () -> {
System.out.println( "Experiment running: " + stopwatch );
};
scheduledExecutorService.schedule( task, 5, TimeUnit.SECONDS) ;
stopwatch.start();
List<Future<Double>> futures = executorService.invokeAll(callables);
double qps = 0;
for (Future<Double> future: futures) {
future.get();
}
long endTime = System.currentTimeMillis();
double totalRuntime = endTime - startTime;
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (experimentEnded) {
timer.cancel();
timer.purge();
}
System.out.println("Experiment ran: " + stopwatch);
}
}, 5000, 5000);
}
private void printStatistics(double totalRuntime) {
long readQps = 0, createQps = 0, updateQps = 0, deleteQps = 0;
double avgRead = 0, avgCreate = 0, avgUpdate = 0, avgDelete = 0;
for (Long time: clientRunner.readTime) {
for (Long time: this.readTime) {
readQps++;
avgRead += time;
}
avgRead /= readQps * 1.0;
for (Long time: clientRunner.createTime) {
for (Long time: this.createTime) {
createQps++;
avgCreate += time;
}
avgCreate /= createQps * 1.0;
for (Long time: clientRunner.updateTime) {
for (Long time: this.updateTime) {
updateQps++;
avgUpdate += time;
}
avgUpdate /= updateQps * 1.0;
for (Long time: clientRunner.deleteTime) {
for (Long time: this.deleteTime) {
deleteQps++;
avgDelete += time;
}
avgDelete /= deleteQps * 1.0;
double totalQps = readQps + createQps + updateQps + deleteQps;
System.out.println("Total runtime: " + totalRuntime);
System.out.println("QPS Read: " + readQps + " Create: " + createQps + " Update: " + updateQps + " Delete: " + deleteQps);
System.out.println("QPS Total: " + totalQps + " avg query time[thread amortized]: " + (totalQps / totalRuntime));
System.out.println("Read Total: " + readQps + " avg query time: " + avgRead);
System.out.println("Create Total: " + createQps + " avg query time: " + avgCreate);
System.out.println("Update Total: " + updateQps + " avg query time: " + avgUpdate);
System.out.println("Delete Total: " + deleteQps + " avg query time: " + avgDelete);
// clientRunner.initClient();
System.out.println("Read: " + readQps + " Create: " + createQps
+ " Update: " + updateQps + " Delete: " + deleteQps + " Total: " + totalQps);
totalRuntime /= 1000;
System.out.println("Total QPS: " + totalQps / totalRuntime + " avg query time: " +
(totalQps * parallelCount / (totalRuntime)));
System.out.println("Read QPS: " + readQps / totalRuntime + " avg query time: " + avgRead);
System.out.println("Create QPS: " + createQps / totalRuntime + " avg query time: " + avgCreate);
System.out.println("Update QPS: " + updateQps / totalRuntime + " avg query time: " + avgUpdate);
System.out.println("Delete QPS: " + deleteQps / totalRuntime + " avg query time: " + avgDelete);
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
propertiesFile = args[0];
ClientRunner clientRunner = new ClientRunner();
System.out.println(clientRunner.getGreeting());
System.out.println("Thread count: " + parallelCount + " runtime: " + runtime + "s");
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount);
Thread.sleep(1000); // let things settle down a bit
clientRunner.retrieveFollowerList();
Set<Callable<Double>> callables = new HashSet<>();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < parallelCount; i++) {
int finalI = i;
callables.add(() -> clientRunner.runExperiment(Integer.toString(finalI), startTime));
}
clientRunner.timerService();
List<Future<Double>> futures = executorService.invokeAll(callables);
for (Future<Double> future: futures) {
future.get();
}
clientRunner.experimentEnded = true;
long endTime = System.currentTimeMillis();
double totalRuntime = endTime - startTime;
clientRunner.printStatistics(totalRuntime);
executorService.shutdown();
scheduledExecutorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS);
scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS);
}
}
......@@ -14,6 +14,7 @@ service NetworkService {
rpc createMetadata(Packet) returns (Packet) {}
rpc updateMetadata(Packet) returns (Packet) {}
rpc deleteMetadata(Packet) returns (Packet) {}
rpc getReadReplicaList(RequestList) returns (ResponseList) {}
}
......@@ -53,4 +54,19 @@ message Ack {
message Nack {
string key = 2;
int32 version = 3;
}
message RequestList {}
message ResponseList {
int32 operationType = 1;
int32 status = 2;
repeated Follower follower = 3;
}
message Follower {
string ip = 1;
int32 port = 2;
string followerID = 3;
int64 lastSeen = 4;
}
\ No newline at end of file
......@@ -2,12 +2,14 @@ package hpdos.handler;
import com.google.common.base.Stopwatch;
import hpdos.grpc.*;
import hpdos.lib.MasterFollower;
import hpdos.lib.ReplicationService;
import hpdos.message.RequestBuilder;
import hpdos.message.ResponseBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
......@@ -75,6 +77,24 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
responseObserver.onCompleted();
}
@Override
public void getReadReplicaList(RequestList requestList,
StreamObserver<ResponseList> responseListStreamObserver) {
HashMap<String, MasterFollower> followers = replicationService.getFollowers();
ResponseList.Builder responseList = ResponseList.newBuilder();
for (MasterFollower follower: followers.values()) {
Follower.Builder replica = Follower.newBuilder();
replica.setIp(follower.getIp());
replica.setPort(follower.getPort());
replica.setFollowerID(follower.getFollowerID());
replica.setLastSeen(follower.getLastSeen());
replica.build();
responseList.addFollower(replica);
}
responseListStreamObserver.onNext(responseList.build());
responseListStreamObserver.onCompleted();
}
private Packet replicate(Packet requestPacket) {
ReplicationRequest replicationRequest = RequestBuilder.
buildReplicationRequest(new ArrayList<>(requestPacket.getRequestList()));
......
......@@ -104,6 +104,11 @@ public class InlineReplicationService implements ReplicationService {
ListenableFuture<ReplicationResponse> res = stub.replicateMetadata(replicationRequest);
}
return null;
throw new UnsupportedOperationException("Implementation not complete");
}
@Override
public HashMap<String, MasterFollower> getFollowers() {
return followers;
}
}
......@@ -3,10 +3,12 @@ package hpdos.lib;
import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.ReplicationResponse;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
public interface ReplicationService {
abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException;
abstract ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest);
abstract void cleanup() throws InterruptedException;
abstract HashMap<String, MasterFollower> getFollowers();
}
......@@ -14,6 +14,7 @@ service NetworkService {
rpc createMetadata(Packet) returns (Packet) {}
rpc updateMetadata(Packet) returns (Packet) {}
rpc deleteMetadata(Packet) returns (Packet) {}
rpc getReadReplicaList(RequestList) returns (ResponseList) {}
}
......@@ -53,4 +54,19 @@ message Ack {
message Nack {
string key = 2;
int32 version = 3;
}
message RequestList {}
message ResponseList {
int32 operationType = 1;
int32 status = 2;
repeated Follower follower = 3;
}
message Follower {
string ip = 1;
int32 port = 2;
string followerID = 3;
int64 lastSeen = 4;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment