Commit 90c74082 authored by Pramod S's avatar Pramod S

Changed Replication and Invalidation Polling

parent f1c96ec5
......@@ -6,6 +6,12 @@
<attribute name="gradle_used_by_scope" value="main,test"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="bin/main" path="src/main/resources">
<attributes>
<attribute name="gradle_scope" value="main"/>
<attribute name="gradle_used_by_scope" value="main,test"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11/"/>
<classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/>
<classpathentry kind="output" path="bin/default"/>
......
app.HOST=192.168.200.20
app.MASTER=true
app.NFOLLOWER=2
app.FOLLOWER1=
app.FOLLOWER2=
app.MASTER_IP=NA
#this is how to pair threads to cpu cores
app.cpu_affinity=4
......
package hpdos_rdma_offloaded;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Properties;
import java.util.concurrent.ForkJoinPool;
import hpdos_rdma_offloaded.lib.Property;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup;
......@@ -14,105 +12,66 @@ import com.ibm.disni.RdmaServerEndpoint;
import org.apache.log4j.Logger;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.NetworkHandler;
import hpdos_rdma_offloaded.handler.RegistrationHandler;
import hpdos_rdma_offloaded.handler.NetworkHandlerM;
import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol;
import hpdos_rdma_offloaded.service.FollowerMetadataService;
import hpdos_rdma_offloaded.service.MasterMetadataServerService;
public class MetadataServer implements Runnable{
private String host;
private String masterIp;
private int poolsize;
private int recvQueue;
private int sendQueue;
private int clusterSize;
private int wqSize = recvQueue;
private boolean polling;
private int maxinline = 0;
private int rdma_port;
private long[] clusterAffinities;
import hpdos_rdma_offloaded.service.MetadataServiceMaster;
import rdma.DaRPCClientGroupM;
boolean isMaster;
public class MetadataServer{
StorageHandler storageHandler;
NetworkHandler networkHandler;
MasterMetadataServerService metadataServerService = null;
FollowerMetadataService followerMetadataServerService = null;
RegistrationHandler registrationHandler = null;
public final static Properties properties= new Properties();
String propertyPath;
final static Logger logger = Logger.getLogger(MetadataServer.class);
public MetadataServer(String propertyPath) throws Exception,RocksDBException {
this.propertyPath = propertyPath;
InputStream inputStream = new FileInputStream(propertyPath);
properties.load(inputStream);
this.isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
host = properties.getProperty("app.HOST");
poolsize = Integer.valueOf( properties.getProperty("app.cpu_affinity"));
recvQueue = Integer.valueOf(properties.getProperty("app.rdma_receive_queue"));
sendQueue = Integer.valueOf( properties.getProperty("app.rdma_send_queue"));
clusterSize = Integer.valueOf(properties.getProperty("app.rdma_cluster_size"));
wqSize = recvQueue;
maxinline = Integer.valueOf(properties.getProperty("app.rdma_max_inline"));
rdma_port = Integer.valueOf(properties.getProperty("app.rdma_server_port"));
polling = Boolean.valueOf(properties.getProperty("app.rdma_polling"));
masterIp = String.valueOf(properties.getProperty("app.MASTER_IP"));
int registrationPort = Integer.valueOf(properties.getProperty("app.follower_registration_port"));
clusterAffinities = new long[poolsize];
for (int i = 0; i < poolsize; i++)
{
long cpu = 1L << i;
clusterAffinities[i] = cpu;
// System.out.println(cpu);
}
isMaster = Boolean.valueOf(properties.getProperty("app.MASTER"));
storageHandler = new StorageHandler(properties.getProperty("app.db_path"));
networkHandler = new NetworkHandler(isMaster);
// metadataServerService = new MasterMetadataServerService(this.networkHandler,storageHandler);
if(isMaster)
{
logger.debug("STARTING MASTER SERVICES.");
registrationHandler = new RegistrationHandler(isMaster, host, masterIp, registrationPort, networkHandler);
metadataServerService = new MasterMetadataServerService(this.networkHandler,storageHandler);
}
else
{
logger.debug("STARTING FOLLOWER SERVICES.");
registrationHandler = new RegistrationHandler(isMaster, host, masterIp, registrationPort, networkHandler);
followerMetadataServerService = new FollowerMetadataService(storageHandler);
}
MetadataServiceMaster metadataService;
FollowerMetadataService followerMetadataService;
}
final static Logger logger = Logger.getLogger(MetadataServer.class);
public void run(){
// MetadataServerService rpcService = new MetadataServerService();
DaRPCServerGroup<Request, Response> group = null;
public void run(Property property){
try
{
if (isMaster) {
group = DaRPCServerGroup.createServerGroup(this.metadataServerService, this.clusterAffinities, 1000, maxinline, polling, recvQueue, sendQueue, wqSize, clusterSize);
} else {
group = DaRPCServerGroup.createServerGroup(this.followerMetadataServerService, this.clusterAffinities, 1000, maxinline, polling, recvQueue, sendQueue, wqSize, clusterSize);
DaRPCServerGroup<Request, Response> group = null;
if(property.isMaster)
{
logger.debug("STARTING MASTER SERVICES.");
NetworkHandlerM networkHandler = new NetworkHandlerM();
RpcProtocol rpcProtocol = new RpcProtocol();
DaRPCClientGroupM<Request,Response> followerClientGroup = DaRPCClientGroupM.createClientGroup(rpcProtocol, 100, 10, 16, 16);
for(int i=0;i<property.nFollowers;i++)
{
var endpoint = followerClientGroup.createEndpoint();
endpoint.connect(new InetSocketAddress(property.followersIP[i], property.port), 100);
if(endpoint.isConnected())
networkHandler.addReplicationStream(endpoint.createStream());
}
var metadataServerService = new MetadataServiceMaster(networkHandler,storageHandler);
group = DaRPCServerGroup.createServerGroup(metadataServerService, property.clusterAffinities,
1000, property.maxinline, property.polling, property.recvQueue,
property.sendQueue, property.wqSize, property.clusterSize);
ForkJoinPool.commonPool().execute(networkHandler);
}
else
{
logger.debug("STARTING FOLLOWER SERVICES.");
//registrationHandler = new RegistrationHandler(isMaster, host, masterIp, registrationPort, networkHandler);
var followerMetadataServerService = new FollowerMetadataService(storageHandler);
group = DaRPCServerGroup.createServerGroup(followerMetadataServerService, property.clusterAffinities,
1000, property.maxinline, property.polling, property.recvQueue,
property.sendQueue, property.wqSize, property.clusterSize);
}
RdmaServerEndpoint<DaRPCServerEndpoint<Request, Response>> serverEp = null;
serverEp = group.createServerEndpoint();
InetSocketAddress address = new InetSocketAddress(host, rdma_port);
RdmaServerEndpoint<DaRPCServerEndpoint<Request, Response>> serverEp = group.createServerEndpoint();
InetSocketAddress address = new InetSocketAddress(property.host, property.port);
serverEp.bind(address, 100);
while(true){
try
{
logger.debug("Listening to RDMA requests a, IP: " + host + " , PORT: " + rdma_port);
logger.debug("Listening to RDMA requests a, IP: " + property.host + " , PORT: " + property.port);
serverEp.accept();
logger.debug("Accepted connection.");
}
......@@ -129,11 +88,10 @@ public class MetadataServer implements Runnable{
public static void main(String[] args) throws Exception
{
RocksDB.loadLibrary();
//var process = Runtime.getRuntime().exec("/bin/sh export LD_LIBRARY_PATH=/usr/local/lib");
//process.wait();
// System.out.println("Exit value "+process.exitValue());
Runnable r = new MetadataServer(args[0]);
new Thread(r).start();
Property property = new Property(args[0]);
RocksDB.loadLibrary();
var server = new MetadataServer();
server.storageHandler = new StorageHandler(property.dbPath);
server.run(property);
}
}
......@@ -44,7 +44,6 @@ public class NetworkHandler {
boolean isMaster;
// ConcurrentHashMap<SocketAddress,DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationClientEpStreams;
public NetworkHandler(boolean isMaster){
this.executorService = Executors.newFixedThreadPool(10);
this.isMaster = isMaster;
}
......@@ -200,9 +199,8 @@ public class NetworkHandler {
}
}
public void sendInvalidateRequest(Request request) throws InterruptedException, ExecutionException{
/* Paras method
public DaRPCFuture<InvalidationRequest, InvalidationResponse> sendInvalidateRequest(Request request){
//Paras method
/* public ArrayList<DaRPCFuture<InvalidationRequest, InvalidationResponse>> sendInvalidateRequest(Request request){
System.out.println("Sending Invalidation Request");
InvalidationRequest invalidationRequest = new InvalidationRequest();
......@@ -222,8 +220,10 @@ public class NetworkHandler {
}
return requestFutures;
*/
}/*
// Pramod method
/*
public void sendInvalidateRequest(Request request) throws InterruptedException, ExecutionException{
ExecutorService executorService = Executors.newWorkStealingPool();
logger.info("Sending Invalidation Request");
InvalidationRequest invalidationRequest = new InvalidationRequest();
......@@ -243,7 +243,9 @@ public class NetworkHandler {
InvalidationResponse response = future.get().get();
}
executorService.shutdown();
}
*/
public boolean consumeReplicationRequestFutures(ArrayList<DaRPCFuture<InvalidationRequest,InvalidationResponse>> requestFutures )
throws InterruptedException,ExecutionException{
......@@ -259,6 +261,7 @@ public class NetworkHandler {
}
if(requestFutures.size() == 0)
return true;
//if not done now wait for 100 milliseconds before sending response
requestFutures.get(0).get(100,TimeUnit.MILLISECONDS);
......
package hpdos_rdma_offloaded.handler;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import com.ibm.disni.util.NativeAffinity;
import hpdos_rdma_offloaded.protocol.InvalidationRequest;
import hpdos_rdma_offloaded.protocol.InvalidationResponse;
import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.protocol.Response;
import rdma.DaRPCFutureM;
import rdma.DaRPCStreamM;
public class NetworkHandlerM implements Runnable{
private CopyOnWriteArrayList<DaRPCStreamM<InvalidationRequest, InvalidationResponse>> invalidationStreams;
private CopyOnWriteArrayList<DaRPCStreamM<Request, Response>> replicationStreams;
public NetworkHandlerM()
{
this.invalidationStreams = new CopyOnWriteArrayList<>();
this.replicationStreams = new CopyOnWriteArrayList<>();
}
public void addInvalidationStream(DaRPCStreamM<InvalidationRequest, InvalidationResponse> stream){
this.invalidationStreams.add(stream);
}
public void addReplicationStream(DaRPCStreamM<Request,Response> stream)
{
this.replicationStreams.add(stream);
}
public void run()
{
NativeAffinity.setAffinity(1<<1);
while(true)
{
for(var stream : invalidationStreams)
{
try
{
DaRPCFutureM<InvalidationRequest,InvalidationResponse> future = stream.processStream();
if(future.metadataResponse.invalidationTotal == future.metadataResponse.invalidationDone.incrementAndGet())
{
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse();
}
}catch(Exception exx)
{
exx.printStackTrace();
}
}
for(var stream : replicationStreams)
{
try
{
DaRPCFutureM<Request,Response> future = stream.processStream();
if(future.metadataResponse.replicatonTotal == future.metadataResponse.replicationDone.incrementAndGet())
{
if(Response.DONE == future.metadataResponse.state.incrementAndGet())
future.metadataResponse.event.triggerResponse();
}
}catch(Exception exx)
{
exx.printStackTrace();
}
}
}
}
public void sendInvalidations(Request request,Response response)
{
InvalidationRequest inRequest = new InvalidationRequest();
inRequest.setKey( request.getKey());
response.invalidationTotal = invalidationStreams.size();
for(var stream : invalidationStreams)
{
try
{
stream.request(inRequest, new InvalidationResponse(), response, false);
}
catch(IOException ex)
{
System.out.println(ex);
}
}
}
public void sendReplications(Request request,Response response)
{
Request repRequest = new Request();
repRequest.setRequestType(request.getRequestType());
repRequest.setKey(request.getKey());
if(request.getRequestType() == RequestType.PUT)
{
repRequest.setValue(request.getValue());
}
response.replicatonTotal = replicationStreams.size();
for(var stream : replicationStreams)
{
try
{
stream.request(repRequest, new Response(), response, false);
}
catch(IOException ex)
{
System.out.println(ex);
}
}
}
}
......@@ -23,9 +23,9 @@ import hpdos_rdma_offloaded.protocol.*;
public class ReplicationHandler {
public static List<Follower> followers = new ArrayList<>();
public List<Follower> followers = new ArrayList<>();
final static Logger logger = Logger.getLogger(ReplicationHandler.class);
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
//public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public ReplicationHandler(){}
public void addFollower(Follower follower){
......@@ -33,7 +33,7 @@ public class ReplicationHandler {
}
public void replicateMetadata(Packet packet) throws InterruptedException, ExecutionException{
Stopwatch stopWatch = Stopwatch.createUnstarted();
/* Stopwatch stopWatch = Stopwatch.createUnstarted();
stopWatch.start();
Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>();
for(Follower follower : NetworkHandler.followers){
......@@ -54,7 +54,15 @@ public class ReplicationHandler {
}
stopWatch.stop();
logger.info("Replication future time: " + stopWatch.elapsed(TimeUnit.MICROSECONDS));
stopWatch.reset();
stopWatch.reset();*/
}
public void setFollowers(ArrayList<Follower> followers)
{
this.followers = followers;
}
public void replicate(Request request)
{
}
}
package hpdos_rdma_offloaded.lib;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class Property {
public String host;
public String masterIp;
public String dbPath;
public int poolsize;
public int recvQueue;
public int sendQueue;
public int clusterSize;
public int wqSize = recvQueue;
public boolean polling;
public int maxinline = 0;
public int port;
public long[] clusterAffinities;
public boolean isMaster;
public int nFollowers;
public String[] followersIP;
public Property(String path) throws IOException
{
InputStream inputStream = new FileInputStream(path);
Properties prop = new Properties();
prop.load(inputStream);
isMaster = Boolean.valueOf((String)prop.get("app.MASTER"));
host = prop.getProperty("app.HOST");
dbPath = prop.getProperty("app.db_path");
poolsize = Integer.valueOf( prop.getProperty("app.cpu_affinity"));
recvQueue = Integer.valueOf(prop.getProperty("app.rdma_receive_queue"));
sendQueue = Integer.valueOf( prop.getProperty("app.rdma_send_queue"));
clusterSize = Integer.valueOf(prop.getProperty("app.rdma_cluster_size"));
wqSize = recvQueue;
maxinline = Integer.valueOf(prop.getProperty("app.rdma_max_inline"));
port = Integer.valueOf(prop.getProperty("app.rdma_server_port"));
polling = Boolean.valueOf(prop.getProperty("app.rdma_polling"));
masterIp = String.valueOf(prop.getProperty("app.MASTER_IP"));
clusterAffinities = new long[poolsize];
for (int i = 0; i < poolsize; i++)
{
long cpu = 1L << i;
clusterAffinities[i] = cpu;
}
if(isMaster)
{
nFollowers = Integer.valueOf(prop.getProperty("app.NFOLLOWER"));
followersIP = new String[nFollowers];
for(int i = 1;i<=nFollowers;i++)
followersIP[i-1] = prop.getProperty("app.FOLLOWER"+i);
}
}
}
......@@ -18,7 +18,8 @@ public class Request implements DaRPCMessage
{
// System.out.println("Request Write Method");
buffer.putInt(requestType);
buffer.put(key);
if(key == null)
buffer.put(key);
//if operation type is get and delete then value is not required
if(requestType == RequestType.PUT)
{
......
......@@ -6,5 +6,6 @@ public interface RequestType
public static int GET =101;
public static int DELETE =102;
public static int INVALIDATE = 103;
public static int GETSERVERS = 104;
public static int ADDFOLLOWER = 105;
};
......@@ -2,15 +2,50 @@ package hpdos_rdma_offloaded.protocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import com.ibm.darpc.DaRPCMessage;
import com.ibm.darpc.DaRPCServerEvent;
public class Response implements DaRPCMessage{
public static int SERIALIZED_SIZE = 260;
public static int SERIALIZED_SIZE = 260;
public static int INVALIDATION_DONE = 1;
public static int REPLICATION_DONE = 2;
public static int DONE = 3;
public static int NEW = 0;
public AtomicInteger invalidationDone;
public AtomicInteger replicationDone;
public int invalidationTotal;
public int replicatonTotal ;
public AtomicInteger state;
public DaRPCServerEvent<Request,Response> event;
private int ack;
public byte[] key;
public byte[] value;
public byte[] value;
public Response()
{
state = new AtomicInteger(NEW);
replicationDone = new AtomicInteger(0);
invalidationDone = new AtomicInteger(0);
}
public void setEvent(DaRPCServerEvent<Request,Response> event)
{
this.event = event;
}
public void triggerResponse() throws IOException{
event.triggerResponse();
}
@Override
public int write(ByteBuffer buffer) throws IOException
{
......
......@@ -11,7 +11,6 @@ import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.lib.Packet;
import hpdos_rdma_offloaded.protocol.AckType;
import hpdos_rdma_offloaded.protocol.InvalidationRequest;
import hpdos_rdma_offloaded.protocol.InvalidationResponse;
......@@ -74,12 +73,14 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService
}
@Override
public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint)
{
logger.debug("Received new connection ");
}
@Override
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint)
{
logger.debug("Closing Connection");
}
......
......@@ -182,12 +182,8 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
NetworkHandler.invalidationStreams.remove(ipAddress);
ExperimentStatistics.printStatistics();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// TODO : Remove the connection from the static lists stores in Network Handler
// NetworkHandler.invalidationStreams.remove(rpcClientEndpoint.)
}
......
package hpdos_rdma_offloaded.service;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService;
import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException;
//import hpdos_rdma_offloaded.ExperimentStatistics;
import hpdos_rdma_offloaded.handler.NetworkHandlerM;
import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.protocol.AckType;
import hpdos_rdma_offloaded.protocol.InvalidationRequest;
import hpdos_rdma_offloaded.protocol.InvalidationResponse;
import hpdos_rdma_offloaded.protocol.InvalidationRpcProtocol;
import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol;
import rdma.DaRPCClientGroupM;
public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<Request, Response>{
NetworkHandlerM networkHandler;
StorageHandler storageHandler;
DaRPCClientGroupM<InvalidationRequest, InvalidationResponse> invalidationClientGroup;
final static Logger logger = Logger.getLogger(MasterMetadataServerService.class);
public MetadataServiceMaster(NetworkHandlerM networkHandler,StorageHandler storageHandler) throws Exception{
logger.debug("New MetadataServerService Created");
this.networkHandler = networkHandler;
this.storageHandler = storageHandler;
InvalidationRpcProtocol rpcProtocol = new InvalidationRpcProtocol();
invalidationClientGroup = DaRPCClientGroupM.createClientGroup(rpcProtocol, 100, 10, 16, 16);
}
@Override
public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException {
Request request = event.getReceiveMessage();
Response response = event.getSendMessage();
logger.info("Received "+request.getRequestType()+" Request for key: " + new String(request.getKey()));
Stopwatch stopwatch = Stopwatch.createUnstarted();
long startTime, endTime, timeTaken;
if(request.getValue() != null)
logger.info(" value : "+ new String(request.getValue()));
try{
if (request.getRequestType() == RequestType.PUT) {
startTime = System.nanoTime();
this.storageHandler.create(request);
endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000;
response.event = event;
this.networkHandler.sendInvalidations(request, response);
this.networkHandler.sendReplications(request, response);
// ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime");
logger.info("Time to write to local cache: " + timeTaken);
response.setAck(AckType.SUCCESS);
response.setKey(null);
response.setValue(null);
}
else if(request.getRequestType() == RequestType.GET) {
stopwatch.start();
byte[] value = this.storageHandler.read(request);
stopwatch.stop();
logger.info("Time to read: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset();
if(value == null)
{
response.setAck(AckType.NOTFOUND);