Commit 2b6c1a94 authored by Pramod S's avatar Pramod S

update api

parent 58600282
...@@ -19,9 +19,9 @@ import hpdos_rdma_offloaded.handler.StorageHandler; ...@@ -19,9 +19,9 @@ import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.protocol.Request; import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.Response; import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol; import hpdos_rdma_offloaded.protocol.RpcProtocol;
import hpdos_rdma_offloaded.rdma.DaRPCClientGroupM;
import hpdos_rdma_offloaded.service.FollowerMetadataService; import hpdos_rdma_offloaded.service.FollowerMetadataService;
import hpdos_rdma_offloaded.service.MetadataServiceMaster; import hpdos_rdma_offloaded.service.MetadataServiceMaster;
import rdma.DaRPCClientGroupM;
public class MetadataServer{ public class MetadataServer{
......
...@@ -89,12 +89,12 @@ public class NetworkHandler { ...@@ -89,12 +89,12 @@ public class NetworkHandler {
// Change the Futre to CompletableFuture to achieve correct asynchronousness // Change the Futre to CompletableFuture to achieve correct asynchronousness
public void create(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{ public void create(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{
ReplicationHandler replicationHandler = new ReplicationHandler(); ReplicationHandler replicationHandler = new ReplicationHandler();
logger.info("Received create request for key/value: " + new String(packet.getKey()) + "/" + new String(packet.getValue())); logger.debug("Received create request for key/value: " + new String(packet.getKey()) + "/" + new String(packet.getValue()));
Stopwatch stopWatch = Stopwatch.createUnstarted(); Stopwatch stopWatch = Stopwatch.createUnstarted();
stopWatch.start(); stopWatch.start();
if (this.isMaster) if (this.isMaster)
{ {
// logger.info("Starting replication"); // logger.debug("Starting replication");
// Future<Response> futureReplication = this.executorService.submit(()->{ // Future<Response> futureReplication = this.executorService.submit(()->{
// Response replicationResponse = new Response(); // Response replicationResponse = new Response();
// // Write code to replicate the data to other nics // // Write code to replicate the data to other nics
...@@ -122,21 +122,20 @@ public class NetworkHandler { ...@@ -122,21 +122,20 @@ public class NetworkHandler {
stopWatch.stop(); stopWatch.stop();
} }
logger.info("Future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset(); stopWatch.reset();
// stopWatch.start(); // stopWatch.start();
// Response response = futureReplication.get(); // Response response = futureReplication.get();
// stopWatch.stop(); // stopWatch.stop();
// logger.info("Replication time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS)); // logger.debug("Replication time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
logger.info("Replication complete"); logger.debug("Replication complete");
} }
public void read(Packet packet) throws RocksDBException,InterruptedException,ExecutionException{ public void read(Packet packet) throws RocksDBException,InterruptedException,ExecutionException{
logger.info("Received read request for key/value: " + new String(packet.getKey())); logger.debug("Received read request for key/value: " + new String(packet.getKey()));
if (this.isMaster) if (this.isMaster)
{ {
// System.out.println("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
Response replicationResponse = new Response(); Response replicationResponse = new Response();
// Write code to replicate the data to other nics // Write code to replicate the data to other nics
...@@ -161,7 +160,7 @@ public class NetworkHandler { ...@@ -161,7 +160,7 @@ public class NetworkHandler {
return replicationResponse; return replicationResponse;
}); });
stopWatch.stop(); stopWatch.stop();
logger.info("Future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset(); stopWatch.reset();
// Future<Boolean> futureInvalidation = this.executorService.submit(()->{ // Future<Boolean> futureInvalidation = this.executorService.submit(()->{
// sendInvalidationRequest(packet); // sendInvalidationRequest(packet);
...@@ -171,13 +170,13 @@ public class NetworkHandler { ...@@ -171,13 +170,13 @@ public class NetworkHandler {
stopWatch.start(); stopWatch.start();
response = futureReplication.get(); response = futureReplication.get();
stopWatch.stop(); stopWatch.stop();
logger.info("Replication time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Replication time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset(); stopWatch.reset();
// stopWatch.start(); // stopWatch.start();
// futureInvalidation.get(); // futureInvalidation.get();
// stopWatch.stop(); // stopWatch.stop();
// logger.info("Invalidation time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS)); // logger.debug("Invalidation time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
logger.info("Replicating complete Ack "+ response.getAck()); logger.debug("Replicating complete Ack "+ response.getAck());
} }
} }
...@@ -187,7 +186,7 @@ public class NetworkHandler { ...@@ -187,7 +186,7 @@ public class NetworkHandler {
{ {
ReplicationHandler replicationHandler = new ReplicationHandler(); ReplicationHandler replicationHandler = new ReplicationHandler();
if (this.isMaster) { if (this.isMaster) {
logger.info("Starting replication"); logger.debug("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
Response replicationResponse = new Response(); Response replicationResponse = new Response();
// Write code to replicate the data to other nics // Write code to replicate the data to other nics
...@@ -195,14 +194,12 @@ public class NetworkHandler { ...@@ -195,14 +194,12 @@ public class NetworkHandler {
return replicationResponse; return replicationResponse;
}); });
Response response = futureReplication.get(); Response response = futureReplication.get();
logger.info("Replicating complete "+ response.getAck()); logger.debug("Replicating complete "+ response.getAck());
} }
} }
//Paras method //Paras method
/* public ArrayList<DaRPCFuture<InvalidationRequest, InvalidationResponse>> sendInvalidateRequest(Request request){ /* public ArrayList<DaRPCFuture<InvalidationRequest, InvalidationResponse>> sendInvalidateRequest(Request request){
System.out.println("Sending Invalidation Request");
InvalidationRequest invalidationRequest = new InvalidationRequest(); InvalidationRequest invalidationRequest = new InvalidationRequest();
invalidationRequest.setKey(request.getKey()); invalidationRequest.setKey(request.getKey());
ArrayList<DaRPCFuture<InvalidationRequest,InvalidationResponse>> requestFutures; ArrayList<DaRPCFuture<InvalidationRequest,InvalidationResponse>> requestFutures;
...@@ -225,7 +222,7 @@ public class NetworkHandler { ...@@ -225,7 +222,7 @@ public class NetworkHandler {
/* /*
public void sendInvalidateRequest(Request request) throws InterruptedException, ExecutionException{ public void sendInvalidateRequest(Request request) throws InterruptedException, ExecutionException{
ExecutorService executorService = Executors.newWorkStealingPool(); ExecutorService executorService = Executors.newWorkStealingPool();
logger.info("Sending Invalidation Request"); logger.debug("Sending Invalidation Request");
InvalidationRequest invalidationRequest = new InvalidationRequest(); InvalidationRequest invalidationRequest = new InvalidationRequest();
invalidationRequest.setKey(request.getKey()); invalidationRequest.setKey(request.getKey());
Set<Callable<DaRPCFuture<InvalidationRequest, InvalidationResponse>>> callables = new HashSet<>(); Set<Callable<DaRPCFuture<InvalidationRequest, InvalidationResponse>>> callables = new HashSet<>();
......
package hpdos_rdma_offloaded.handler; package hpdos_rdma_offloaded.handler;
import java.io.IOException; import java.io.IOException;
import org.apache.log4j.Logger;
import org.checkerframework.checker.units.qual.degrees;
import java.lang.System.Logger.Level;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import com.ibm.disni.util.NativeAffinity; import com.ibm.disni.util.NativeAffinity;
...@@ -8,15 +12,15 @@ import com.ibm.disni.util.NativeAffinity; ...@@ -8,15 +12,15 @@ import com.ibm.disni.util.NativeAffinity;
import hpdos_rdma_offloaded.protocol.InvalidationRequest; import hpdos_rdma_offloaded.protocol.InvalidationRequest;
import hpdos_rdma_offloaded.protocol.InvalidationResponse; import hpdos_rdma_offloaded.protocol.InvalidationResponse;
import hpdos_rdma_offloaded.protocol.Request; import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.protocol.Response; import hpdos_rdma_offloaded.protocol.Response;
import rdma.DaRPCFutureM; import hpdos_rdma_offloaded.rdma.DaRPCFutureM;
import rdma.DaRPCStreamM; import hpdos_rdma_offloaded.rdma.DaRPCStreamM;
public class NetworkHandlerM implements Runnable{ public class NetworkHandlerM implements Runnable{
private CopyOnWriteArrayList<DaRPCStreamM<InvalidationRequest, InvalidationResponse>> invalidationStreams; private CopyOnWriteArrayList<DaRPCStreamM<InvalidationRequest, InvalidationResponse>> invalidationStreams;
private CopyOnWriteArrayList<DaRPCStreamM<Request, Response>> replicationStreams; private CopyOnWriteArrayList<DaRPCStreamM<Request, Response>> replicationStreams;
final static Logger logger = Logger.getLogger(NetworkHandlerM.class);
public NetworkHandlerM() public NetworkHandlerM()
{ {
...@@ -30,50 +34,135 @@ public class NetworkHandlerM implements Runnable{ ...@@ -30,50 +34,135 @@ public class NetworkHandlerM implements Runnable{
{ {
this.replicationStreams.add(stream); this.replicationStreams.add(stream);
} }
void processCloseInvalidationStream(DaRPCStreamM<InvalidationRequest,InvalidationResponse> stream)
public void run()
{ {
NativeAffinity.setAffinity(1<<1); stream.completedList.forEach(future ->
while(true)
{ {
for(var stream : invalidationStreams) if(future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet())
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
{
try
{
future.metadataResponse.event.triggerResponse();
}catch(IOException ex)
{
logger.error( ex);
}
}
});
stream.endpoint.pendingFutures.forEach((ticket,future)->{
if(future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet())
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
{
try
{
future.metadataResponse.event.triggerResponse();
}catch(IOException ex)
{
logger.error( ex);
}
}
});
}
void processInvalidationStream(DaRPCStreamM<InvalidationRequest,InvalidationResponse> stream)
{ {
try try
{ {
DaRPCFutureM<InvalidationRequest,InvalidationResponse> future = stream.processStream(); DaRPCFutureM<InvalidationRequest,InvalidationResponse> future = stream.processStream();
if(future!=null && future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet()) if(future!=null)
{ {
//logger.debug("future"+future.metadataResponse.invalidationTotal+" "+future.metadataResponse.invalidationDone);
//logger.debug(future.metadataResponse.state);
if(future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet())
if(Response.DONE == future.metadataResponse.state.incrementAndGet()) if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
} }
}catch(Exception exx) }catch(Exception exx)
{ {
exx.printStackTrace(); exx.printStackTrace();
} }
} }
for(var stream : replicationStreams) void processCloseReplicationStream(DaRPCStreamM<Request,Response> stream)
{
stream.completedList.forEach(future ->
{
if(future.metadataResponse.replicationDone.incrementAndGet() == future.metadataResponse.replicatonTotal)
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
{
try
{
future.metadataResponse.event.triggerResponse();
}catch(IOException ex)
{
logger.error( ex);
}
}
});
stream.endpoint.pendingFutures.forEach((ticket,future)->{
if(future.metadataResponse.replicationDone.incrementAndGet() == future.metadataResponse.replicatonTotal)
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
{
try
{
future.metadataResponse.event.triggerResponse();
}catch(IOException ex)
{
logger.error( ex);
}
}
});
}
void processReplicationStream(DaRPCStreamM<Request,Response> stream)
{ {
try try
{ {
DaRPCFutureM<Request,Response> future = stream.processStream(); DaRPCFutureM<Request,Response> future = stream.processStream();
if(future!= null && future.metadataResponse.replicatonTotal == future.metadataResponse.replicationDone.incrementAndGet()) if(future!=null)
{ {
if(future.metadataResponse.replicationDone.incrementAndGet() == future.metadataResponse.replicatonTotal)
if(Response.DONE == future.metadataResponse.state.incrementAndGet()) if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
} }
}catch(Exception exx) }catch(Exception exx)
{ {
exx.printStackTrace(); exx.printStackTrace();
} }
} }
public void run()
{
NativeAffinity.setAffinity(1<<1);
while(true)
{
invalidationStreams.forEach(stream ->
{
if( stream.endpoint.isClosed())
{
processCloseInvalidationStream(stream);
invalidationStreams.remove(stream);
}
else
{
processInvalidationStream(stream);
}
});
replicationStreams.forEach(stream ->
{
if( ! stream.endpoint.isClosed())
{
processCloseReplicationStream(stream);
replicationStreams.remove(stream);
}
else
{
processReplicationStream(stream);
}
});
} }
} }
public void sendInvalidations(Request request,Response response) public void sendInvalidations(Request request,Response response)
{ {
InvalidationRequest inRequest = new InvalidationRequest(); InvalidationRequest inRequest = new InvalidationRequest();
//logger.debug("invdlidation send "+invalidationStreams.size());
inRequest.setKey( request.getKey()); inRequest.setKey( request.getKey());
response.invalidationTotal = invalidationStreams.size(); response.invalidationTotal = invalidationStreams.size();
...@@ -81,19 +170,25 @@ public class NetworkHandlerM implements Runnable{ ...@@ -81,19 +170,25 @@ public class NetworkHandlerM implements Runnable{
{ {
try try
{ {
stream.request(inRequest, new InvalidationResponse(), response, false); stream.request(inRequest, new InvalidationResponse(), response, true);
} }
catch(IOException ex) catch(IOException ex)
{ {
System.out.println(ex); logger.error(ex);
} }
} }
if(invalidationStreams.size() == 0)
{
response.state.incrementAndGet();
}
} }
public void sendReplications(Request request,Response response) public void sendReplications(Request request,Response response)
{ {
Request repRequest = new Request(); logger.debug("replication send "+replicationStreams.size());
/*Request repRequest = new Request();
repRequest.setRequestType(request.getRequestType()); repRequest.setRequestType(request.getRequestType());
repRequest.setKey(request.getKey()); repRequest.setKey(request.getKey());
...@@ -102,18 +197,24 @@ public class NetworkHandlerM implements Runnable{ ...@@ -102,18 +197,24 @@ public class NetworkHandlerM implements Runnable{
repRequest.setValue(request.getValue()); repRequest.setValue(request.getValue());
} }
*/
response.replicatonTotal = replicationStreams.size(); response.replicatonTotal = replicationStreams.size();
for(var stream : replicationStreams) for(var stream : replicationStreams)
{ {
try try
{ {
stream.request(repRequest, new Response(), response, false); stream.request(request, new Response(), response, true);
} }
catch(IOException ex) catch(IOException ex)
{ {
System.out.println(ex); logger.error(ex);
}
} }
if(replicationStreams.size() == 0)
{
response.state.incrementAndGet();
} }
} }
} }
...@@ -53,7 +53,7 @@ public class ReplicationHandler { ...@@ -53,7 +53,7 @@ public class ReplicationHandler {
Response response = future.get().get(); Response response = future.get().get();
} }
stopWatch.stop(); stopWatch.stop();
logger.info("Replication future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Replication future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset();*/ stopWatch.reset();*/
} }
......
...@@ -15,24 +15,23 @@ public class StorageHandler implements AutoCloseable{ ...@@ -15,24 +15,23 @@ public class StorageHandler implements AutoCloseable{
final static Logger logger = Logger.getLogger(StorageHandler.class); final static Logger logger = Logger.getLogger(StorageHandler.class);
public StorageHandler(String dbpath) throws RocksDBException public StorageHandler(String dbpath) throws RocksDBException
{ {
logger.debug("Creating RocksDB"); logger.warn("Creating RocksDB");
this.rockDbOptions = new Options(); this.rockDbOptions = new Options();
rockDbOptions.setCreateIfMissing(true); rockDbOptions.setCreateIfMissing(true);
this.rocksDB = RocksDB.open(rockDbOptions,dbpath); this.rocksDB = RocksDB.open(rockDbOptions,dbpath);
logger.debug("Created RocksDB"); logger.warn("Created RocksDB");
} }
public void close() public void close()
{ {
rocksDB.close(); rocksDB.close();
rockDbOptions.close(); rockDbOptions.close();
logger.debug("Closing RocksDB instance"); logger.warn("Closing RocksDB instance");
} }
public void create(byte[] key,byte[] value) throws RocksDBException{ public void create(byte[] key,byte[] value) throws RocksDBException{
rocksDB.put(key,value); rocksDB.put(key,value);
} }
public void create(Request request) throws RocksDBException { public void create(Request request) throws RocksDBException {
logger.debug("Writing to local");
rocksDB.put(request.getKey(),request.getValue()); rocksDB.put(request.getKey(),request.getValue());
} }
public byte[] read(byte[] key)throws RocksDBException public byte[] read(byte[] key)throws RocksDBException
......
...@@ -16,7 +16,6 @@ public class Request implements DaRPCMessage ...@@ -16,7 +16,6 @@ public class Request implements DaRPCMessage
@Override @Override
public int write(ByteBuffer buffer) throws IOException public int write(ByteBuffer buffer) throws IOException
{ {
// System.out.println("Request Write Method");
buffer.putInt(requestType); buffer.putInt(requestType);
if(key == null) if(key == null)
buffer.put(key); buffer.put(key);
...@@ -34,7 +33,6 @@ public class Request implements DaRPCMessage ...@@ -34,7 +33,6 @@ public class Request implements DaRPCMessage
@Override @Override
public void update(ByteBuffer buffer) throws IOException public void update(ByteBuffer buffer) throws IOException
{ {
// System.out.println("Request update method");
requestType = buffer.getInt(); requestType = buffer.getInt();
if(key == null || key.length != 128) if(key == null || key.length != 128)
this.key = new byte[128]; this.key = new byte[128];
......
...@@ -49,7 +49,6 @@ public class Response implements DaRPCMessage{ ...@@ -49,7 +49,6 @@ public class Response implements DaRPCMessage{
@Override @Override
public int write(ByteBuffer buffer) throws IOException public int write(ByteBuffer buffer) throws IOException
{ {
// System.out.println("Response write Method");
buffer.putInt(ack); buffer.putInt(ack);
if(ack == AckType.SUCCESS_WITH_VALUE) if(ack == AckType.SUCCESS_WITH_VALUE)
{ {
...@@ -62,7 +61,6 @@ public class Response implements DaRPCMessage{ ...@@ -62,7 +61,6 @@ public class Response implements DaRPCMessage{
@Override @Override
public void update(ByteBuffer buffer) throws IOException public void update(ByteBuffer buffer) throws IOException
{ {
// System.out.println("esponse update method");
ack = buffer.getInt(); ack = buffer.getInt();
if(ack == AckType.SUCCESS_WITH_VALUE) if(ack == AckType.SUCCESS_WITH_VALUE)
{ {
......
package rdma; package hpdos_rdma_offloaded.rdma;
/* /*
* DaRPC: Data Center Remote Procedure Call * DaRPC: Data Center Remote Procedure Call
* *
...@@ -39,7 +39,7 @@ import com.ibm.darpc.DaRPCEndpointGroup; ...@@ -39,7 +39,7 @@ import com.ibm.darpc.DaRPCEndpointGroup;
public class DaRPCClientEndpointM<R extends DaRPCMessage, T extends DaRPCMessage> extends DaRPCEndpoint<R,T> { public class DaRPCClientEndpointM<R extends DaRPCMessage, T extends DaRPCMessage> extends DaRPCEndpoint<R,T> {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
private ConcurrentHashMap<Integer, DaRPCFutureM<R,T>> pendingFutures; public ConcurrentHashMap<Integer, DaRPCFutureM<R,T>> pendingFutures;
private AtomicInteger ticketCount; private AtomicInteger ticketCount;
private int streamCount; private int streamCount;
private IbvWC[] wcList; private IbvWC[] wcList;
...@@ -86,7 +86,7 @@ public class DaRPCClientEndpointM<R extends DaRPCMessage, T extends DaRPCMessage ...@@ -86,7 +86,7 @@ public class DaRPCClientEndpointM<R extends DaRPCMessage, T extends DaRPCMessage
public void dispatchReceive(ByteBuffer recvBuffer, int ticket, int recvIndex) throws IOException { public void dispatchReceive(ByteBuffer recvBuffer, int ticket, int recvIndex) throws IOException {
DaRPCFutureM<R, T> future = pendingFutures.get(ticket); DaRPCFutureM<R, T> future = pendingFutures.get(ticket);
if (future == null){ if (future == null){
logger.info("no pending future (receive) for ticket " + ticket); logger.debug("no pending future (receive) for ticket " + ticket);
throw new IOException("no pending future (receive) for ticket " + ticket); throw new IOException("no pending future (receive) for ticket " + ticket);
} }
future.getReceiveMessage().update(recvBuffer); future.getReceiveMessage().update(recvBuffer);
...@@ -102,7 +102,7 @@ public class DaRPCClientEndpointM<R extends DaRPCMessage, T extends DaRPCMessage ...@@ -102,7 +102,7 @@ public class DaRPCClientEndpointM<R extends DaRPCMessage, T extends DaRPCMessage
public void dispatchSend(int ticket) throws IOException { public void dispatchSend(int ticket) throws IOException {
DaRPCFutureM<R, T> future = pendingFutures.get(ticket); DaRPCFutureM<R, T> future = pendingFutures.get(ticket);
if (future == null){ if (future == null){
logger.info("no pending future (send) for ticket " + ticket); logger.debug("no pending future (send) for ticket " + ticket);
throw new IOException("no pending future (send) for ticket " + ticket); throw new IOException("no pending future (send) for ticket " + ticket);
} }
if (future.touch()){ if (future.touch()){
......
package rdma; package hpdos_rdma_offloaded.rdma;
/* /*
* DaRPC: Data Center Remote Procedure Call * DaRPC: Data Center Remote Procedure Call
......
package rdma; package hpdos_rdma_offloaded.rdma;
/* /*
* DaRPC: Data Center Remote Procedure Call * DaRPC: Data Center Remote Procedure Call
...@@ -59,6 +59,7 @@ public class DaRPCFutureM<R extends DaRPCMessage, T extends DaRPCMessage> implem ...@@ -59,6 +59,7 @@ public class DaRPCFutureM<R extends DaRPCMessage, T extends DaRPCMessage> implem
this.response = response; this.response = response;
this.metadataResponse = metadataResponse;
this.stream = stream; this.stream = stream;
this.endpoint = endpoint; this.endpoint = endpoint;
this.status = new AtomicInteger(RPC_PENDING); this.status = new AtomicInteger(RPC_PENDING);
...@@ -134,7 +135,7 @@ public class DaRPCFutureM<R extends DaRPCMessage, T extends DaRPCMessage> implem ...@@ -134,7 +135,7 @@ public class DaRPCFutureM<R extends DaRPCMessage, T extends DaRPCMessage> implem
endpoint.pollOnce(); endpoint.pollOnce();
} catch(Exception e){ } catch(Exception e){
status.set(RPC_ERROR); status.set(RPC_ERROR);
logger.info(e.getMessage()); logger.debug(e.getMessage());
} }
} }
return status.get() > 0; return status.get() > 0;
......
package rdma; package hpdos_rdma_offloaded.rdma;
/* /*
* DaRPC: Data Center Remote Procedure Call * DaRPC: Data Center Remote Procedure Call
...@@ -34,11 +34,11 @@ import com.ibm.darpc.DaRPCMessage; ...@@ -34,11 +34,11 @@ import com.ibm.darpc.DaRPCMessage;
public class DaRPCStreamM<R extends DaRPCMessage,T extends DaRPCMessage> { public class DaRPCStreamM<R extends DaRPCMessage,T extends DaRPCMessage> {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc"); private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
private DaRPCClientEndpointM<R,T> endpoint; public DaRPCClientEndpointM<R,T> endpoint;
private LinkedBlockingDeque<DaRPCFutureM<R, T>> completedList; public LinkedBlockingDeque<DaRPCFutureM<R, T>> completedList;
DaRPCStreamM(DaRPCClientEndpointM<R,T> endpoint, int streamId) throws IOException{ DaRPCStreamM(DaRPCClientEndpointM<R,T> endpoint, int streamId) throws IOException{
logger.info("new direct rpc stream"); logger.debug("new direct rpc stream");
this.endpoint = endpoint; this.endpoint = endpoint;
this.completedList = new LinkedBlockingDeque<DaRPCFutureM<R, T>>(); this.completedList = new LinkedBlockingDeque<DaRPCFutureM<R, T>>();
} }
......
...@@ -33,17 +33,14 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService ...@@ -33,17 +33,14 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService
Request request = event.getReceiveMessage(); Request request = event.getReceiveMessage();
Response response = event.getSendMessage(); Response response = event.getSendMessage();
logger.info("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey())); logger.debug("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey()));
if(request.getValue() != null) if(request.getValue() != null)
logger.info(" value : "+ new String(request.getValue()) ); logger.debug(" value : "+ new String(request.getValue()) );
// System.out.println();
try{ try{
if (request.getRequestType() == RequestType.PUT) { if (request.getRequestType() == RequestType.PUT) {
this.storageHandler.create(request); this.storageHandler.create(request);
response.setAck(AckType.SUCCESS); response.setAck(AckType.SUCCESS);
response.setKey(null);
response.setValue(null);
} }
else if(request.getRequestType() == RequestType.GET) { else if(request.getRequestType() == RequestType.GET) {
byte[] value = this.storageHandler.read(request); byte[] value = this.storageHandler.read(request);
......
...@@ -54,11 +54,11 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -54,11 +54,11 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
Request request = event.getReceiveMessage(); Request request = event.getReceiveMessage();
Response response = event.getSendMessage(); Response response = event.getSendMessage();
Packet packet = new Packet(request.getRequestType(), request.getKey(), request.getValue()); Packet packet = new Packet(request.getRequestType(), request.getKey(), request.getValue());
logger.info("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey())); logger.debug("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey()));
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
long startTime, endTime, timeTaken; long startTime, endTime, timeTaken;
if(request.getValue() != null) if(request.getValue() != null)
logger.info(" value : "+ new String(request.getValue())); logger.debug(" value : "+ new String(request.getValue()));
try{ try{
if (request.getRequestType() == RequestType.PUT) { if (request.getRequestType() == RequestType.PUT) {
startTime = System.nanoTime(); startTime = System.nanoTime();
...@@ -66,27 +66,27 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -66,27 +66,27 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
endTime = System.nanoTime(); endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000; timeTaken = (endTime-startTime) / 1000;
ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime"); ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime");
logger.info("Time to write to local cache: " + timeTaken); logger.debug("Time to write to local cache: " + timeTaken);
// stopwatch.reset(); // stopwatch.reset();
// stopwatch.start(); // stopwatch.start();
startTime = System.nanoTime(); startTime = System.nanoTime();
this.networkHandler.create(packet); this.networkHandler.create(packet);
endTime = System.nanoTime(); endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000; timeTaken = (endTime-startTime) / 1000;
logger.info("Time to replicate: " + timeTaken ); logger.debug("Time to replicate: " + timeTaken );
ExperimentStatistics.collectStatistics(timeTaken, "replicationTime"); ExperimentStatistics.collectStatistics(timeTaken, "replicationTime");
// stopwatch.stop(); // stopwatch.stop();
// logger.info("Time to replicate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); // logger.debug("Time to replicate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
// stopwatch.reset(); // stopwatch.reset();
startTime = System.nanoTime(); startTime = System.nanoTime();
// this.networkHandler.sendInvalidateRequest(request); // this.networkHandler.sendInvalidateRequest(request);
endTime = System.nanoTime(); endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000; timeTaken = (endTime-startTime) / 1000;
logger.info("Time to invalidate: " + timeTaken); logger.debug("Time to invalidate: " + timeTaken);
ExperimentStatistics.collectStatistics(timeTaken, "invalidationTime"); ExperimentStatistics.collectStatistics(timeTaken, "invalidationTime");
ExperimentStatistics.collectStatistics(0, "write"); ExperimentStatistics.collectStatistics(0, "write");
// stopwatch.reset(); // stopwatch.reset();
// logger.info("Time to invalidate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); // logger.debug("Time to invalidate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
// stopwatch.reset(); // stopwatch.reset();
//Yet to consume isDone //Yet to consume isDone
// var isDone = this.networkHandler.consumeReplicationRequestFutures(replicationRequestFutures); // var isDone = this.networkHandler.consumeReplicationRequestFutures(replicationRequestFutures);
...@@ -99,7 +99,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -99,7 +99,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
stopwatch.start(); stopwatch.start();
byte[] value = this.storageHandler.read(request); byte[] value = this.storageHandler.read(request);
stopwatch.stop(); stopwatch.stop();
logger.info("Time to read: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Time to read: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
if(value == null) if(value == null)
{ {
...@@ -115,17 +115,17 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -115,17 +115,17 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
stopwatch.start(); stopwatch.start();
this.storageHandler.delete(request); this.storageHandler.delete(request);
stopwatch.stop(); stopwatch.stop();
logger.info("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
stopwatch.start(); stopwatch.start();
this.networkHandler.delete(packet); this.networkHandler.delete(packet);
stopwatch.stop(); stopwatch.stop();
logger.info("Time to delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Time to delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
stopwatch.start(); stopwatch.start();
// this.networkHandler.sendInvalidateRequest(request); // this.networkHandler.sendInvalidateRequest(request);
stopwatch.stop(); stopwatch.stop();
logger.info("Time to invalidate delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Time to invalidate delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
// var replicationRequestFutures = this.networkHandler.sendReplicateRequest(request); // var replicationRequestFutures = this.networkHandler.sendReplicateRequest(request);
//Yet to consume isDone //Yet to consume isDone
......
...@@ -25,8 +25,7 @@ import hpdos_rdma_offloaded.protocol.Request; ...@@ -25,8 +25,7 @@ import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.RequestType; import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.protocol.Response; import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol; import hpdos_rdma_offloaded.protocol.RpcProtocol;
import hpdos_rdma_offloaded.rdma.DaRPCClientGroupM;
import rdma.DaRPCClientGroupM;
public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<Request, Response>{ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<Request, Response>{
NetworkHandlerM networkHandler; NetworkHandlerM networkHandler;
...@@ -48,11 +47,11 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -48,11 +47,11 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException { public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException {
Request request = event.getReceiveMessage(); Request request = event.getReceiveMessage();
Response response = event.getSendMessage(); Response response = event.getSendMessage();
logger.info("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey())); logger.debug("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey()));
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
long startTime, endTime, timeTaken; long startTime, endTime, timeTaken;
if(request.getValue() != null) if(request.getValue() != null)
logger.info(" value : "+ new String(request.getValue())); logger.debug(" value : "+ new String(request.getValue()));
try{ try{
if (request.getRequestType() == RequestType.PUT) { if (request.getRequestType() == RequestType.PUT) {
response.event = event; response.event = event;
...@@ -63,8 +62,8 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -63,8 +62,8 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
endTime = System.nanoTime(); endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000; timeTaken = (endTime-startTime) / 1000;
response.state.getAndIncrement(); response.state.getAndIncrement();
// ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime"); //ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime");
logger.info("Time to write to local cache: " + timeTaken); logger.debug("Time to write to local cache: " + timeTaken);
response.setAck(AckType.SUCCESS); response.setAck(AckType.SUCCESS);
response.setKey(null); response.setKey(null);
...@@ -74,7 +73,7 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -74,7 +73,7 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
stopwatch.start(); stopwatch.start();
byte[] value = this.storageHandler.read(request); byte[] value = this.storageHandler.read(request);
stopwatch.stop(); stopwatch.stop();
logger.info("Time to read: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Time to read: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
if(value == null) if(value == null)
{ {
...@@ -94,7 +93,7 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -94,7 +93,7 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
this.storageHandler.delete(request); this.storageHandler.delete(request);
stopwatch.stop(); stopwatch.stop();
response.state.getAndIncrement(); response.state.getAndIncrement();
logger.info("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
response.event = event; response.event = event;
...@@ -119,18 +118,19 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -119,18 +118,19 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
ForkJoinPool.commonPool().submit(()->{ ForkJoinPool.commonPool().submit(()->{
try{ try{
System.out.println(rpcClientEndpoint.getDstAddr()+" "+rpcClientEndpoint.getSrcAddr()); logger.debug(rpcClientEndpoint.getDstAddr()+" "+rpcClientEndpoint.getSrcAddr());
String clientIP = ((InetSocketAddress)rpcClientEndpoint.getDstAddr()).getHostName(); String clientIP = ((InetSocketAddress)rpcClientEndpoint.getDstAddr()).getHostName();
InetSocketAddress clientSocketAddress = new InetSocketAddress(clientIP,1921); InetSocketAddress clientSocketAddress = new InetSocketAddress(clientIP,1921);
System.out.println("Creating Endpoint"); logger.debug("Creating Endpoint");
var invalidationClientEp = invalidationClientGroup.createEndpoint(); var invalidationClientEp = invalidationClientGroup.createEndpoint();
invalidationClientEp.connect(clientSocketAddress, 100); invalidationClientEp.connect(clientSocketAddress, 1000);
System.out.println("accepted");
if(invalidationClientEp.isConnected()){ if(invalidationClientEp.isConnected()){
logger.debug("accepted");
this.networkHandler.addInvalidationStream(invalidationClientEp.createStream()); this.networkHandler.addInvalidationStream(invalidationClientEp.createStream());
} }
else{ else{
System.out.println("Not able to Connect to client for invalidation"); logger.error("Not able to Connect to client for invalidation");
} }
} }
catch(Exception e){ catch(Exception e){
...@@ -143,8 +143,7 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -143,8 +143,7 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
try try
{ {
System.out.println("closed"+((InetSocketAddress)rpcClientEndpoint.getDstAddr()).getHostName()); logger.debug("closed"+((InetSocketAddress)rpcClientEndpoint.getDstAddr()).getHostName());
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment