Commit 397f695d authored by Paras Garg's avatar Paras Garg

Fixed request rate for benchmarking script

parent d64d2cc0
......@@ -12,6 +12,6 @@ writes=0.8
deletes=0.1
clients=1
requests_per_client=100000
requests_per_second=1000000
requests_per_second=5000000
key_size=10
value_size=10
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.log4j.Logger;
import hpdos.handlers.NetworkHandler;
public class BenchmarkingClient {
public Properties properties;
public double runExperiment(String threadId, MetadataClient client) throws Exception{
......@@ -23,7 +30,7 @@ public class BenchmarkingClient {
long sleepTimeMicros = Double.valueOf(sleepTime).longValue();
logger.debug(sleepTimeMicros);
Thread.sleep((long)(Math.random() * 500)); // randomizing start time to avoid spikes in request
Double requestsPerClient = BenchmarkingProperties.requestsPerClient;
long requestsPerClient = BenchmarkingProperties.requestsPerClient;
double totalBracket = BenchmarkingProperties.reads
+ BenchmarkingProperties.writes
+ BenchmarkingProperties.deletes;
......@@ -33,66 +40,117 @@ public class BenchmarkingClient {
readBracket = writeBracket + BenchmarkingProperties.reads / totalBracket;
deleteBracket = readBracket + BenchmarkingProperties.deletes / totalBracket;
// int temp = 5;
ExecutorService threadpool = Executors.newCachedThreadPool();
// ListeningExecutorService service = MoreExecutors.listeningDecorator(threadpool);
ArrayBlockingQueue<CompletableFuture<Void>> responses = new ArrayBlockingQueue<>((int)requestsPerClient);
while (requestsPerClient > 0) {
long startTime = System.nanoTime();
double toss = Math.random();
Stopwatch stopwatch = Stopwatch.createUnstarted();
if (toss < writeBracket) {
byte[] key = new byte[BenchmarkingProperties.keySize];
byte[] value = new byte[BenchmarkingProperties.valueSize];
new Random().nextBytes(key);
new Random().nextBytes(value);
stopwatch.start();
client.networkHandler.put(key, value);
stopwatch.stop();
collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 100);
// stopwatch.start();
long writeStartTime = System.nanoTime();
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
try {
client.networkHandler.put(key, value);
} catch (Exception e) {
e.printStackTrace();
}
}).whenComplete((r,t)->{
// collectExperimentStatistics((System.nanoTime() - writeStartTime)/1000, 100);
});
responses.add(cf);
// stopwatch.stop();
}
else if (toss < readBracket) {
byte[] key = new byte[BenchmarkingProperties.keySize];
new Random().nextBytes(key);
stopwatch.start();
client.networkHandler.get(key);
stopwatch.stop();
collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 101);
// stopwatch.start();
long readStartTime = System.nanoTime();
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
try {
client.networkHandler.get(key);
} catch (Exception e) {
e.printStackTrace();
}
}).whenComplete((r,t)->{
collectExperimentStatistics((System.nanoTime() - readStartTime)/1000, 101);
});
responses.add(cf);
// stopwatch.stop();
// collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 101);
}
else if(toss < deleteBracket){
byte[] key = new byte[BenchmarkingProperties.keySize];
new Random().nextBytes(key);
stopwatch.start();
client.networkHandler.delete(key);
stopwatch.stop();
collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 102);
// stopwatch.start();
long deleteStartTime = System.nanoTime();
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
try {
client.networkHandler.delete(key);
} catch (Exception e) {
e.printStackTrace();
}
}).whenComplete((r,t)->{
collectExperimentStatistics((System.nanoTime() - deleteStartTime)/1000, 102);
});
responses.add(cf);
// stopwatch.stop();
// collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 102);
}
if (requestsPerClient % 10000 == 0) {
logger.debug("Ten thousand requests completed " + requestsPerClient);
}
requestsPerClient -- ;
// Stopwatch stopwatch = Stopwatch.createUnstarted();
// stopwatch.start();
//testWait(sleepTimeMicros);
testWait(sleepTimeMicros);
long endTime = System.nanoTime();
BenchmarkingProperties.timePerRequest = BenchmarkingProperties.timePerRequest + ((endTime - startTime)/1000);
// stopwatch.stop();
// logger.debug("Microseconds waited: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
}
Iterator iteratorValues = responses.iterator();
long startTime = System.nanoTime();
while (iteratorValues.hasNext()) {
CompletableFuture<Void> res = (CompletableFuture<Void>) iteratorValues.next();
res.get(10, TimeUnit.SECONDS);
}
long endTime = System.nanoTime();
logger.debug("Average request latency: " + ((endTime - startTime)/1000)/(BenchmarkingProperties.requestsPerClient));
return 0;
}
public void collectExperimentStatistics(long time, int type){
public static void collectExperimentStatistics(long time, int type){
// logger.debug("Time: " + time);
if (type == 100) {
BenchmarkingProperties.writeCount ++;
BenchmarkingProperties.writeTime = BenchmarkingProperties.writeTime + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
else if (type == 101) {
BenchmarkingProperties.readCount ++;
BenchmarkingProperties.readTime = BenchmarkingProperties.readTime + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
else if(type == 102){
BenchmarkingProperties.deleteCount ++;
BenchmarkingProperties.deleteTime = BenchmarkingProperties.deleteTime + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
}
......@@ -108,9 +166,14 @@ public class BenchmarkingClient {
logger.debug("Average write time in microseconds: " + (BenchmarkingProperties.writeTime / BenchmarkingProperties.writeCount));
logger.debug("Average read time in microseconds: " + (BenchmarkingProperties.readTime / BenchmarkingProperties.readCount));
logger.debug("Average delete time in microseconds: " + (BenchmarkingProperties.deleteTime / BenchmarkingProperties.deleteCount));
logger.debug("Average Time/Request : " + BenchmarkingProperties.timePerRequest/(BenchmarkingProperties.writeCount + BenchmarkingProperties.readCount + BenchmarkingProperties.deleteCount));
long avgTimePerReq = BenchmarkingProperties.timePerRequest/(BenchmarkingProperties.writeCount + BenchmarkingProperties.readCount + BenchmarkingProperties.deleteCount);
logger.debug("Actual Requests/second : " + (1000000/avgTimePerReq));
}
public void testWait(long microseconds){
// logger.debug("Sleeping for " + microseconds + " microseconds.");
final long INTERVAL = microseconds * 1000;
long start = System.nanoTime();
long end=0;
......@@ -133,7 +196,7 @@ public class BenchmarkingClient {
BenchmarkingProperties.writes = Double.valueOf((String)benchmarkingClient.properties.get("writes"));
BenchmarkingProperties.deletes = Double.valueOf((String)benchmarkingClient.properties.get("deletes"));
BenchmarkingProperties.clients = Integer.valueOf((String)benchmarkingClient.properties.get("clients"));
BenchmarkingProperties.requestsPerClient = Double.valueOf((String)benchmarkingClient.properties.get("requests_per_client"));
BenchmarkingProperties.requestsPerClient = Long.valueOf((String)benchmarkingClient.properties.get("requests_per_client"));
BenchmarkingProperties.requestsPerSecond = Double.valueOf((String)benchmarkingClient.properties.get("requests_per_second"));
BenchmarkingProperties.keySize = Integer.valueOf((String)benchmarkingClient.properties.get("key_size"));
BenchmarkingProperties.valueSize = Integer.valueOf((String)benchmarkingClient.properties.get("value_size"));
......@@ -164,7 +227,8 @@ public class BenchmarkingClient {
future.get();
}
logger.debug("DONE...");
benchmarkingClient.printStatistics();
NetworkHandler networkHandler = new NetworkHandler();
networkHandler.printStatistics();
experimentExecutors.shutdown();
}
......@@ -175,7 +239,7 @@ class BenchmarkingProperties {
static Double writes;
static Double deletes;
static int clients;
static Double requestsPerClient;
static long requestsPerClient;
static Double requestsPerSecond;
static int keySize;
static int valueSize;
......@@ -188,4 +252,5 @@ class BenchmarkingProperties {
static long readTime = 0;
static long writeTime = 0 ;
static long deleteTime = 0;
static long timePerRequest = 0;
}
......@@ -44,6 +44,8 @@ public class NetworkHandler {
public int numberOfMasters;
public Cache<String, String> cache;
public NetworkHandler(){}
public NetworkHandler(int cacheSize, int numberOfMasters){
this.numberOfMasters = numberOfMasters;
this.cache = Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS).maximumSize(cacheSize).build();
......@@ -82,6 +84,7 @@ public class NetworkHandler {
{
logger.info("Getting From local cache");
// var value = this.cache.getIfPresent(key);
long startTime = System.nanoTime();
String value = this.cache.getIfPresent(new String(key));
if(value != null)
return value.getBytes(StandardCharsets.UTF_8);
......@@ -132,13 +135,15 @@ public class NetworkHandler {
return response.getValue();
}
collectExperimentStatistics((System.nanoTime() - startTime)/1000, 101);
return null;
}
public int put(byte[] key,byte[] value) throws Exception
{
logger.info("Putting KV pair on server ....");
long startTime = System.nanoTime();
Request request= new Request();
request.setRequestType(RequestType.PUT);
request.setKey(key);
......@@ -156,13 +161,15 @@ public class NetworkHandler {
if(response.getAck() == AckType.SUCCESS){
this.cache.put(new String(key), new String(value));
logger.info("Updating cache..");
}
}
collectExperimentStatistics((System.nanoTime() - startTime)/1000, 100);
return response.getAck();
}
public int delete(byte[] key) throws Exception
{
logger.info("Invalidating Local Cache and sending delete request to server");
long startTime = System.nanoTime();
this.cache.invalidate(new String(key));
Request request= new Request();
request.setRequestType(RequestType.DELETE);
......@@ -177,6 +184,7 @@ public class NetworkHandler {
DaRPCStream<Request, Response> stream = NetworkHandler.streams.get(ip);
stream.request(request, response, false).get();
collectExperimentStatistics((System.nanoTime() - startTime)/1000, 102);
return response.getAck();
}
// public Pair<byte[], byte[]> read(byte[] key) throws InterruptedException, ExecutionException, IOException{
......@@ -263,4 +271,63 @@ public class NetworkHandler {
}
}
public static void collectExperimentStatistics(long time, int type){
// logger.debug("Time: " + time);
if (type == 100) {
BenchmarkingProperties.writeCount ++;
BenchmarkingProperties.writeTime = BenchmarkingProperties.writeTime + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
else if (type == 101) {
BenchmarkingProperties.readCount ++;
BenchmarkingProperties.readTime = BenchmarkingProperties.readTime + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
else if(type == 102){
BenchmarkingProperties.deleteCount ++;
BenchmarkingProperties.deleteTime = BenchmarkingProperties.deleteTime + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
}
public void printStatistics(){
logger.debug("Total writes: " + BenchmarkingProperties.writeCount);
logger.debug("Total reads: " + BenchmarkingProperties.readCount);
logger.debug("Total deletes: " + BenchmarkingProperties.deleteCount);
logger.debug("Total write time: " + BenchmarkingProperties.writeTime);
logger.debug("Total read time: " + BenchmarkingProperties.readTime);
logger.debug("Total delete time: " + BenchmarkingProperties.deleteTime);
logger.debug("Average write time in microseconds: " + (BenchmarkingProperties.writeTime / BenchmarkingProperties.writeCount));
logger.debug("Average read time in microseconds: " + (BenchmarkingProperties.readTime / BenchmarkingProperties.readCount));
logger.debug("Average delete time in microseconds: " + (BenchmarkingProperties.deleteTime / BenchmarkingProperties.deleteCount));
// logger.debug("Average Time/Request : " + BenchmarkingProperties.timePerRequest/(BenchmarkingProperties.writeCount + BenchmarkingProperties.readCount + BenchmarkingProperties.deleteCount));
// long avgTimePerReq = BenchmarkingProperties.timePerRequest/(BenchmarkingProperties.writeCount + BenchmarkingProperties.readCount + BenchmarkingProperties.deleteCount);
// logger.debug("Actual Requests/second : " + (1000000/avgTimePerReq));
}
}
class BenchmarkingProperties {
static Double reads;
static Double writes;
static Double deletes;
static int clients;
static long requestsPerClient;
static Double requestsPerSecond;
static int keySize;
static int valueSize;
// Result statistics
static int readCount;
static int writeCount;
static int deleteCount;
static long readTime = 0;
static long writeTime = 0 ;
static long deleteTime = 0;
static long timePerRequest = 0;
}
\ No newline at end of file
......@@ -26,15 +26,32 @@ public class InvalidationServer
final static Logger logger = Logger.getLogger(InvalidationServer.class);
Cache<String, String> cache;
RdmaServerEndpoint<DaRPCServerEndpoint<InvalidationRequest, InvalidationResponse>> serverEp;
private long[] clusterAffinities;
public InvalidationServer(Cache<String, String> cache2,String invalidationServerIP, int invalidationServerPort, NetworkHandler networkHandler)throws Exception
{
this.cache = cache2;
long[] clusterAffinities = new long[1];
clusterAffinities[0] = 1<<1;
long[] clusterAffinities = new long[1];
clusterAffinities[0] = 1L << 1 ;
// clusterAffinities = new long[4];
// for (int i = 0; i < 4; i++) {
// clusterAffinities[i] = 1L << i ;
// }
DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null;
InvalidationService service = new InvalidationService(networkHandler);
group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, true, 2, 2, 2, 1);
group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, false, 16, 16, 16, 4);
// for (int i = 0; i < 4; i++)
// {
// long cpu = 1L << i;
// clusterAffinities[i] = cpu;
// System.out.println(cpu);
// };
// DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null;
// InvalidationService service = new InvalidationService(networkHandler);
// group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 0, false, 16, 16, 32, 4);
// group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, 1000, 0, false, 16, 16, 16, 4);
serverEp = group.createServerEndpoint();
InetSocketAddress address = new InetSocketAddress(invalidationServerIP, invalidationServerPort);
serverEp.bind(address, 2);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment