Commit 25ceaae4 authored by Pramod S's avatar Pramod S

Changed Replication and Invalidation Polling

parent f1c96ec5
...@@ -6,6 +6,12 @@ ...@@ -6,6 +6,12 @@
<attribute name="gradle_used_by_scope" value="main,test"/> <attribute name="gradle_used_by_scope" value="main,test"/>
</attributes> </attributes>
</classpathentry> </classpathentry>
<classpathentry kind="src" output="bin/main" path="src/main/resources">
<attributes>
<attribute name="gradle_scope" value="main"/>
<attribute name="gradle_used_by_scope" value="main,test"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11/"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11/"/>
<classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/> <classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/>
<classpathentry kind="output" path="bin/default"/> <classpathentry kind="output" path="bin/default"/>
......
app.HOST=192.168.200.20 app.HOST=192.168.200.20
app.MASTER=true app.MASTER=true
app.NFOLLOWER=1
app.FOLLOWER1=192.168.200.210
app.FOLLOWER2=
app.MASTER_IP=NA app.MASTER_IP=NA
#this is how to pair threads to cpu cores #this is how to pair threads to cpu cores
app.cpu_affinity=4 app.cpu_affinity=4
......
package hpdos_rdma_offloaded; package hpdos_rdma_offloaded;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Properties; import java.util.concurrent.ForkJoinPool;
import hpdos_rdma_offloaded.lib.Property;
import com.ibm.darpc.DaRPCServerEndpoint; import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup; import com.ibm.darpc.DaRPCServerGroup;
...@@ -14,105 +12,66 @@ import com.ibm.disni.RdmaServerEndpoint; ...@@ -14,105 +12,66 @@ import com.ibm.disni.RdmaServerEndpoint;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.NetworkHandler; import hpdos_rdma_offloaded.handler.NetworkHandlerM;
import hpdos_rdma_offloaded.handler.RegistrationHandler;
import hpdos_rdma_offloaded.handler.StorageHandler; import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.protocol.Request; import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.Response; import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol;
import hpdos_rdma_offloaded.service.FollowerMetadataService; import hpdos_rdma_offloaded.service.FollowerMetadataService;
import hpdos_rdma_offloaded.service.MasterMetadataServerService; import hpdos_rdma_offloaded.service.MetadataServiceMaster;
import rdma.DaRPCClientGroupM;
public class MetadataServer implements Runnable{
private String host;
private String masterIp;
private int poolsize;
private int recvQueue;
private int sendQueue;
private int clusterSize;
private int wqSize = recvQueue;
private boolean polling;
private int maxinline = 0;
private int rdma_port;
private long[] clusterAffinities;
boolean isMaster; public class MetadataServer{
StorageHandler storageHandler; StorageHandler storageHandler;
NetworkHandler networkHandler;
MasterMetadataServerService metadataServerService = null; MetadataServiceMaster metadataService;
FollowerMetadataService followerMetadataServerService = null; FollowerMetadataService followerMetadataService;
RegistrationHandler registrationHandler = null;
public final static Properties properties= new Properties();
String propertyPath;
final static Logger logger = Logger.getLogger(MetadataServer.class);
public MetadataServer(String propertyPath) throws Exception,RocksDBException {
this.propertyPath = propertyPath;
InputStream inputStream = new FileInputStream(propertyPath);
properties.load(inputStream);
this.isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
host = properties.getProperty("app.HOST");
poolsize = Integer.valueOf( properties.getProperty("app.cpu_affinity"));
recvQueue = Integer.valueOf(properties.getProperty("app.rdma_receive_queue"));
sendQueue = Integer.valueOf( properties.getProperty("app.rdma_send_queue"));
clusterSize = Integer.valueOf(properties.getProperty("app.rdma_cluster_size"));
wqSize = recvQueue;
maxinline = Integer.valueOf(properties.getProperty("app.rdma_max_inline"));
rdma_port = Integer.valueOf(properties.getProperty("app.rdma_server_port"));
polling = Boolean.valueOf(properties.getProperty("app.rdma_polling"));
masterIp = String.valueOf(properties.getProperty("app.MASTER_IP"));
int registrationPort = Integer.valueOf(properties.getProperty("app.follower_registration_port"));
clusterAffinities = new long[poolsize];
for (int i = 0; i < poolsize; i++)
{
long cpu = 1L << i;
clusterAffinities[i] = cpu;
// System.out.println(cpu);
}
isMaster = Boolean.valueOf(properties.getProperty("app.MASTER"));
storageHandler = new StorageHandler(properties.getProperty("app.db_path"));
networkHandler = new NetworkHandler(isMaster);
// metadataServerService = new MasterMetadataServerService(this.networkHandler,storageHandler);
if(isMaster)
{
logger.debug("STARTING MASTER SERVICES.");
registrationHandler = new RegistrationHandler(isMaster, host, masterIp, registrationPort, networkHandler);
metadataServerService = new MasterMetadataServerService(this.networkHandler,storageHandler);
}
else
{
logger.debug("STARTING FOLLOWER SERVICES.");
registrationHandler = new RegistrationHandler(isMaster, host, masterIp, registrationPort, networkHandler);
followerMetadataServerService = new FollowerMetadataService(storageHandler);
}
} final static Logger logger = Logger.getLogger(MetadataServer.class);
public void run(){ public void run(Property property){
// MetadataServerService rpcService = new MetadataServerService();
DaRPCServerGroup<Request, Response> group = null;
try try
{ {
if (isMaster) { DaRPCServerGroup<Request, Response> group = null;
group = DaRPCServerGroup.createServerGroup(this.metadataServerService, this.clusterAffinities, 1000, maxinline, polling, recvQueue, sendQueue, wqSize, clusterSize); if(property.isMaster)
} else { {
group = DaRPCServerGroup.createServerGroup(this.followerMetadataServerService, this.clusterAffinities, 1000, maxinline, polling, recvQueue, sendQueue, wqSize, clusterSize); logger.debug("STARTING MASTER SERVICES.");
NetworkHandlerM networkHandler = new NetworkHandlerM();
RpcProtocol rpcProtocol = new RpcProtocol();
DaRPCClientGroupM<Request,Response> followerClientGroup = DaRPCClientGroupM.createClientGroup(rpcProtocol, 100, 10, 16, 16);
for(int i=0;i<property.nFollowers;i++)
{
var endpoint = followerClientGroup.createEndpoint();
endpoint.connect(new InetSocketAddress(property.followersIP[i], property.port), 100);
if(endpoint.isConnected())
networkHandler.addReplicationStream(endpoint.createStream());
}
var metadataServerService = new MetadataServiceMaster(networkHandler,storageHandler);
group = DaRPCServerGroup.createServerGroup(metadataServerService, property.clusterAffinities,
1000, property.maxinline, property.polling, property.recvQueue,
property.sendQueue, property.wqSize, property.clusterSize);
ForkJoinPool.commonPool().execute(networkHandler);
}
else
{
logger.debug("STARTING FOLLOWER SERVICES.");
//registrationHandler = new RegistrationHandler(isMaster, host, masterIp, registrationPort, networkHandler);
var followerMetadataServerService = new FollowerMetadataService(storageHandler);
group = DaRPCServerGroup.createServerGroup(followerMetadataServerService, property.clusterAffinities,
1000, property.maxinline, property.polling, property.recvQueue,
property.sendQueue, property.wqSize, property.clusterSize);
} }
RdmaServerEndpoint<DaRPCServerEndpoint<Request, Response>> serverEp = null; RdmaServerEndpoint<DaRPCServerEndpoint<Request, Response>> serverEp = group.createServerEndpoint();
serverEp = group.createServerEndpoint(); InetSocketAddress address = new InetSocketAddress(property.host, property.port);
InetSocketAddress address = new InetSocketAddress(host, rdma_port);
serverEp.bind(address, 100); serverEp.bind(address, 100);
while(true){ while(true){
try try
{ {
logger.debug("Listening to RDMA requests a, IP: " + host + " , PORT: " + rdma_port); logger.debug("Listening to RDMA requests a, IP: " + property.host + " , PORT: " + property.port);
serverEp.accept(); serverEp.accept();
logger.debug("Accepted connection."); logger.debug("Accepted connection.");
} }
...@@ -129,11 +88,10 @@ public class MetadataServer implements Runnable{ ...@@ -129,11 +88,10 @@ public class MetadataServer implements Runnable{
public static void main(String[] args) throws Exception public static void main(String[] args) throws Exception
{ {
RocksDB.loadLibrary(); Property property = new Property(args[0]);
//var process = Runtime.getRuntime().exec("/bin/sh export LD_LIBRARY_PATH=/usr/local/lib"); RocksDB.loadLibrary();
//process.wait(); var server = new MetadataServer();
// System.out.println("Exit value "+process.exitValue()); server.storageHandler = new StorageHandler(property.dbPath);
Runnable r = new MetadataServer(args[0]); server.run(property);
new Thread(r).start();
} }
} }
...@@ -44,7 +44,6 @@ public class NetworkHandler { ...@@ -44,7 +44,6 @@ public class NetworkHandler {
boolean isMaster; boolean isMaster;
// ConcurrentHashMap<SocketAddress,DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationClientEpStreams; // ConcurrentHashMap<SocketAddress,DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationClientEpStreams;
public NetworkHandler(boolean isMaster){ public NetworkHandler(boolean isMaster){
this.executorService = Executors.newFixedThreadPool(10); this.executorService = Executors.newFixedThreadPool(10);
this.isMaster = isMaster; this.isMaster = isMaster;
} }
...@@ -200,9 +199,8 @@ public class NetworkHandler { ...@@ -200,9 +199,8 @@ public class NetworkHandler {
} }
} }
public void sendInvalidateRequest(Request request) throws InterruptedException, ExecutionException{ //Paras method
/* Paras method /* public ArrayList<DaRPCFuture<InvalidationRequest, InvalidationResponse>> sendInvalidateRequest(Request request){
public DaRPCFuture<InvalidationRequest, InvalidationResponse> sendInvalidateRequest(Request request){
System.out.println("Sending Invalidation Request"); System.out.println("Sending Invalidation Request");
InvalidationRequest invalidationRequest = new InvalidationRequest(); InvalidationRequest invalidationRequest = new InvalidationRequest();
...@@ -222,8 +220,10 @@ public class NetworkHandler { ...@@ -222,8 +220,10 @@ public class NetworkHandler {
} }
return requestFutures; return requestFutures;
*/ }/*
// Pramod method // Pramod method
/*
public void sendInvalidateRequest(Request request) throws InterruptedException, ExecutionException{
ExecutorService executorService = Executors.newWorkStealingPool(); ExecutorService executorService = Executors.newWorkStealingPool();
logger.info("Sending Invalidation Request"); logger.info("Sending Invalidation Request");
InvalidationRequest invalidationRequest = new InvalidationRequest(); InvalidationRequest invalidationRequest = new InvalidationRequest();
...@@ -243,7 +243,9 @@ public class NetworkHandler { ...@@ -243,7 +243,9 @@ public class NetworkHandler {
InvalidationResponse response = future.get().get(); InvalidationResponse response = future.get().get();
} }
executorService.shutdown(); executorService.shutdown();
} }
*/
public boolean consumeReplicationRequestFutures(ArrayList<DaRPCFuture<InvalidationRequest,InvalidationResponse>> requestFutures ) public boolean consumeReplicationRequestFutures(ArrayList<DaRPCFuture<InvalidationRequest,InvalidationResponse>> requestFutures )
throws InterruptedException,ExecutionException{ throws InterruptedException,ExecutionException{
...@@ -259,6 +261,7 @@ public class NetworkHandler { ...@@ -259,6 +261,7 @@ public class NetworkHandler {
} }
if(requestFutures.size() == 0) if(requestFutures.size() == 0)
return true; return true;
//if not done now wait for 100 milliseconds before sending response //if not done now wait for 100 milliseconds before sending response
requestFutures.get(0).get(100,TimeUnit.MILLISECONDS); requestFutures.get(0).get(100,TimeUnit.MILLISECONDS);
......
package hpdos_rdma_offloaded.handler;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import com.ibm.disni.util.NativeAffinity;
import hpdos_rdma_offloaded.protocol.InvalidationRequest;
import hpdos_rdma_offloaded.protocol.InvalidationResponse;
import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.protocol.Response;
import rdma.DaRPCFutureM;
import rdma.DaRPCStreamM;
public class NetworkHandlerM implements Runnable{
private CopyOnWriteArrayList<DaRPCStreamM<InvalidationRequest, InvalidationResponse>> invalidationStreams;
private CopyOnWriteArrayList<DaRPCStreamM<Request, Response>> replicationStreams;
public NetworkHandlerM()
{
this.invalidationStreams = new CopyOnWriteArrayList<>();
this.replicationStreams = new CopyOnWriteArrayList<>();
}
public void addInvalidationStream(DaRPCStreamM<InvalidationRequest, InvalidationResponse> stream){
this.invalidationStreams.add(stream);
}
public void addReplicationStream(DaRPCStreamM<Request,Response> stream)
{
this.replicationStreams.add(stream);
}
public void run()
{
NativeAffinity.setAffinity(1<<1);
while(true)
{
for(var stream : invalidationStreams)
{
try
{
DaRPCFutureM<InvalidationRequest,InvalidationResponse> future = stream.processStream();
if(future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet())
{
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse();
}
}catch(Exception exx)
{
exx.printStackTrace();
}
}
for(var stream : replicationStreams)
{
try
{
DaRPCFutureM<Request,Response> future = stream.processStream();
if(future.metadataResponse.replicatonTotal == future.metadataResponse.replicationDone.incrementAndGet())
{
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse();
}
}catch(Exception exx)
{
exx.printStackTrace();
}
}
}
}
public void sendInvalidations(Request request,Response response)
{
InvalidationRequest inRequest = new InvalidationRequest();
inRequest.setKey( request.getKey());
response.invalidationTotal = invalidationStreams.size();
for(var stream : invalidationStreams)
{
try
{
stream.request(inRequest, new InvalidationResponse(), response, false);
}
catch(IOException ex)
{
System.out.println(ex);
}
}
}
public void sendReplications(Request request,Response response)
{
Request repRequest = new Request();
repRequest.setRequestType(request.getRequestType());
repRequest.setKey(request.getKey());
if(request.getRequestType() == RequestType.PUT)
{
repRequest.setValue(request.getValue());
}
response.replicatonTotal = replicationStreams.size();
for(var stream : replicationStreams)
{
try
{
stream.request(repRequest, new Response(), response, false);
}
catch(IOException ex)
{
System.out.println(ex);
}
}
}
}
...@@ -23,9 +23,9 @@ import hpdos_rdma_offloaded.protocol.*; ...@@ -23,9 +23,9 @@ import hpdos_rdma_offloaded.protocol.*;
public class ReplicationHandler { public class ReplicationHandler {
public static List<Follower> followers = new ArrayList<>(); public List<Follower> followers = new ArrayList<>();
final static Logger logger = Logger.getLogger(ReplicationHandler.class); final static Logger logger = Logger.getLogger(ReplicationHandler.class);
public static ExecutorService executorService = Executors.newFixedThreadPool(10); //public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public ReplicationHandler(){} public ReplicationHandler(){}
public void addFollower(Follower follower){ public void addFollower(Follower follower){
...@@ -33,7 +33,7 @@ public class ReplicationHandler { ...@@ -33,7 +33,7 @@ public class ReplicationHandler {
} }
public void replicateMetadata(Packet packet) throws InterruptedException, ExecutionException{ public void replicateMetadata(Packet packet) throws InterruptedException, ExecutionException{
Stopwatch stopWatch = Stopwatch.createUnstarted(); /* Stopwatch stopWatch = Stopwatch.createUnstarted();
stopWatch.start(); stopWatch.start();
Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>(); Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>();
for(Follower follower : NetworkHandler.followers){ for(Follower follower : NetworkHandler.followers){
...@@ -54,7 +54,15 @@ public class ReplicationHandler { ...@@ -54,7 +54,15 @@ public class ReplicationHandler {
} }
stopWatch.stop(); stopWatch.stop();
logger.info("Replication future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS)); logger.info("Replication future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset(); stopWatch.reset();*/
} }
public void setFollowers(ArrayList<Follower> followers)
{
this.followers = followers;
}
public void replicate(Request request)
{
}
} }
package hpdos_rdma_offloaded.lib;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class Property {
public String host;
public String masterIp;
public String dbPath;
public int poolsize;
public int recvQueue;
public int sendQueue;
public int clusterSize;
public int wqSize = recvQueue;
public boolean polling;
public int maxinline = 0;
public int port;
public long[] clusterAffinities;
public boolean isMaster;
public int nFollowers;
public String[] followersIP;
public Property(String path) throws IOException
{
InputStream inputStream = new FileInputStream(path);
Properties prop = new Properties();
prop.load(inputStream);
isMaster = Boolean.valueOf((String)prop.get("app.MASTER"));
host = prop.getProperty("app.HOST");
dbPath = prop.getProperty("app.db_path");
poolsize = Integer.valueOf( prop.getProperty("app.cpu_affinity"));
recvQueue = Integer.valueOf(prop.getProperty("app.rdma_receive_queue"));
sendQueue = Integer.valueOf( prop.getProperty("app.rdma_send_queue"));
clusterSize = Integer.valueOf(prop.getProperty("app.rdma_cluster_size"));
wqSize = recvQueue;
maxinline = Integer.valueOf(prop.getProperty("app.rdma_max_inline"));
port = Integer.valueOf(prop.getProperty("app.rdma_server_port"));
polling = Boolean.valueOf(prop.getProperty("app.rdma_polling"));
masterIp = String.valueOf(prop.getProperty("app.MASTER_IP"));
clusterAffinities = new long[poolsize];
for (int i = 0; i < poolsize; i++)
{
long cpu = 1L << i;
clusterAffinities[i] = cpu;
}
if(isMaster)
{
nFollowers = Integer.valueOf(prop.getProperty("app.NFOLLOWER"));
followersIP = new String[nFollowers];
for(int i = 1;i<=nFollowers;i++)
followersIP[i-1] = prop.getProperty("app.FOLLOWER"+i);
}
}
}
...@@ -18,7 +18,8 @@ public class Request implements DaRPCMessage ...@@ -18,7 +18,8 @@ public class Request implements DaRPCMessage
{ {
// System.out.println("Request Write Method"); // System.out.println("Request Write Method");
buffer.putInt(requestType); buffer.putInt(requestType);
buffer.put(key); if(key == null)
buffer.put(key);
//if operation type is get and delete then value is not required //if operation type is get and delete then value is not required
if(requestType == RequestType.PUT) if(requestType == RequestType.PUT)
{ {
......
...@@ -6,5 +6,6 @@ public interface RequestType ...@@ -6,5 +6,6 @@ public interface RequestType
public static int GET =101; public static int GET =101;
public static int DELETE =102; public static int DELETE =102;
public static int INVALIDATE = 103; public static int INVALIDATE = 103;
public static int GETSERVERS = 104;
public static int ADDFOLLOWER = 105;
}; };
...@@ -2,15 +2,50 @@ package hpdos_rdma_offloaded.protocol; ...@@ -2,15 +2,50 @@ package hpdos_rdma_offloaded.protocol;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import com.ibm.darpc.DaRPCMessage; import com.ibm.darpc.DaRPCMessage;
import com.ibm.darpc.DaRPCServerEvent;
public class Response implements DaRPCMessage{ public class Response implements DaRPCMessage{
public static int SERIALIZED_SIZE = 260;
public static int SERIALIZED_SIZE = 260;
public static int INVALIDATION_DONE = 1;
public static int REPLICATION_DONE = 2;
public static int DONE = 3;
public static int NEW = 0;
public AtomicInteger invalidationDone;
public AtomicInteger replicationDone;
public int invalidationTotal;
public int replicatonTotal ;
public AtomicInteger state;
public DaRPCServerEvent<Request,Response> event;
private int ack; private int ack;
public byte[] key; public byte[] key;
public byte[] value; public byte[] value;
public Response()
{
state = new AtomicInteger(NEW);
replicationDone = new AtomicInteger(0);
invalidationDone = new AtomicInteger(0);
}
public void setEvent(DaRPCServerEvent<Request,Response> event)
{
this.event = event;
}
public void triggerResponse() throws IOException{
event.triggerResponse();
}
@Override @Override
public int write(ByteBuffer buffer) throws IOException public int write(ByteBuffer buffer) throws IOException
{ {
......
...@@ -11,7 +11,6 @@ import org.apache.log4j.Logger; ...@@ -11,7 +11,6 @@ import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.StorageHandler; import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.lib.Packet;
import hpdos_rdma_offloaded.protocol.AckType; import hpdos_rdma_offloaded.protocol.AckType;
import hpdos_rdma_offloaded.protocol.InvalidationRequest; import hpdos_rdma_offloaded.protocol.InvalidationRequest;
import hpdos_rdma_offloaded.protocol.InvalidationResponse; import hpdos_rdma_offloaded.protocol.InvalidationResponse;
...@@ -74,12 +73,14 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService ...@@ -74,12 +73,14 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService
} }
@Override @Override
public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint)
{
logger.debug("Received new connection "); logger.debug("Received new connection ");
} }
@Override @Override
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint)
{
logger.debug("Closing Connection"); logger.debug("Closing Connection");
} }
......
...@@ -80,7 +80,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -80,7 +80,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
// logger.info("Time to replicate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); // logger.info("Time to replicate: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
// stopwatch.reset(); // stopwatch.reset();
startTime = System.nanoTime(); startTime = System.nanoTime();
this.networkHandler.sendInvalidateRequest(request); // this.networkHandler.sendInvalidateRequest(request);
endTime = System.nanoTime(); endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000; timeTaken = (endTime-startTime) / 1000;
logger.info("Time to invalidate: " + timeTaken); logger.info("Time to invalidate: " + timeTaken);
...@@ -124,7 +124,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -124,7 +124,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
logger.info("Time to delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.info("Time to delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
stopwatch.start(); stopwatch.start();
this.networkHandler.sendInvalidateRequest(request); // this.networkHandler.sendInvalidateRequest(request);
stopwatch.stop(); stopwatch.stop();
logger.info("Time to invalidate delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.info("Time to invalidate delete: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
...@@ -182,12 +182,8 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -182,12 +182,8 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
NetworkHandler.invalidationStreams.remove(ipAddress); NetworkHandler.invalidationStreams.remove(ipAddress);
ExperimentStatistics.printStatistics(); ExperimentStatistics.printStatistics();
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
// TODO : Remove the connection from the static lists stores in Network Handler
// NetworkHandler.invalidationStreams.remove(rpcClientEndpoint.)
} }
......
package hpdos_rdma_offloaded.service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService;
import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException;
//import hpdos_rdma_offloaded.ExperimentStatistics;
import hpdos_rdma_offloaded.handler.NetworkHandlerM;
import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.protocol.AckType;
import hpdos_rdma_offloaded.protocol.InvalidationRequest;
import hpdos_rdma_offloaded.protocol.InvalidationResponse;
import hpdos_rdma_offloaded.protocol.InvalidationRpcProtocol;
import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol;
import rdma.DaRPCClientGroupM;
public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<Request, Response>{
NetworkHandlerM networkHandler;
StorageHandler storageHandler;
DaRPCClientGroupM<InvalidationRequest, InvalidationResponse> invalidationClientGroup;
final static Logger logger = Logger.getLogger(MasterMetadataServerService.class);
public MetadataServiceMaster(NetworkHandlerM networkHandler,StorageHandler storageHandler) throws Exception{
logger.debug("New MetadataServerService Created");
this.networkHandler = networkHandler;
this.storageHandler = storageHandler;
InvalidationRpcProtocol rpcProtocol = new InvalidationRpcProtocol();
invalidationClientGroup = DaRPCClientGroupM.createClientGroup(rpcProtocol, 100, 10, 16, 16);
}
@Override
public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException {
Request request = event.getReceiveMessage();
Response response = event.getSendMessage();
logger.info("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey()));
Stopwatch stopwatch = Stopwatch.createUnstarted();
long startTime, endTime, timeTaken;
if(request.getValue() != null)
logger.info(" value : "+ new String(request.getValue()));
try{
if (request.getRequestType() == RequestType.PUT) {
startTime = System.nanoTime();
this.storageHandler.create(request);
endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000;
response.event = event;
this.networkHandler.sendInvalidations(request, response);
this.networkHandler.sendReplications(request, response);
// ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime");
logger.info("Time to write to local cache: " + timeTaken);
response.setAck(AckType.SUCCESS);
response.setKey(null);
response.setValue(null);
}
else if(request.getRequestType() == RequestType.GET) {
stopwatch.start();
byte[] value = this.storageHandler.read(request);
stopwatch.stop();
logger.info("Time to read: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset();
if(value == null)
{
response.setAck(AckType.NOTFOUND);
}
else
{
response.setAck(AckType.SUCCESS_WITH_VALUE);
response.setValue(value);
}
event.triggerResponse();
}
else if(request.getRequestType() == RequestType.DELETE){
stopwatch.start();
this.storageHandler.delete(request);
stopwatch.stop();
logger.info("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset();
response.event = event;
this.networkHandler.sendInvalidations(request, response);
this.networkHandler.sendReplications(request, response);
response.setAck(AckType.SUCCESS);
}
else{
response.setAck(AckType.INVALID_REQUEST);
event.triggerResponse();
}
}
catch(RocksDBException ex){
ex.printStackTrace();
response.setAck(AckType.DBFAILED);
event.triggerResponse();
}
}
@Override
public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
logger.debug("Received new connection ");
ForkJoinPool.commonPool().submit(()->{
try{
System.out.println(rpcClientEndpoint.getDstAddr()+" "+rpcClientEndpoint.getSrcAddr());
String clientIP = ((InetSocketAddress)rpcClientEndpoint.getDstAddr()).getHostName();
InetSocketAddress clientSocketAddress = new InetSocketAddress(clientIP,1921);
System.out.println("Creating Endpoint");
var invalidationClientEp = invalidationClientGroup.createEndpoint();
invalidationClientEp.connect(clientSocketAddress, 100);
System.out.println("accepted");
if(invalidationClientEp.isConnected()){
this.networkHandler.addInvalidationStream(invalidationClientEp.createStream());
}
else{
System.out.println("Not able to Connect to client for invalidation");
}
}
catch(Exception e){
e.printStackTrace();
}
});
}
@Override
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
try
{
System.out.println("closed"+((InetSocketAddress)rpcClientEndpoint.getDstAddr()).getHostName());
} catch (IOException e) {
e.printStackTrace();
}
}
}
package rdma;
/*
* DaRPC: Data Center Remote Procedure Call
*
* Author: Patrick Stuedi <stu@zurich.ibm.com>
*
* Copyright (C) 2016-2018, IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ibm.disni.verbs.IbvCQ;
import com.ibm.disni.verbs.IbvWC;
import com.ibm.disni.verbs.RdmaCmId;
import com.ibm.disni.verbs.SVCPollCq;
import com.ibm.darpc.DaRPCMessage;
import com.ibm.darpc.DaRPCEndpoint;
import com.ibm.darpc.DaRPCEndpointGroup;
public class DaRPCClientEndpointM<R extends DaRPCMessage, T extends DaRPCMessage> extends DaRPCEndpoint<R,T> {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
private ConcurrentHashMap<Integer, DaRPCFutureM<R,T>> pendingFutures;
private AtomicInteger ticketCount;
private int streamCount;
private IbvWC[] wcList;
private SVCPollCq poll;
private ReentrantLock lock;
public DaRPCClientEndpointM(DaRPCEndpointGroup<? extends DaRPCClientEndpointM<R, T>, R, T> group, RdmaCmId idPriv, boolean serverSide) throws IOException {
super(group, idPriv, serverSide);
this.pendingFutures = new ConcurrentHashMap<Integer, DaRPCFutureM<R,T>>();
this.ticketCount = new AtomicInteger(0);
this.streamCount = 1;
this.lock = new ReentrantLock();
}
@Override
public void init() throws IOException {
super.init();
IbvCQ cq = getCqProvider().getCQ();
this.wcList = new IbvWC[getCqProvider().getCqSize()];
for (int i = 0; i < wcList.length; i++){
wcList[i] = new IbvWC();
}
this.poll = cq.poll(wcList, wcList.length);
}
public DaRPCStreamM<R, T> createStream() throws IOException {
int streamId = this.streamCount;
DaRPCStreamM<R,T> stream = new DaRPCStreamM<R,T>(this, streamId);
streamCount++;
return stream;
}
int sendRequest(DaRPCFutureM<R,T> future) throws IOException {
int ticket = getAndIncrement();
future.stamp(ticket);
pendingFutures.put(future.getTicket(), future);
while (!sendMessage(future.getSendMessage(), future.getTicket())){
pollOnce();
}
return ticket;
}
@Override
public void dispatchReceive(ByteBuffer recvBuffer, int ticket, int recvIndex) throws IOException {
DaRPCFutureM<R, T> future = pendingFutures.get(ticket);
if (future == null){
logger.info("no pending future (receive) for ticket " + ticket);
throw new IOException("no pending future (receive) for ticket " + ticket);
}
future.getReceiveMessage().update(recvBuffer);
postRecv(recvIndex);
if (future.touch()){
pendingFutures.remove(ticket);
freeSend(ticket);
}
future.signal(0);
}
@Override
public void dispatchSend(int ticket) throws IOException {
DaRPCFutureM<R, T> future = pendingFutures.get(ticket);
if (future == null){
logger.info("no pending future (send) for ticket " + ticket);
throw new IOException("no pending future (send) for ticket " + ticket);
}
if (future.touch()){
pendingFutures.remove(ticket);
freeSend(ticket);
}
}
private int getAndIncrement() {
return ticketCount.getAndIncrement() & Integer.MAX_VALUE;
}
public void pollOnce() throws IOException {
if (!lock.tryLock()){
return;
}
try {
_pollOnce();
} finally {
lock.unlock();
}
}
public void pollUntil(AtomicInteger future, long timeout) throws IOException {
boolean locked = false;
while(true){
locked = lock.tryLock();
if (future.get() > 0 || locked){
break;
}
}
try {
if (future.get() == 0){
_pollUntil(future, timeout);
}
} finally {
if (locked){
lock.unlock();
}
}
}
private int _pollOnce() throws IOException {
int res = poll.execute().getPolls();
if (res > 0) {
for (int i = 0; i < res; i++){
IbvWC wc = wcList[i];
dispatchCqEvent(wc);
}
}
return res;
}
private int _pollUntil(AtomicInteger future, long timeout) throws IOException {
long count = 0;
final long checkTimeOut = 1 << 14 /* 16384 */;
long startTime = System.nanoTime();
while(future.get() == 0){
int res = poll.execute().getPolls();
if (res > 0){
for (int i = 0; i < res; i++){
IbvWC wc = wcList[i];
dispatchCqEvent(wc);
}
}
if (count == checkTimeOut) {
count = 0;
if ((System.nanoTime() - startTime) / 1e6 > timeout) {
break;
}
}
count++;
}
return 1;
}
}
package rdma;
/*
* DaRPC: Data Center Remote Procedure Call
*
* Author: Patrick Stuedi <stu@zurich.ibm.com>
*
* Copyright (C) 2016-2018, IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import java.io.IOException;
import com.ibm.disni.RdmaCqProvider;
import com.ibm.disni.RdmaEndpointFactory;
import com.ibm.disni.verbs.IbvCQ;
import com.ibm.disni.verbs.IbvQP;
import com.ibm.disni.verbs.RdmaCmId;
import com.ibm.darpc.DaRPCMessage;
import com.ibm.darpc.DaRPCEndpointGroup;
import com.ibm.darpc.DaRPCProtocol;
public class DaRPCClientGroupM<R extends DaRPCMessage, T extends DaRPCMessage> extends DaRPCEndpointGroup<DaRPCClientEndpointM<R,T>, R, T> {
public static <R extends DaRPCMessage, T extends DaRPCMessage> DaRPCClientGroupM<R, T> createClientGroup(DaRPCProtocol<R, T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue) throws Exception {
DaRPCClientGroupM<R,T> group = new DaRPCClientGroupM<R,T>(protocol, timeout, maxinline, recvQueue, sendQueue);
group.init(new RpcClientFactory<R,T>(group));
return group;
}
private DaRPCClientGroupM(DaRPCProtocol<R, T> protocol, int timeout, int maxinline, int recvQueue, int sendQueue)
throws Exception {
super(protocol, timeout, maxinline, recvQueue, sendQueue);
}
@Override
public void allocateResources(DaRPCClientEndpointM<R, T> endpoint) throws Exception {
endpoint.allocateResources();
}
@Override
public RdmaCqProvider createCqProvider(DaRPCClientEndpointM<R, T> endpoint) throws IOException {
return new RdmaCqProvider(endpoint.getIdPriv().getVerbs(), recvQueueSize() + sendQueueSize());
}
@Override
public IbvQP createQpProvider(DaRPCClientEndpointM<R, T> endpoint) throws IOException {
RdmaCqProvider cqProvider = endpoint.getCqProvider();
IbvCQ cq = cqProvider.getCQ();
IbvQP qp = this.createQP(endpoint.getIdPriv(), endpoint.getPd(), cq);
return qp;
}
public static class RpcClientFactory<R extends DaRPCMessage, T extends DaRPCMessage> implements RdmaEndpointFactory<DaRPCClientEndpointM<R,T>> {
private DaRPCClientGroupM<R, T> group;
public RpcClientFactory(DaRPCClientGroupM<R, T> group){
this.group = group;
}
@Override
public DaRPCClientEndpointM<R,T> createEndpoint(RdmaCmId id, boolean serverSide) throws IOException {
return new DaRPCClientEndpointM<R,T>(group, id, serverSide);
}
}
}
package rdma;
/*
* DaRPC: Data Center Remote Procedure Call
*
* Author: Patrick Stuedi <stu@zurich.ibm.com>
*
* Copyright (C) 2016-2018, IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import hpdos_rdma_offloaded.protocol.Response;
import com.ibm.darpc.DaRPCMessage;
public class DaRPCFutureM<R extends DaRPCMessage, T extends DaRPCMessage> implements Future<T> {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
protected static int RPC_PENDING = 0;
protected static int RPC_DONE = 1;
protected static int RPC_ERROR = 2;
private int ticket;
private R request;
private T response;
public Response metadataResponse;
private DaRPCStreamM<R, T> stream;
private DaRPCClientEndpointM<R,T> endpoint;
private boolean streamLogged;
private AtomicInteger status;
private AtomicInteger recvStatus;
public DaRPCFutureM(DaRPCStreamM<R,T> stream, DaRPCClientEndpointM<R,T> endpoint, R request, T response,Response metadataResponse, boolean streamLogged){
this.request = request;
this.response = response;
this.ticket = -1;
this.response = response;
this.stream = stream;
this.endpoint = endpoint;
this.status = new AtomicInteger(RPC_PENDING);
this.streamLogged = streamLogged;
this.recvStatus = new AtomicInteger(0);
}
public int getTicket() {
return this.ticket;
}
public R getSendMessage(){
return request;
}
public Response getMetadataResponse(){
return metadataResponse;
}
public T getReceiveMessage(){
return response;
}
public void stamp(int ticket) {
this.ticket = ticket;
}
@Override
public T get() throws InterruptedException, ExecutionException {
if (status.get() == RPC_PENDING){
try {
endpoint.pollUntil(status, Long.MAX_VALUE);
} catch(Exception e){
status.set(RPC_ERROR);
throw new InterruptedException(e.getMessage());
}
}
if (status.get() == RPC_DONE){
return this.getReceiveMessage();
} else if (status.get() == RPC_PENDING){
throw new InterruptedException("RPC timeout");
} else {
throw new InterruptedException("RPC error");
}
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
if (status.get() == RPC_PENDING){
try {
endpoint.pollUntil(status, timeout);
} catch(Exception e){
status.set(RPC_ERROR);
throw new InterruptedException(e.getMessage());
}
}
if (status.get() == RPC_DONE){
return this.getReceiveMessage();
} else if (status.get() == RPC_PENDING){
throw new InterruptedException("RPC timeout");
} else {
throw new InterruptedException("RPC error");
}
}
@Override
public boolean isDone() {
if (status.get() == 0) {
try {
endpoint.pollOnce();
} catch(Exception e){
status.set(RPC_ERROR);
logger.info(e.getMessage());
}
}
return status.get() > 0;
}
public synchronized void signal(int wcstatus) {
if (status.get() == 0){
if (wcstatus == 0){
status.set(RPC_DONE);
} else {
status.set(RPC_ERROR);
}
if (streamLogged){
stream.addFuture(this);
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
public boolean isStreamLogged() {
return streamLogged;
}
public boolean touch(){
if (recvStatus.incrementAndGet() == 2){
recvStatus.set(0);
return true;
} else {
return false;
}
}
}
package rdma;
/*
* DaRPC: Data Center Remote Procedure Call
*
* Author: Patrick Stuedi <stu@zurich.ibm.com>
*
* Copyright (C) 2016-2018, IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import hpdos_rdma_offloaded.protocol.Response;
import com.ibm.darpc.DaRPCMessage;
public class DaRPCStreamM<R extends DaRPCMessage,T extends DaRPCMessage> {
private static final Logger logger = LoggerFactory.getLogger("com.ibm.darpc");
private DaRPCClientEndpointM<R,T> endpoint;
private LinkedBlockingDeque<DaRPCFutureM<R, T>> completedList;
DaRPCStreamM(DaRPCClientEndpointM<R,T> endpoint, int streamId) throws IOException{
logger.info("new direct rpc stream");
this.endpoint = endpoint;
this.completedList = new LinkedBlockingDeque<DaRPCFutureM<R, T>>();
}
public DaRPCFutureM<R, T> request(R request, T response,Response metadataResponse, boolean streamLogged) throws IOException {
DaRPCFutureM<R, T> future = new DaRPCFutureM<R, T>(this, endpoint, request, response,metadataResponse, streamLogged);
endpoint.sendRequest(future);
return future;
}
public DaRPCFutureM<R, T> take() throws IOException {
try {
DaRPCFutureM<R, T> future = completedList.poll();
while (future == null){
endpoint.pollOnce();
future = completedList.poll();
}
return future;
} catch(Exception e){
throw new IOException(e);
}
}
public DaRPCFutureM<R,T> processStream() throws IOException{
endpoint.pollOnce();
return completedList.poll();
}
public DaRPCFutureM<R, T> take(int timeout) throws IOException {
try {
DaRPCFutureM<R, T> future = completedList.poll();
long sumtime = 0;
while (future == null && sumtime < timeout){
endpoint.pollOnce();
future = completedList.poll();
}
return future;
} catch (Exception e){
throw new IOException(e);
}
}
public DaRPCFutureM<R, T> poll() throws IOException {
DaRPCFutureM<R, T> future = completedList.poll();
if (future == null){
endpoint.pollOnce();
future = completedList.poll();
}
return future;
}
public void clear(){
completedList.clear();
}
void addFuture(DaRPCFutureM<R, T> future){
completedList.add(future);
}
public boolean isEmpty() {
return completedList.isEmpty();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment