Commit f1c96ec5 authored by Pramod S's avatar Pramod S

added log4j

parent 24ff629b
package hpdos_rdma_offloaded;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
public class ExperimentStatistics {
final static Logger logger = Logger.getLogger(ExperimentStatistics.class);
public static void collectStatistics(long latency, String path){
if (path.equals("localWriteTime")) {
BenchmarkingProperties.localWriteTime = BenchmarkingProperties.localWriteTime + latency;
BenchmarkingProperties.localWriteTimes.add(latency);
} else if(path.equals("replicationTime")){
BenchmarkingProperties.replicationTime = BenchmarkingProperties.replicationTime + latency;
BenchmarkingProperties.replicationTimes.add(latency);
}else if(path.equals("invalidationTime")){
BenchmarkingProperties.invalidationTime = BenchmarkingProperties.invalidationTime + latency;
BenchmarkingProperties.invalidationTimes.add(latency);
}else if(path.equals("write")){
BenchmarkingProperties.writes ++;
}
// BenchmarkingProperties.writes ++;
}
public static void printStatistics(){
logger.debug("Average local write latency time: " + BenchmarkingProperties.localWriteTime / BenchmarkingProperties.writes);
logger.debug("Average replication latency time: " + BenchmarkingProperties.replicationTime / BenchmarkingProperties.writes);
logger.debug("Average invalidation latency time: " + BenchmarkingProperties.invalidationTime / BenchmarkingProperties.writes);
logger.debug("Total writes: " + BenchmarkingProperties.writes);
int lastTenPercentLocalWrites = BenchmarkingProperties.localWriteTimes.size() / 10;
int lastTenPercentReplicationWrites = BenchmarkingProperties.replicationTimes.size() / 10;
int lastTenPercentInvalidationWrites = BenchmarkingProperties.invalidationTimes.size() / 10;
List<Long> lastTwentyLocalWrites = BenchmarkingProperties.localWriteTimes.subList(BenchmarkingProperties.localWriteTimes.size()-lastTenPercentLocalWrites, BenchmarkingProperties.localWriteTimes.size());
List<Long> lastTwentyReplicationWrites = BenchmarkingProperties.replicationTimes.subList(BenchmarkingProperties.localWriteTimes.size()-lastTenPercentReplicationWrites, BenchmarkingProperties.replicationTimes.size());
List<Long> lastTwentyInvalidationWrites = BenchmarkingProperties.invalidationTimes.subList(BenchmarkingProperties.localWriteTimes.size()-lastTenPercentInvalidationWrites, BenchmarkingProperties.invalidationTimes.size());
long sumLastTenPercentLocalWrites = 0;
long sumLastTenPercentReplicationWrites = 0;
long sumLastTenPercentInvalidationWrites = 0;
for (Long number : lastTwentyLocalWrites) {
sumLastTenPercentLocalWrites = sumLastTenPercentLocalWrites + number;
}
for (Long number : lastTwentyReplicationWrites) {
sumLastTenPercentReplicationWrites = sumLastTenPercentReplicationWrites + number;
}
for (Long number : lastTwentyInvalidationWrites) {
sumLastTenPercentInvalidationWrites = sumLastTenPercentInvalidationWrites + number;
}
logger.debug("Last ten percent writes: " + lastTenPercentLocalWrites);
logger.debug("Average for last ten percent local write latency time: " + sumLastTenPercentLocalWrites / lastTenPercentLocalWrites);
logger.debug("Average for last ten percent replication latency time: " + sumLastTenPercentReplicationWrites / lastTenPercentReplicationWrites);
logger.debug("Average for last ten percent invalidation latency time: " + sumLastTenPercentInvalidationWrites / lastTenPercentInvalidationWrites);
}
}
class BenchmarkingProperties {
static long localWriteTime = 0;
static long replicationTime = 0;
static long invalidationTime = 0;
static long writes = 0;
static ArrayList<Long> localWriteTimes = new ArrayList<>();
static ArrayList<Long> replicationTimes = new ArrayList<>();
static ArrayList<Long> invalidationTimes= new ArrayList<>();
}
...@@ -16,10 +16,12 @@ import java.util.concurrent.Executors; ...@@ -16,10 +16,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import com.ibm.darpc.DaRPCClientEndpoint; import com.ibm.darpc.DaRPCClientEndpoint;
import com.ibm.darpc.DaRPCClientGroup; import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCFuture; import com.ibm.darpc.DaRPCFuture;
import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.DaRPCStream;
import com.ibm.disni.util.StopWatch;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
...@@ -87,20 +89,48 @@ public class NetworkHandler { ...@@ -87,20 +89,48 @@ public class NetworkHandler {
// Change the Futre to CompletableFuture to achieve correct asynchronousness // Change the Futre to CompletableFuture to achieve correct asynchronousness
public void create(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{ public void create(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{
ReplicationHandler replicationHandler = new ReplicationHandler();
logger.info("Received create request for key/value: " + new String(packet.getKey()) + "/" + new String(packet.getValue())); logger.info("Received create request for key/value: " + new String(packet.getKey()) + "/" + new String(packet.getValue()));
Stopwatch stopWatch = Stopwatch.createUnstarted();
stopWatch.start();
if (this.isMaster) if (this.isMaster)
{ {
logger.info("Starting replication"); // logger.info("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{ // Future<Response> futureReplication = this.executorService.submit(()->{
Response replicationResponse = new Response(); // Response replicationResponse = new Response();
// Write code to replicate the data to other nics // // Write code to replicate the data to other nics
ReplicationHandler.replicateMetadata(packet); // ReplicationHandler.replicateMetadata(packet);
return replicationResponse; // return replicationResponse;
});
Response response = futureReplication.get(); // replicationHandler.replicateMetadata(packet);
logger.info("Replication complete"); Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>();
for(Follower follower : NetworkHandler.followers){
callables.add(()->{
Request request = new Request();
Response response = new Response();
request.setKey(packet.getKey());
request.setValue(packet.getValue());
request.setRequestType(packet.getOperationType());
DaRPCFuture<Request, Response> future = follower.getStream().request(request, response, false);
return future;
});
}
List<Future<DaRPCFuture<Request, Response>>> futures = executorService.invokeAll(callables);
for (Future<DaRPCFuture<Request, Response>> future: futures) {
Response response = future.get().get();
} }
stopWatch.stop();
}
logger.info("Future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset();
// stopWatch.start();
// Response response = futureReplication.get();
// stopWatch.stop();
// logger.info("Replication time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
logger.info("Replication complete");
} }
public void read(Packet packet) throws RocksDBException,InterruptedException,ExecutionException{ public void read(Packet packet) throws RocksDBException,InterruptedException,ExecutionException{
...@@ -119,23 +149,36 @@ public class NetworkHandler { ...@@ -119,23 +149,36 @@ public class NetworkHandler {
} }
public void update(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{ public void update(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{
Stopwatch stopWatch = Stopwatch.createUnstarted();
ReplicationHandler replicationHandler = new ReplicationHandler();
stopWatch.start();
if (isMaster) { if (isMaster) {
Response response = new Response(); Response response = new Response();
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
// Write code to replicate the data to other nics // Write code to replicate the data to other nics
Response replicationResponse = new Response(); Response replicationResponse = new Response();
// Write code to replicate the data to other nics // Write code to replicate the data to other nics
ReplicationHandler.replicateMetadata(packet); replicationHandler.replicateMetadata(packet);
return replicationResponse; return replicationResponse;
}); });
stopWatch.stop();
Future<Boolean> futureInvalidation = this.executorService.submit(()->{ logger.info("Future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
// sendInvalidationRequest(packet); stopWatch.reset();
return false; // Future<Boolean> futureInvalidation = this.executorService.submit(()->{
}); // sendInvalidationRequest(packet);
response = futureReplication.get(); // return false;
futureInvalidation.get(); // });
logger.info("Replicating complete Ack "+ response.getAck());
stopWatch.start();
response = futureReplication.get();
stopWatch.stop();
logger.info("Replication time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset();
// stopWatch.start();
// futureInvalidation.get();
// stopWatch.stop();
// logger.info("Invalidation time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
logger.info("Replicating complete Ack "+ response.getAck());
} }
} }
...@@ -143,12 +186,13 @@ public class NetworkHandler { ...@@ -143,12 +186,13 @@ public class NetworkHandler {
// To implement delete // To implement delete
public void delete(Packet packet) throws RocksDBException,InterruptedException,ExecutionException public void delete(Packet packet) throws RocksDBException,InterruptedException,ExecutionException
{ {
ReplicationHandler replicationHandler = new ReplicationHandler();
if (this.isMaster) { if (this.isMaster) {
logger.info("Starting replication"); logger.info("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
Response replicationResponse = new Response(); Response replicationResponse = new Response();
// Write code to replicate the data to other nics // Write code to replicate the data to other nics
ReplicationHandler.replicateMetadata(packet); replicationHandler.replicateMetadata(packet);
return replicationResponse; return replicationResponse;
}); });
Response response = futureReplication.get(); Response response = futureReplication.get();
...@@ -180,7 +224,7 @@ public class NetworkHandler { ...@@ -180,7 +224,7 @@ public class NetworkHandler {
return requestFutures; return requestFutures;
*/ */
// Pramod method // Pramod method
ExecutorService executorService = Executors.newFixedThreadPool(10); ExecutorService executorService = Executors.newWorkStealingPool();
logger.info("Sending Invalidation Request"); logger.info("Sending Invalidation Request");
InvalidationRequest invalidationRequest = new InvalidationRequest(); InvalidationRequest invalidationRequest = new InvalidationRequest();
invalidationRequest.setKey(request.getKey()); invalidationRequest.setKey(request.getKey());
......
...@@ -9,8 +9,13 @@ import java.util.concurrent.ExecutionException; ...@@ -9,8 +9,13 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import com.ibm.darpc.DaRPCFuture; import com.ibm.darpc.DaRPCFuture;
import com.ibm.disni.util.StopWatch;
import org.apache.log4j.Logger;
import hpdos_rdma_offloaded.lib.Follower; import hpdos_rdma_offloaded.lib.Follower;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
...@@ -19,6 +24,7 @@ import hpdos_rdma_offloaded.protocol.*; ...@@ -19,6 +24,7 @@ import hpdos_rdma_offloaded.protocol.*;
public class ReplicationHandler { public class ReplicationHandler {
public static List<Follower> followers = new ArrayList<>(); public static List<Follower> followers = new ArrayList<>();
final static Logger logger = Logger.getLogger(ReplicationHandler.class);
public static ExecutorService executorService = Executors.newFixedThreadPool(10); public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public ReplicationHandler(){} public ReplicationHandler(){}
...@@ -26,7 +32,9 @@ public class ReplicationHandler { ...@@ -26,7 +32,9 @@ public class ReplicationHandler {
followers.add(follower); followers.add(follower);
} }
public static void replicateMetadata(Packet packet) throws InterruptedException, ExecutionException{ public void replicateMetadata(Packet packet) throws InterruptedException, ExecutionException{
Stopwatch stopWatch = Stopwatch.createUnstarted();
stopWatch.start();
Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>(); Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>();
for(Follower follower : NetworkHandler.followers){ for(Follower follower : NetworkHandler.followers){
callables.add(()->{ callables.add(()->{
...@@ -40,10 +48,13 @@ public class ReplicationHandler { ...@@ -40,10 +48,13 @@ public class ReplicationHandler {
}); });
} }
List<Future<DaRPCFuture<Request, Response>>> futures = executorService.invokeAll(callables); List<Future<DaRPCFuture<Request, Response>>> futures = executorService.invokeAll(callables);
for (Future<DaRPCFuture<Request, Response>> future: futures) { for (Future<DaRPCFuture<Request, Response>> future: futures) {
Response response = future.get().get(); Response response = future.get().get();
} }
stopWatch.stop();
logger.info("Replication future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset();
} }
} }
...@@ -5,7 +5,9 @@ import java.net.InetSocketAddress; ...@@ -5,7 +5,9 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import com.ibm.darpc.DaRPCClientEndpoint; import com.ibm.darpc.DaRPCClientEndpoint;
import com.ibm.darpc.DaRPCClientGroup; import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCServerEndpoint; import com.ibm.darpc.DaRPCServerEndpoint;
...@@ -16,6 +18,7 @@ import com.ibm.darpc.DaRPCStream; ...@@ -16,6 +18,7 @@ import com.ibm.darpc.DaRPCStream;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.ExperimentStatistics;
import hpdos_rdma_offloaded.handler.NetworkHandler; import hpdos_rdma_offloaded.handler.NetworkHandler;
import hpdos_rdma_offloaded.handler.StorageHandler; import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
...@@ -53,13 +56,39 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -53,13 +56,39 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
Response response = event.getSendMessage(); Response response = event.getSendMessage();
Packet packet = new Packet(request.getRequestType(), request.getKey(), request.getValue()); Packet packet = new Packet(request.getRequestType(), request.getKey(), request.getValue());
logger.info("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey())); logger.info("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey()));
Stopwatch stopwatch = Stopwatch.createUnstarted();
long startTime, endTime, timeTaken;
if(request.getValue() != null) if(request.getValue() != null)
logger.info(" value : "+ new String(request.getValue())); logger.info(" value : "+ new String(request.getValue()));
try{ try{
if (request.getRequestType() == RequestType.PUT) { if (request.getRequestType() == RequestType.PUT) {
startTime = System.nanoTime();
this.storageHandler.create(request); this.storageHandler.create(request);
endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000;
ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime");
logger.info("Time to write to local cache: " + timeTaken);
// stopwatch.reset();
// stopwatch.start();
startTime = System.nanoTime();
this.networkHandler.create(packet); this.networkHandler.create(packet);
endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000;
logger.info("Time to replicate: " + timeTaken );
ExperimentStatistics.collectStatistics(timeTaken, "replicationTime");
// stopwatch.stop();
// logger.info("Time to replicate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
// stopwatch.reset();
startTime = System.nanoTime();
this.networkHandler.sendInvalidateRequest(request); this.networkHandler.sendInvalidateRequest(request);
endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000;
logger.info("Time to invalidate: " + timeTaken);
ExperimentStatistics.collectStatistics(timeTaken, "invalidationTime");
ExperimentStatistics.collectStatistics(0, "write");
// stopwatch.reset();
// logger.info("Time to invalidate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
// stopwatch.reset();
//Yet to consume isDone //Yet to consume isDone
// var isDone = this.networkHandler.consumeReplicationRequestFutures(replicationRequestFutures); // var isDone = this.networkHandler.consumeReplicationRequestFutures(replicationRequestFutures);
...@@ -68,7 +97,11 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -68,7 +97,11 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
response.setValue(null); response.setValue(null);
} }
else if(request.getRequestType() == RequestType.GET) { else if(request.getRequestType() == RequestType.GET) {
stopwatch.start();
byte[] value = this.storageHandler.read(request); byte[] value = this.storageHandler.read(request);
stopwatch.stop();
logger.info("Time to read: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset();
if(value == null) if(value == null)
{ {
response.setAck(AckType.NOTFOUND); response.setAck(AckType.NOTFOUND);
...@@ -80,9 +113,21 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -80,9 +113,21 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
} }
} }
else if(request.getRequestType() == RequestType.DELETE){ else if(request.getRequestType() == RequestType.DELETE){
stopwatch.start();
this.storageHandler.delete(request); this.storageHandler.delete(request);
stopwatch.stop();
logger.info("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset();
stopwatch.start();
this.networkHandler.delete(packet); this.networkHandler.delete(packet);
stopwatch.stop();
logger.info("Time to delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset();
stopwatch.start();
this.networkHandler.sendInvalidateRequest(request); this.networkHandler.sendInvalidateRequest(request);
stopwatch.stop();
logger.info("Time to invalidate delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset();
// var replicationRequestFutures = this.networkHandler.sendReplicateRequest(request); // var replicationRequestFutures = this.networkHandler.sendReplicateRequest(request);
//Yet to consume isDone //Yet to consume isDone
// var isDone = this.networkHandler.consumeReplicationRequestFutures(replicationRequestFutures); // var isDone = this.networkHandler.consumeReplicationRequestFutures(replicationRequestFutures);
...@@ -135,6 +180,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -135,6 +180,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
logger.debug("Closing Connection for ip address: " + ipAddress ); logger.debug("Closing Connection for ip address: " + ipAddress );
NetworkHandler.invalidationStreams.remove(ipAddress); NetworkHandler.invalidationStreams.remove(ipAddress);
ExperimentStatistics.printStatistics();
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment