Commit fff1436f authored by NILANJAN DAW's avatar NILANJAN DAW

Write Through based cache implemented for client side.

Fixed Client bottleneck caused by shared channel and gRPC stub.
Added a synchronous mode client with cache.
parent daca520b
app.name="HPDOS-Client" app.name="HPDOS-Client"
app.version="0.1.4" app.version="0.1.4"
app.mode=rps
app.thread_count=10 app.thread_count=10
app.rps=1000 app.rps=1000
app.concurrency=5
app.runtime=10 app.runtime=10
app.data_size=10 app.data_size=10
app.data_conversion_factor=B app.data_conversion_factor=B
......
...@@ -53,7 +53,7 @@ dependencies { ...@@ -53,7 +53,7 @@ dependencies {
application { application {
// Define the main class for the application. // Define the main class for the application.
mainClass = 'HpdosClient.ClientRunner' mainClass = 'HpdosClient.ClientRunnerSync'
} }
sourceSets { sourceSets {
......
...@@ -20,14 +20,12 @@ ...@@ -20,14 +20,12 @@
package HpdosClient; package HpdosClient;
import HpdosClient.MessageFormat.MessageConstants; import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.lib.StorageHandler;
import HpdosClient.lib.StorageModel; import HpdosClient.lib.StorageModel;
import HpdosClient.lib.StorageService; import HpdosClient.lib.StorageService;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import hpdos.grpc.Follower;
import hpdos.grpc.Packet; import hpdos.grpc.Packet;
import hpdos.grpc.Response; import hpdos.grpc.Response;
...@@ -38,19 +36,15 @@ import java.util.*; ...@@ -38,19 +36,15 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
public class ClientRunner { public class ClientRunner {
public static int concurrency;
public static int runtime; public static int runtime;
private final String clientID; private final String clientID;
public static String propertiesFile; public static String propertiesFile;
private int cCreate, cRead, cUpdate, cDelete; private int cCreate, cRead, cUpdate, cDelete;
public boolean experimentEnded = false; public boolean experimentEnded = false;
private List<Follower> replicaSet;
private Queue<Long> createTime, updateTime, readTime, deleteTime; private Queue<Long> createTime, updateTime, readTime, deleteTime;
private final Properties properties; private final Properties properties;
private final List<StorageModel> generatedPacket; private final List<StorageModel> generatedPacket;
private Semaphore limiter = null;
private int threadCount; private int threadCount;
private String mode;
private int rps; private int rps;
AtomicDouble openCount; AtomicDouble openCount;
public ClientRunner() { public ClientRunner() {
...@@ -63,11 +57,8 @@ public class ClientRunner { ...@@ -63,11 +57,8 @@ public class ClientRunner {
InputStream inputStream = new FileInputStream(propertiesFile); InputStream inputStream = new FileInputStream(propertiesFile);
this.properties.load(inputStream); this.properties.load(inputStream);
System.out.println((String) properties.get("app.concurrency")); System.out.println((String) properties.get("app.concurrency"));
concurrency = Integer.parseInt((String) properties.get("app.concurrency"));
this.rps = Integer.parseInt((String) properties.get("app.rps")); this.rps = Integer.parseInt((String) properties.get("app.rps"));
this.mode = (String) properties.get("app.mode");
this.threadCount = Integer.parseInt((String) properties.get("app.thread_count")); this.threadCount = Integer.parseInt((String) properties.get("app.thread_count"));
this.limiter = new Semaphore(concurrency);
runtime = Integer.parseInt((String) properties.get("app.runtime")); runtime = Integer.parseInt((String) properties.get("app.runtime"));
this.cCreate = Integer.parseInt((String) properties.get("app.cycle_create")); this.cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
...@@ -124,173 +115,53 @@ public class ClientRunner { ...@@ -124,173 +115,53 @@ public class ClientRunner {
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
latch.await(); latch.await();
// for (int i = 0; i < 100; i++) {
// int index = (int) (Math.random() * generatedPacket.size());
// try {
// Stopwatch stopwatch = Stopwatch.createUnstarted();
// stopwatch.start();
// ListenableFuture<StorageModel> res = storageService.read(generatedPacket.get(index).getKey());
// res.addListener(() -> {
// stopwatch.stop();
// try {
// System.out.println("Time: " + stopwatch + " model: " + res.get());
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// stopwatch.reset();
// }, MoreExecutors.directExecutor());
// } catch (ExecutionException | IOException e) {
// System.out.println("resource not found");
// }
// }
// Thread.sleep(2000);
} }
private void seedServer(StorageService storageService, String value, String threadId) throws InterruptedException {
public double runExperimentByConcurrency(StorageService storageService) throws InterruptedException, IOException, ExecutionException { CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
String value = createString(), updatedValue = createString(); String key = threadId + (int) (Math.random() * Integer.MAX_VALUE);
double totalBracket = this.cCreate + this.cRead + this.cUpdate + this.cDelete; ListenableFuture<Packet> res = storageService.create(key, value, MessageConstants.METADATA_ACCESS_PRIVATE);
double createBracket, readBracket, updateBracket, deleteBracket; StorageModel model = new StorageModel(0, value.length(), key,
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, 0, value);
createBracket = this.cCreate * 1.0 / totalBracket; res.addListener(() -> {
readBracket = createBracket + this.cRead / totalBracket; try {
updateBracket = readBracket + this.cUpdate / totalBracket; Packet packet = res.get();
deleteBracket = updateBracket + this.cDelete / totalBracket; for (Response response: packet.getResponseList()) {
do { if (response.getStatus() == MessageConstants.STATUS_OK) {
limiter.acquire(); model.updateData(response.getAck());
double toss = Math.random(); generatedPacket.add(model);
if (toss < createBracket) {
// System.out.println("create");
String key = Integer.toString((int) (Math.random() * Integer.MAX_VALUE));
StorageModel model = new StorageModel(0, value.length(), key,
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, 0, value);
final long createStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.create(model.getKey(), model.getValue(), model.getAccessType());
res.addListener(() -> {
try {
Packet packet = res.get();
this.createTime.add(System.currentTimeMillis() - createStart);
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) {
model.updateData(response.getAck());
generatedPacket.add(model);
}
// else
// System.out.println("error packet " + response);
} }
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} }
} catch (InterruptedException | ExecutionException e) {
limiter.release(); e.printStackTrace();
}, MoreExecutors.directExecutor());
} else if (toss < readBracket) {
// System.out.println("read");
int index; StorageModel sendPacket;
synchronized (generatedPacket) {
index = (int) (Math.random() * generatedPacket.size());
sendPacket = this.generatedPacket.get(index);
} }
final long readStart = System.currentTimeMillis(); latch.countDown();
ListenableFuture<StorageModel> res = storageService.read(sendPacket.getKey()); }, MoreExecutors.directExecutor());
res.addListener(() -> { }
try { latch.await();
StorageModel model = res.get();
readTime.add(System.currentTimeMillis() - readStart);
// System.out.println("packet " + packet);
synchronized (sendPacket) {
sendPacket.updateData(model);
}
//
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
limiter.release();
}, MoreExecutors.directExecutor());
} else if (toss < updateBracket) {
// System.out.println("update");
int index = (int) (Math.random() * generatedPacket.size());
StorageModel sendPacket = this.generatedPacket.get(index);
final long updateStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.update(sendPacket.getKey(), updatedValue,
sendPacket.getVersion());
res.addListener(() -> {
try {
Packet packet = res.get();
updateTime.add(System.currentTimeMillis() - updateStart);
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK)
if (sendPacket.getKey().equals(response.getAck().getKey())) {
synchronized (sendPacket) {
sendPacket.updateData(response.getAck());
}
}
// else
// System.out.println("error packet " + response);
}
limiter.release();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, MoreExecutors.directExecutor());
} else {
// System.out.println("delete");
int index = (int) (Math.random() * generatedPacket.size());
StorageModel sendPacket = this.generatedPacket.get(index);
final long deleteStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.delete(sendPacket.getKey(), sendPacket.getVersion());
res.addListener(() -> {
try {
Packet packet = res.get();
deleteTime.add(System.currentTimeMillis() - deleteStart);
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK)
if (sendPacket.getKey().equals(response.getAck().getKey())) {
synchronized (generatedPacket) {
generatedPacket.remove(sendPacket);
}
}
// else
// System.out.println("error packet " + response);
}
limiter.release();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, MoreExecutors.directExecutor());
// deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
}
} while (!this.experimentEnded);
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) +
// "ms qps " + qps);
return 0;
} }
public double runExperimentByRps(StorageService storageService) throws InterruptedException, ExecutionException { public double runExperimentByRps(String threadId) throws InterruptedException, ExecutionException {
String value = createString(), updatedValue = createString(); String value = createString(), updatedValue = createString();
StorageService storageService = new StorageService(clientID);
storageService.initStorage();
this.seedServer(storageService, value, threadId);
double totalBracket = this.cCreate + this.cRead + this.cUpdate + this.cDelete; double totalBracket = this.cCreate + this.cRead + this.cUpdate + this.cDelete;
double createBracket, readBracket, updateBracket, deleteBracket; double createBracket, readBracket, updateBracket;
createBracket = this.cCreate * 1.0 / totalBracket; createBracket = this.cCreate * 1.0 / totalBracket;
readBracket = createBracket + this.cRead / totalBracket; readBracket = createBracket + this.cRead / totalBracket;
updateBracket = readBracket + this.cUpdate / totalBracket; updateBracket = readBracket + this.cUpdate / totalBracket;
deleteBracket = updateBracket + this.cDelete / totalBracket;
long sleepTime = (threadCount * 1000L) / this.rps; long sleepTime = (threadCount * 1000L) / this.rps;
Thread.sleep((long)(Math.random() * 500)); // randomizing start time to avoid spikes in request Thread.sleep((long)(Math.random() * 500)); // randomizing start time to avoid spikes in request
do { do {
openCount.addAndGet(1); openCount.addAndGet(1);
double toss = Math.random(); double toss = Math.random();
if (toss < createBracket) { if (toss < createBracket) {
// System.out.println("create");
String key = Integer.toString((int) (Math.random() * Integer.MAX_VALUE)); String key = threadId + (int) (Math.random() * Integer.MAX_VALUE);
StorageModel model = new StorageModel(0, value.length(), key, StorageModel model = new StorageModel(0, value.length(), key,
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, 0, value); MessageConstants.METADATA_ACCESS_PRIVATE, clientID, 0, value);
final long createStart = System.currentTimeMillis(); final long createStart = System.currentTimeMillis();
...@@ -299,14 +170,12 @@ public class ClientRunner { ...@@ -299,14 +170,12 @@ public class ClientRunner {
try { try {
Packet packet = res.get(); Packet packet = res.get();
this.createTime.add(System.currentTimeMillis() - createStart); this.createTime.add(System.currentTimeMillis() - createStart);
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) { if (response.getStatus() == MessageConstants.STATUS_OK) {
model.updateData(response.getAck()); model.updateData(response.getAck());
generatedPacket.add(model); generatedPacket.add(model);
} }
// else
// System.out.println("error packet " + response);
} }
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -315,7 +184,6 @@ public class ClientRunner { ...@@ -315,7 +184,6 @@ public class ClientRunner {
openCount.addAndGet(-1); openCount.addAndGet(-1);
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} else if (toss < readBracket) { } else if (toss < readBracket) {
// System.out.println("read");
int index; StorageModel sendPacket; int index; StorageModel sendPacket;
synchronized (generatedPacket) { synchronized (generatedPacket) {
index = (int) (Math.random() * generatedPacket.size()); index = (int) (Math.random() * generatedPacket.size());
...@@ -343,7 +211,6 @@ public class ClientRunner { ...@@ -343,7 +211,6 @@ public class ClientRunner {
} }
} else if (toss < updateBracket) { } else if (toss < updateBracket) {
// System.out.println("update");
int index; StorageModel sendPacket; int index; StorageModel sendPacket;
synchronized (generatedPacket) { synchronized (generatedPacket) {
index = (int) (Math.random() * generatedPacket.size()); index = (int) (Math.random() * generatedPacket.size());
...@@ -365,8 +232,6 @@ public class ClientRunner { ...@@ -365,8 +232,6 @@ public class ClientRunner {
sendPacket.updateData(response.getAck()); sendPacket.updateData(response.getAck());
} }
} }
// else
// System.out.println("error packet " + response);
} }
openCount.addAndGet(-1); openCount.addAndGet(-1);
...@@ -395,22 +260,18 @@ public class ClientRunner { ...@@ -395,22 +260,18 @@ public class ClientRunner {
generatedPacket.remove(sendPacket); generatedPacket.remove(sendPacket);
} }
} }
// else
// System.out.println("error packet " + response);
} }
openCount.addAndGet(-1); openCount.addAndGet(-1);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
// deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
} }
while (openCount.get() > this.rps) while (openCount.get() > this.rps)
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
} while (!this.experimentEnded); } while (!this.experimentEnded);
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) + storageService.cleanup();
// "ms qps " + qps);
return 0; return 0;
} }
...@@ -460,7 +321,7 @@ public class ClientRunner { ...@@ -460,7 +321,7 @@ public class ClientRunner {
+ " Update: " + updateQps + " Delete: " + deleteQps + " Total: " + totalQps); + " Update: " + updateQps + " Delete: " + deleteQps + " Total: " + totalQps);
totalRuntime /= 1000; totalRuntime /= 1000;
System.out.println("Total QPS: " + totalQps / totalRuntime + " avg query time: " + System.out.println("Total QPS: " + totalQps / totalRuntime + " avg query time: " +
(totalQps * concurrency / (totalRuntime))); (totalQps * threadCount / (totalRuntime)));
System.out.println("Read QPS: " + readQps / totalRuntime + " avg query time: " + avgRead); System.out.println("Read QPS: " + readQps / totalRuntime + " avg query time: " + avgRead);
System.out.println("Create QPS: " + createQps / totalRuntime + " avg query time: " + avgCreate); System.out.println("Create QPS: " + createQps / totalRuntime + " avg query time: " + avgCreate);
System.out.println("Update QPS: " + updateQps / totalRuntime + " avg query time: " + avgUpdate); System.out.println("Update QPS: " + updateQps / totalRuntime + " avg query time: " + avgUpdate);
...@@ -501,11 +362,10 @@ public class ClientRunner { ...@@ -501,11 +362,10 @@ public class ClientRunner {
ExecutorService experimentExecutors = Executors.newFixedThreadPool(clientRunner.threadCount); ExecutorService experimentExecutors = Executors.newFixedThreadPool(clientRunner.threadCount);
Set<Callable<Double>> callables = new HashSet<>(); Set<Callable<Double>> callables = new HashSet<>();
System.out.println("Starting experiment"); System.out.println("Starting experiment");
for (int i = 0; i < clientRunner.threadCount; i++) { for (int i = 1; i <= clientRunner.threadCount; i++) {
if (clientRunner.mode.equals("rps")) int finalI = i;
callables.add(() -> clientRunner.runExperimentByRps(storageService)); callables.add(() -> clientRunner.runExperimentByRps(Integer.toString(finalI)));
else
callables.add(() -> clientRunner.runExperimentByConcurrency(storageService));
} }
List<Future<Double>> futures = experimentExecutors.invokeAll(callables); List<Future<Double>> futures = experimentExecutors.invokeAll(callables);
for (Future<Double> future: futures) { for (Future<Double> future: futures) {
...@@ -513,7 +373,6 @@ public class ClientRunner { ...@@ -513,7 +373,6 @@ public class ClientRunner {
} }
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
System.out.println("Waiting for system to settle down"); System.out.println("Waiting for system to settle down");
Thread.sleep(2000);
double totalRuntime = endTime - startTime; double totalRuntime = endTime - startTime;
clientRunner.printStatistics(totalRuntime); clientRunner.printStatistics(totalRuntime);
clientRunner.cleanupExperiment(storageService); clientRunner.cleanupExperiment(storageService);
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package HpdosClient;
import HpdosClient.lib.StorageModel;
import HpdosClient.lib.StorageServiceSync;
import com.google.common.base.Stopwatch;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.*;
public class ClientRunnerSync {
public static int parallelCount;
public static int runtime;
private final String clientID;
public static String propertiesFile;
private int cCreate, cRead, cUpdate, cDelete;
public boolean experimentEnded = false;
private Queue<Long> createTime, updateTime, readTime, deleteTime;
private final Properties properties;
public ClientRunnerSync() {
clientID = UUID.randomUUID().toString();
properties = new Properties();
try {
InputStream inputStream = new FileInputStream(propertiesFile);
properties.load(inputStream);
parallelCount = Integer.parseInt((String) properties.get("app.thread_count"));
runtime = Integer.parseInt((String) properties.get("app.runtime"));
cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
cRead = Integer.parseInt((String) properties.get("app.cycle_read"));
cUpdate = Integer.parseInt((String) properties.get("app.cycle_update"));
cDelete = Integer.parseInt((String) properties.get("app.cycle_delete"));
createTime = new ConcurrentLinkedQueue<>();
updateTime = new ConcurrentLinkedQueue<>();
readTime = new ConcurrentLinkedQueue<>();
deleteTime = new ConcurrentLinkedQueue<>();
} catch (IOException e) {
e.printStackTrace();
}
}
public String getGreeting() {
return "Hello World!";
}
private String createString() {
double dataSize = Double.parseDouble((String) properties.get("app.data_size"));
dataSize /= 2.0; // Java strings are 2B long
String conversionFactor = (String) properties.get("app.data_conversion_factor");
int multiplier = 1;
switch (conversionFactor) {
case "G": multiplier *= 1000;
case "M": multiplier *= 1000;
case "K": multiplier *= 1000;
}
char[] data = new char[(int)(dataSize * multiplier)];
return new String(data);
}
public double runExperiment(String id, long experimentStartTime) {
StorageServiceSync storageService = new StorageServiceSync(this.clientID);
storageService.initStorage();
String value = createString(), updatedValue = createString();
for (;;) {
String key = id + (int) (Math.random() * Integer.MAX_VALUE);
for (int j = 0; j < cCreate; j++) {
long timestampCreateStart = storageService.create(key, value);
createTime.add(System.currentTimeMillis() - timestampCreateStart);
}
for (int j = 0; j < cRead; j++) {
AbstractMap.Entry<StorageModel, Long> data = storageService.read(key);
readTime.add(System.currentTimeMillis() - data.getValue());
}
for (int j = 0; j < cUpdate; j++) {
AbstractMap.Entry<StorageModel, Long> data = storageService.read(key);
long timestampUpdateStart = storageService.update(key, updatedValue,
data.getKey().getVersion());
readTime.add(System.currentTimeMillis() - data.getValue());
updateTime.add(System.currentTimeMillis() - timestampUpdateStart);
}
for (int j = 0; j < cDelete; j++) {
AbstractMap.Entry<StorageModel, Long> data = storageService.read(key);
long timestampDeleteStart = storageService.delete(key, data.getKey().getVersion());
readTime.add(System.currentTimeMillis() - data.getValue());
deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
}
long currentTime = System.currentTimeMillis();
if ((currentTime - experimentStartTime) >= runtime * 1000L)
break;
}
System.out.println(storageService.getCache().stats());
storageService.cleanup();
return 0;
}
private void timerService() {
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
if (experimentEnded) {
timer.cancel();
timer.purge();
}
System.out.println("Experiment ran: " + stopwatch);
}
}, 5000, 5000);
}
private void printStatistics(double totalRuntime) {
long readQps = 0, createQps = 0, updateQps = 0, deleteQps = 0;
double avgRead = 0, avgCreate = 0, avgUpdate = 0, avgDelete = 0;
for (Long time: this.readTime) {
readQps++;
avgRead += time;
}
avgRead /= readQps * 1.0;
for (Long time: this.createTime) {
createQps++;
avgCreate += time;
}
avgCreate /= createQps * 1.0;
for (Long time: this.updateTime) {
updateQps++;
avgUpdate += time;
}
avgUpdate /= updateQps * 1.0;
for (Long time: this.deleteTime) {
deleteQps++;
avgDelete += time;
}
avgDelete /= deleteQps * 1.0;
double totalQps = readQps + createQps + updateQps + deleteQps;
System.out.println("Total runtime: " + totalRuntime);
System.out.println("Read: " + readQps + " Create: " + createQps
+ " Update: " + updateQps + " Delete: " + deleteQps + " Total: " + totalQps);
totalRuntime /= 1000;
System.out.println("Total QPS: " + totalQps / totalRuntime + " avg query time: " +
(totalRuntime * parallelCount / (totalQps)));
System.out.println("Read QPS: " + readQps / totalRuntime + " avg query time: " + avgRead);
System.out.println("Create QPS: " + createQps / totalRuntime + " avg query time: " + avgCreate);
System.out.println("Update QPS: " + updateQps / totalRuntime + " avg query time: " + avgUpdate);
System.out.println("Delete QPS: " + deleteQps / totalRuntime + " avg query time: " + avgDelete);
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
propertiesFile = args[0];
ClientRunnerSync clientRunner = new ClientRunnerSync();
System.out.println(clientRunner.getGreeting());
System.out.println("Using Sync Server. Thread count: " + parallelCount + " runtime: " + runtime + "s");
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount);
Thread.sleep(1000); // let things settle down a bit
Set<Callable<Double>> callables = new HashSet<>();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < parallelCount; i++) {
int finalI = i;
callables.add(() -> clientRunner.runExperiment(Integer.toString(finalI), startTime));
}
clientRunner.timerService();
List<Future<Double>> futures = executorService.invokeAll(callables);
for (Future<Double> future: futures) {
future.get();
}
clientRunner.experimentEnded = true;
long endTime = System.currentTimeMillis();
double totalRuntime = endTime - startTime;
clientRunner.printStatistics(totalRuntime);
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS);
}
}
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.lib;
import com.google.common.util.concurrent.ListenableFuture;
import hpdos.grpc.Packet;
public class StorageHandler {
private boolean isResultReady;
private Packet packet;
private ListenableFuture<Packet> listener;
public StorageHandler(boolean isResultReady, Packet packet, ListenableFuture<Packet> listener) {
this.isResultReady = isResultReady;
this.packet = packet;
this.listener = listener;
}
public boolean isResultReady() {
return isResultReady;
}
public void setResultReady(boolean resultReady) {
isResultReady = resultReady;
}
public Packet getPacket() {
return packet;
}
public void setPacket(Packet packet) {
this.packet = packet;
}
public ListenableFuture<Packet> getListener() {
return listener;
}
public void setListener(ListenableFuture<Packet> listener) {
this.listener = listener;
}
}
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.lib;
import HpdosClient.ConfigConstants;
import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.MessageFormat.RequestBuilder;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import hpdos.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class StorageServiceSync {
private final String clientID;
private ManagedChannel masterChannel;
private ArrayList<ManagedChannel> channels;
private List<Follower> replicaSet;
private final LoadingCache<String, StorageModel> cache;
public StorageServiceSync(String clientID) {
this.clientID = clientID;
this.cache = CacheBuilder
.newBuilder()
.maximumSize(10000)
.recordStats()
.build(new CacheLoader<>() {
@Override
public StorageModel load(@Nonnull String key) throws Exception {
int index = (int) (Math.random() * channels.size());
NetworkServiceGrpc.NetworkServiceBlockingStub stub =
NetworkServiceGrpc.newBlockingStub(channels.get(index));
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ""));
Packet requestPacket = RequestBuilder.buildPacket(request);
Packet responsePacket = stub.readMetadata(requestPacket);
Response response = responsePacket.getResponse(0);
if (response.getStatus() == MessageConstants.STATUS_OK) {
Ack ack = response.getAck();
return new StorageModel(ack.getVersion(), ack.getDataSize(), ack.getKey(),
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ack.getCrc(), ack.getValue());
} else {
throw new IOException("Resource not found");
}
}
});
}
public LoadingCache<String, StorageModel> getCache() {
return cache;
}
public void retrieveFollowerList() {
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(this.masterChannel);
ResponseList responseList = stub.getReadReplicaList(null);
this.replicaSet = responseList.getFollowerList();
}
public void cleanup() {
for (ManagedChannel channel: this.channels)
channel.shutdown();
}
public void initStorage() {
masterChannel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
channels = new ArrayList<>();
channels.add(masterChannel);
retrieveFollowerList();
for (Follower follower: replicaSet) {
ManagedChannel channel = ManagedChannelBuilder.
forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.add(channel);
}
}
// create a metadata block
public long create(String key, String value) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
0, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
NetworkServiceGrpc.NetworkServiceBlockingStub masterStub = NetworkServiceGrpc.newBlockingStub(masterChannel);
Packet response = masterStub.createMetadata(packet);
return timestampCreateStart;
}
// read back the metadata
public Map.Entry<StorageModel, Long> read(String key) {
long timestampReadStart = System.currentTimeMillis();
try {
StorageModel model = cache.get(key);
return new AbstractMap.SimpleEntry<>(model, timestampReadStart);
} catch (ExecutionException e) {
e.printStackTrace();
}
return new AbstractMap.SimpleEntry<>(null, timestampReadStart);
}
public long update(String key, String value, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE,
version, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
NetworkServiceGrpc.NetworkServiceBlockingStub masterStub = NetworkServiceGrpc.newBlockingStub(masterChannel);
Packet responsePacket = masterStub.updateMetadata(packet);
if (responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
return timestampCreateStart;
}
public long delete(String key, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE,
version, 0, key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
long timestampCreateStart = System.currentTimeMillis();
NetworkServiceGrpc.NetworkServiceBlockingStub masterStub = NetworkServiceGrpc.newBlockingStub(masterChannel);
Packet responsePacket = masterStub.deleteMetadata(packet);
if (responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
return timestampCreateStart;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment