Commit dd2b8b74 authored by Pramod S's avatar Pramod S

Merge branch 'rdma_metadata_server' of https://git.cse.iitb.ac.in/synerg/hpdos...

Merge branch 'rdma_metadata_server' of https://git.cse.iitb.ac.in/synerg/hpdos into rdma_metadata_server
parents 09da7005 02542bc9
app.HOST=192.168.200.20 app.HOST=192.168.200.20
app.MASTER=true app.MASTER=true
app.NFOLLOWER=1 app.NFOLLOWER=1
app.FOLLOWER1=192.168.200.210 app.FOLLOWER1=192.168.200.40
app.FOLLOWER2= app.FOLLOWER2=
app.MASTER_IP=NA app.MASTER_IP=NA
#this is how to pair threads to cpu cores #this is how to pair threads to cpu cores
......
...@@ -41,7 +41,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -41,7 +41,7 @@ public class NetworkHandlerM implements Runnable{
try try
{ {
DaRPCFutureM<InvalidationRequest,InvalidationResponse> future = stream.processStream(); DaRPCFutureM<InvalidationRequest,InvalidationResponse> future = stream.processStream();
if(future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet()) if(future!=null && future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet())
{ {
if(Response.DONE == future.metadataResponse.state.incrementAndGet()) if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
...@@ -57,7 +57,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -57,7 +57,7 @@ public class NetworkHandlerM implements Runnable{
try try
{ {
DaRPCFutureM<Request,Response> future = stream.processStream(); DaRPCFutureM<Request,Response> future = stream.processStream();
if(future.metadataResponse.replicatonTotal == future.metadataResponse.replicationDone.incrementAndGet()) if(future!= null && future.metadataResponse.replicatonTotal == future.metadataResponse.replicationDone.incrementAndGet())
{ {
if(Response.DONE == future.metadataResponse.state.incrementAndGet()) if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
......
package hpdos_rdma_offloaded.service; package hpdos_rdma_offloaded.service;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
......
...@@ -55,14 +55,14 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -55,14 +55,14 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
logger.info(" value : "+ new String(request.getValue())); logger.info(" value : "+ new String(request.getValue()));
try{ try{
if (request.getRequestType() == RequestType.PUT) { if (request.getRequestType() == RequestType.PUT) {
response.event = event;
this.networkHandler.sendInvalidations(request, response);
this.networkHandler.sendReplications(request, response);
startTime = System.nanoTime(); startTime = System.nanoTime();
this.storageHandler.create(request); this.storageHandler.create(request);
endTime = System.nanoTime(); endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000; timeTaken = (endTime-startTime) / 1000;
response.state.getAndIncrement();
response.event = event;
this.networkHandler.sendInvalidations(request, response);
this.networkHandler.sendReplications(request, response);
// ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime"); // ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime");
logger.info("Time to write to local cache: " + timeTaken); logger.info("Time to write to local cache: " + timeTaken);
...@@ -88,15 +88,17 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -88,15 +88,17 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
event.triggerResponse(); event.triggerResponse();
} }
else if(request.getRequestType() == RequestType.DELETE){ else if(request.getRequestType() == RequestType.DELETE){
this.networkHandler.sendInvalidations(request, response);
this.networkHandler.sendReplications(request, response);
stopwatch.start(); stopwatch.start();
this.storageHandler.delete(request); this.storageHandler.delete(request);
stopwatch.stop(); stopwatch.stop();
response.state.getAndIncrement();
logger.info("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.info("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
response.event = event; response.event = event;
this.networkHandler.sendInvalidations(request, response);
this.networkHandler.sendReplications(request, response);
response.setAck(AckType.SUCCESS); response.setAck(AckType.SUCCESS);
} }
else{ else{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment