Commit cfb4eeec authored by Pramod S's avatar Pramod S

Added config

parent 043a1aab
app.HOST=192.168.200.20
app.MASTER=true
app.NFOLLOWER=1
app.FOLLOWER1=192.168.200.40
app.FOLLOWER2=
app.MASTER_IP=NA
#this is how to pair threads to cpu cores
app.cpu_affinity=6
//No of thread for each connectin
app.rdma_cluster_size=4
app.rdma_receive_queue=32
app.rdma_send_queue=32
app.rdma_polling=false
app.rdma_max_inline=0
app.rdma_server_port=1920
app.NETWORK_HANDLER_THREADS=10
app.REPLICATION_HANDLER_THREADS=10
app.db_path=/tmp/rocks
#The below properties are for master node only.
app.follower_registration_port=9876
app.sal_registration_port=9875
#Below properties are for followers only.
...@@ -44,10 +44,14 @@ public class MetadataServer{ ...@@ -44,10 +44,14 @@ public class MetadataServer{
DaRPCClientGroupM<Request,Response> followerClientGroup = DaRPCClientGroupM.createClientGroup(rpcProtocol, 100, 10, 16, 16); DaRPCClientGroupM<Request,Response> followerClientGroup = DaRPCClientGroupM.createClientGroup(rpcProtocol, 100, 10, 16, 16);
for(int i=0;i<property.nFollowers;i++) for(int i=0;i<property.nFollowers;i++)
{ {
logger.debug("connecting to follower");
var endpoint = followerClientGroup.createEndpoint(); var endpoint = followerClientGroup.createEndpoint();
endpoint.connect(new InetSocketAddress(property.followersIP[i], property.port), 100); endpoint.connect(new InetSocketAddress(property.followersIP[i], property.port), 100);
if(endpoint.isConnected()) if(endpoint.isConnected())
{
logger.debug("follower connected",property.followersIP[i]);
networkHandler.addReplicationStream(endpoint.createStream()); networkHandler.addReplicationStream(endpoint.createStream());
}
else else
logger.error("Follower Not connected",property.followersIP[i]); logger.error("Follower Not connected",property.followersIP[i]);
} }
...@@ -55,7 +59,9 @@ public class MetadataServer{ ...@@ -55,7 +59,9 @@ public class MetadataServer{
group = DaRPCServerGroup.createServerGroup(metadataServerService, property.clusterAffinities, group = DaRPCServerGroup.createServerGroup(metadataServerService, property.clusterAffinities,
1000, property.maxinline, property.polling, property.recvQueue, 1000, property.maxinline, property.polling, property.recvQueue,
property.sendQueue, property.wqSize, property.clusterSize); property.sendQueue, property.wqSize, property.clusterSize);
ForkJoinPool.commonPool().execute(networkHandler); Thread networkhandlerThread = new Thread(networkHandler);
networkhandlerThread.setName("networkHandlerThread");;
networkhandlerThread.start();
} }
else else
{ {
...@@ -71,11 +77,13 @@ public class MetadataServer{ ...@@ -71,11 +77,13 @@ public class MetadataServer{
InetSocketAddress address = new InetSocketAddress(property.host, property.port); InetSocketAddress address = new InetSocketAddress(property.host, property.port);
serverEp.bind(address, 100); serverEp.bind(address, 100);
while(true){ while(true)
{
try try
{ {
logger.debug("Listening to RDMA requests a, IP: " + property.host + " , PORT: " + property.port); logger.debug("Listening to RDMA requests a, IP: " + property.host + " , PORT: " + property.port);
serverEp.accept(); serverEp.accept();
Thread.getAllStackTraces();
logger.debug("Accepted connection."); logger.debug("Accepted connection.");
} }
catch (IOException e) catch (IOException e)
...@@ -91,11 +99,13 @@ public class MetadataServer{ ...@@ -91,11 +99,13 @@ public class MetadataServer{
public static void main(String[] args) throws Exception public static void main(String[] args) throws Exception
{ {
Property property = new Property(args[0]); Property property = new Property("./app.config");
//Property property = new Property(args[0]);
RocksDB.loadLibrary(); RocksDB.loadLibrary();
//BasicConfigurator.configure(); //BasicConfigurator.configure();
var server = new MetadataServer(); var server = new MetadataServer();
server.storageHandler = new StorageHandler(property.dbPath); server.storageHandler = new StorageHandler(property.dbPath);
server.run(property); server.run(property);
} }
} }
...@@ -32,7 +32,9 @@ public class NetworkHandlerM implements Runnable{ ...@@ -32,7 +32,9 @@ public class NetworkHandlerM implements Runnable{
public void addReplicationStream(DaRPCStreamM<Request,Response> stream) public void addReplicationStream(DaRPCStreamM<Request,Response> stream)
{ {
logger.debug("size"+replicationStreams.size());
this.replicationStreams.add(stream); this.replicationStreams.add(stream);
logger.debug("size"+replicationStreams.size());
} }
void processCloseInvalidationStream(DaRPCStreamM<InvalidationRequest,InvalidationResponse> stream) void processCloseInvalidationStream(DaRPCStreamM<InvalidationRequest,InvalidationResponse> stream)
...@@ -85,6 +87,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -85,6 +87,7 @@ public class NetworkHandlerM implements Runnable{
} }
void processCloseReplicationStream(DaRPCStreamM<Request,Response> stream) void processCloseReplicationStream(DaRPCStreamM<Request,Response> stream)
{ {
logger.info("stream closed");
stream.completedList.forEach(future -> stream.completedList.forEach(future ->
{ {
if(future.metadataResponse.replicationDone.incrementAndGet() == future.metadataResponse.replicatonTotal) if(future.metadataResponse.replicationDone.incrementAndGet() == future.metadataResponse.replicatonTotal)
...@@ -148,7 +151,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -148,7 +151,7 @@ public class NetworkHandlerM implements Runnable{
}); });
replicationStreams.forEach(stream -> replicationStreams.forEach(stream ->
{ {
if( ! stream.endpoint.isClosed()) if(stream.endpoint.isClosed())
{ {
processCloseReplicationStream(stream); processCloseReplicationStream(stream);
replicationStreams.remove(stream); replicationStreams.remove(stream);
...@@ -183,8 +186,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -183,8 +186,7 @@ public class NetworkHandlerM implements Runnable{
if(i == 0) if(i == 0)
{ {
logger.debug("Zero invalidation"); logger.debug("Zero invalidation");
response.state.incrementAndGet(); if(response.state.incrementAndGet() == Response.DONE)
if(response.state.get() == Response.DONE)
{ {
try try
{ {
...@@ -200,7 +202,6 @@ public class NetworkHandlerM implements Runnable{ ...@@ -200,7 +202,6 @@ public class NetworkHandlerM implements Runnable{
public void sendReplications(Request request,Response response) public void sendReplications(Request request,Response response)
{ {
logger.debug("replication send "+replicationStreams.size());
/*Request repRequest = new Request(); /*Request repRequest = new Request();
repRequest.setRequestType(request.getRequestType()); repRequest.setRequestType(request.getRequestType());
repRequest.setKey(request.getKey()); repRequest.setKey(request.getKey());
...@@ -211,6 +212,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -211,6 +212,7 @@ public class NetworkHandlerM implements Runnable{
} }
*/ */
int i = 0; int i = 0;
logger.debug("sending Replication "+replicationStreams.size());
for(var stream : replicationStreams) for(var stream : replicationStreams)
{ {
try try
...@@ -228,11 +230,11 @@ public class NetworkHandlerM implements Runnable{ ...@@ -228,11 +230,11 @@ public class NetworkHandlerM implements Runnable{
if(i == 0) if(i == 0)
{ {
logger.debug("Zero replication"); logger.debug("Zero replication");
response.state.incrementAndGet();
//This will happens only when there are no followers //This will happens only when there are no followers
//and when this thread get delayed and invalidations got succesuful //and when this thread get delayed and invalidations got succesuful
if(response.state.get() == Response.DONE) if(response.state.incrementAndGet() == Response.DONE)
{ {
logger.debug("sending Response");
try{ try{
response.event.triggerResponse(); response.event.triggerResponse();
}catch(IOException ex) }catch(IOException ex)
......
...@@ -62,10 +62,13 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -62,10 +62,13 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
this.storageHandler.create(request); this.storageHandler.create(request);
endTime = System.nanoTime(); endTime = System.nanoTime();
timeTaken = (endTime-startTime) / 1000; timeTaken = (endTime-startTime) / 1000;
response.state.getAndIncrement();
//ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime"); //ExperimentStatistics.collectStatistics(timeTaken, "localWriteTime");
logger.debug("Time to write to local cache: " + timeTaken); logger.debug("Time to write to local cache: " + timeTaken);
if(response.state.incrementAndGet() == Response.DONE)
{
logger.debug("sendingResponse");
event.triggerResponse();
}
response.setAck(AckType.SUCCESS); response.setAck(AckType.SUCCESS);
response.setKey(null); response.setKey(null);
response.setValue(null); response.setValue(null);
...@@ -93,7 +96,11 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -93,7 +96,11 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
stopwatch.start(); stopwatch.start();
this.storageHandler.delete(request); this.storageHandler.delete(request);
stopwatch.stop(); stopwatch.stop();
response.state.getAndIncrement(); if(response.state.incrementAndGet() == Response.DONE)
{
logger.debug("sendingResponse");
event.triggerResponse();
}
logger.debug("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS)); logger.debug("Time to delete to local cache: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
stopwatch.reset(); stopwatch.reset();
...@@ -106,11 +113,16 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -106,11 +113,16 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
event.triggerResponse(); event.triggerResponse();
} }
} }
catch(RocksDBException ex){ catch(RocksDBException ex)
ex.printStackTrace(); {
logger.debug(ex.getMessage());
response.setAck(AckType.DBFAILED); response.setAck(AckType.DBFAILED);
event.triggerResponse(); event.triggerResponse();
} }
catch(IOException ex)
{
logger.debug(ex.getMessage());
}
} }
@Override @Override
...@@ -124,13 +136,16 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -124,13 +136,16 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
InetSocketAddress clientSocketAddress = new InetSocketAddress(clientIP,1921); InetSocketAddress clientSocketAddress = new InetSocketAddress(clientIP,1921);
logger.debug("Creating Endpoint"); logger.debug("Creating Endpoint");
var invalidationClientEp = invalidationClientGroup.createEndpoint(); var invalidationClientEp = invalidationClientGroup.createEndpoint();
logger.debug("Connecting Endpoint");
invalidationClientEp.connect(clientSocketAddress, 1000); invalidationClientEp.connect(clientSocketAddress, 1000);
logger.debug("connect call success");
if(invalidationClientEp.isConnected()){ if(invalidationClientEp.isConnected())
{
logger.debug("accepted"); logger.debug("accepted");
this.networkHandler.addInvalidationStream(invalidationClientEp.createStream()); this.networkHandler.addInvalidationStream(invalidationClientEp.createStream());
} }
else{ else
{
logger.error("Not able to Connect to client for invalidation"); logger.error("Not able to Connect to client for invalidation");
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment