Commit 043a1aab authored by Pramod S's avatar Pramod S

updated invalidation polling

parent 2b6c1a94
app.HOST=192.168.200.20
app.MASTER=true
app.NFOLLOWER=1
app.FOLLOWER1=192.168.200.40
app.FOLLOWER2=
app.MASTER_IP=NA
#this is how to pair threads to cpu cores
app.cpu_affinity=4
//No of thread for each connectin
app.rdma_cluster_size=4
app.rdma_receive_queue=32
app.rdma_send_queue=32
app.rdma_polling=false
app.rdma_max_inline=0
app.rdma_server_port=1920
app.NETWORK_HANDLER_THREADS=10
app.REPLICATION_HANDLER_THREADS=10
app.db_path=/tmp/rocks
#The below properties are for master node only.
app.follower_registration_port=9876
app.sal_registration_port=9875
#Below properties are for followers only.
...@@ -3,10 +3,11 @@ package hpdos_rdma_offloaded; ...@@ -3,10 +3,11 @@ package hpdos_rdma_offloaded;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.log4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExperimentStatistics { public class ExperimentStatistics {
final static Logger logger = Logger.getLogger(ExperimentStatistics.class); final static Logger logger = LoggerFactory.getLogger("ExperimentStatistics");
public static void collectStatistics(long latency, String path){ public static void collectStatistics(long latency, String path){
if (path.equals("localWriteTime")) { if (path.equals("localWriteTime")) {
BenchmarkingProperties.localWriteTime = BenchmarkingProperties.localWriteTime + latency; BenchmarkingProperties.localWriteTime = BenchmarkingProperties.localWriteTime + latency;
......
...@@ -10,9 +10,9 @@ import com.ibm.darpc.DaRPCServerEndpoint; ...@@ -10,9 +10,9 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup; import com.ibm.darpc.DaRPCServerGroup;
import com.ibm.disni.RdmaServerEndpoint; import com.ibm.disni.RdmaServerEndpoint;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import hpdos_rdma_offloaded.handler.NetworkHandlerM; import hpdos_rdma_offloaded.handler.NetworkHandlerM;
import hpdos_rdma_offloaded.handler.StorageHandler; import hpdos_rdma_offloaded.handler.StorageHandler;
...@@ -30,7 +30,7 @@ public class MetadataServer{ ...@@ -30,7 +30,7 @@ public class MetadataServer{
MetadataServiceMaster metadataService; MetadataServiceMaster metadataService;
FollowerMetadataService followerMetadataService; FollowerMetadataService followerMetadataService;
final static Logger logger = Logger.getLogger(MetadataServer.class); final static Logger logger = LoggerFactory.getLogger("MetadataServer");
public void run(Property property){ public void run(Property property){
try try
...@@ -48,6 +48,8 @@ public class MetadataServer{ ...@@ -48,6 +48,8 @@ public class MetadataServer{
endpoint.connect(new InetSocketAddress(property.followersIP[i], property.port), 100); endpoint.connect(new InetSocketAddress(property.followersIP[i], property.port), 100);
if(endpoint.isConnected()) if(endpoint.isConnected())
networkHandler.addReplicationStream(endpoint.createStream()); networkHandler.addReplicationStream(endpoint.createStream());
else
logger.error("Follower Not connected",property.followersIP[i]);
} }
var metadataServerService = new MetadataServiceMaster(networkHandler,storageHandler); var metadataServerService = new MetadataServiceMaster(networkHandler,storageHandler);
group = DaRPCServerGroup.createServerGroup(metadataServerService, property.clusterAffinities, group = DaRPCServerGroup.createServerGroup(metadataServerService, property.clusterAffinities,
...@@ -91,7 +93,7 @@ public class MetadataServer{ ...@@ -91,7 +93,7 @@ public class MetadataServer{
{ {
Property property = new Property(args[0]); Property property = new Property(args[0]);
RocksDB.loadLibrary(); RocksDB.loadLibrary();
BasicConfigurator.configure(); //BasicConfigurator.configure();
var server = new MetadataServer(); var server = new MetadataServer();
server.storageHandler = new StorageHandler(property.dbPath); server.storageHandler = new StorageHandler(property.dbPath);
server.run(property); server.run(property);
......
...@@ -23,8 +23,9 @@ import com.ibm.darpc.DaRPCFuture; ...@@ -23,8 +23,9 @@ import com.ibm.darpc.DaRPCFuture;
import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.DaRPCStream;
import com.ibm.disni.util.StopWatch; import com.ibm.disni.util.StopWatch;
import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import hpdos_rdma_offloaded.lib.Follower; import hpdos_rdma_offloaded.lib.Follower;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
...@@ -36,7 +37,7 @@ import hpdos_rdma_offloaded.protocol.Response; ...@@ -36,7 +37,7 @@ import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol; import hpdos_rdma_offloaded.protocol.RpcProtocol;
public class NetworkHandler { public class NetworkHandler {
final static Logger logger = Logger.getLogger(NetworkHandler.class); final static Logger logger = LoggerFactory.getLogger("NetworkHandler");
private ExecutorService executorService; private ExecutorService executorService;
// public HashMap<String, DaRPCStream<Request, Response>> streams; // public HashMap<String, DaRPCStream<Request, Response>> streams;
public static ConcurrentHashMap<String, DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationStreams = new ConcurrentHashMap<>(); public static ConcurrentHashMap<String, DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationStreams = new ConcurrentHashMap<>();
......
package hpdos_rdma_offloaded.handler; package hpdos_rdma_offloaded.handler;
import java.io.IOException; import java.io.IOException;
import org.apache.log4j.Logger; import org.slf4j.Logger;
import org.checkerframework.checker.units.qual.degrees; import org.slf4j.LoggerFactory;
import java.lang.System.Logger.Level; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import com.ibm.disni.util.NativeAffinity; import com.ibm.disni.util.NativeAffinity;
...@@ -18,22 +17,24 @@ import hpdos_rdma_offloaded.rdma.DaRPCStreamM; ...@@ -18,22 +17,24 @@ import hpdos_rdma_offloaded.rdma.DaRPCStreamM;
public class NetworkHandlerM implements Runnable{ public class NetworkHandlerM implements Runnable{
private CopyOnWriteArrayList<DaRPCStreamM<InvalidationRequest, InvalidationResponse>> invalidationStreams; private ConcurrentLinkedQueue<DaRPCStreamM<InvalidationRequest, InvalidationResponse>> invalidationStreams;
private CopyOnWriteArrayList<DaRPCStreamM<Request, Response>> replicationStreams; private ConcurrentLinkedQueue<DaRPCStreamM<Request, Response>> replicationStreams;
final static Logger logger = Logger.getLogger(NetworkHandlerM.class); final static Logger logger = LoggerFactory.getLogger("NetworkHandler");
public NetworkHandlerM() public NetworkHandlerM()
{ {
this.invalidationStreams = new CopyOnWriteArrayList<>(); this.invalidationStreams = new ConcurrentLinkedQueue<>();
this.replicationStreams = new CopyOnWriteArrayList<>(); this.replicationStreams = new ConcurrentLinkedQueue<>();
} }
public void addInvalidationStream(DaRPCStreamM<InvalidationRequest, InvalidationResponse> stream){ public void addInvalidationStream(DaRPCStreamM<InvalidationRequest, InvalidationResponse> stream){
this.invalidationStreams.add(stream); this.invalidationStreams.add(stream);
} }
public void addReplicationStream(DaRPCStreamM<Request,Response> stream) public void addReplicationStream(DaRPCStreamM<Request,Response> stream)
{ {
this.replicationStreams.add(stream); this.replicationStreams.add(stream);
} }
void processCloseInvalidationStream(DaRPCStreamM<InvalidationRequest,InvalidationResponse> stream) void processCloseInvalidationStream(DaRPCStreamM<InvalidationRequest,InvalidationResponse> stream)
{ {
stream.completedList.forEach(future -> stream.completedList.forEach(future ->
...@@ -46,7 +47,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -46,7 +47,7 @@ public class NetworkHandlerM implements Runnable{
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
}catch(IOException ex) }catch(IOException ex)
{ {
logger.error( ex); logger.error( ex.toString());
} }
} }
}); });
...@@ -59,7 +60,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -59,7 +60,7 @@ public class NetworkHandlerM implements Runnable{
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
}catch(IOException ex) }catch(IOException ex)
{ {
logger.error( ex); logger.error( ex.toString());
} }
} }
}); });
...@@ -94,7 +95,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -94,7 +95,7 @@ public class NetworkHandlerM implements Runnable{
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
}catch(IOException ex) }catch(IOException ex)
{ {
logger.error( ex); logger.error( ex.toString());
} }
} }
}); });
...@@ -107,7 +108,7 @@ public class NetworkHandlerM implements Runnable{ ...@@ -107,7 +108,7 @@ public class NetworkHandlerM implements Runnable{
future.metadataResponse.event.triggerResponse(); future.metadataResponse.event.triggerResponse();
}catch(IOException ex) }catch(IOException ex)
{ {
logger.error( ex); logger.error( ex.toString());
} }
} }
}); });
...@@ -164,23 +165,35 @@ public class NetworkHandlerM implements Runnable{ ...@@ -164,23 +165,35 @@ public class NetworkHandlerM implements Runnable{
InvalidationRequest inRequest = new InvalidationRequest(); InvalidationRequest inRequest = new InvalidationRequest();
//logger.debug("invdlidation send "+invalidationStreams.size()); //logger.debug("invdlidation send "+invalidationStreams.size());
inRequest.setKey( request.getKey()); inRequest.setKey( request.getKey());
response.invalidationTotal = invalidationStreams.size();
int i = 0;
for(var stream : invalidationStreams) for(var stream : invalidationStreams)
{ {
try try
{ {
stream.request(inRequest, new InvalidationResponse(), response, true); stream.request(inRequest, new InvalidationResponse(), response, true);
++i;
} }
catch(IOException ex) catch(IOException ex)
{ {
logger.error(ex); logger.error(ex.toString());
} }
} }
response.invalidationTotal = i;
if(invalidationStreams.size() == 0) if(i == 0)
{ {
logger.debug("Zero invalidation");
response.state.incrementAndGet(); response.state.incrementAndGet();
if(response.state.get() == Response.DONE)
{
try
{
response.event.triggerResponse();
}catch(IOException e)
{
logger.error(e.getMessage());
}
}
} }
} }
...@@ -196,25 +209,37 @@ public class NetworkHandlerM implements Runnable{ ...@@ -196,25 +209,37 @@ public class NetworkHandlerM implements Runnable{
{ {
repRequest.setValue(request.getValue()); repRequest.setValue(request.getValue());
} }
*/ */
response.replicatonTotal = replicationStreams.size(); int i = 0;
for(var stream : replicationStreams) for(var stream : replicationStreams)
{ {
try try
{ {
stream.request(request, new Response(), response, true); stream.request(request, new Response(), response, true);
++i;
} }
catch(IOException ex) catch(IOException ex)
{ {
logger.error(ex); logger.error(ex.toString());
} }
} }
response.replicatonTotal = i;
if(replicationStreams.size() == 0) //This is because when there are no followers
if(i == 0)
{ {
logger.debug("Zero replication");
response.state.incrementAndGet(); response.state.incrementAndGet();
//This will happens only when there are no followers
//and when this thread get delayed and invalidations got succesuful
if(response.state.get() == Response.DONE)
{
try{
response.event.triggerResponse();
}catch(IOException ex)
{
logger.error(ex.getMessage());
}
}
} }
} }
} }
...@@ -4,12 +4,9 @@ import java.io.IOException; ...@@ -4,12 +4,9 @@ import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.ClassNotFoundException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
...@@ -25,7 +22,7 @@ public class RegistrationHandler { ...@@ -25,7 +22,7 @@ public class RegistrationHandler {
int follower_registration_port; int follower_registration_port;
int sal_registration_port; int sal_registration_port;
NetworkHandler networkHandler; NetworkHandler networkHandler;
final static Logger logger = Logger.getLogger(RegistrationHandler.class); final static Logger logger = Logger.getLogger("RegistrationHandler");
public RegistrationHandler(boolean isMaster, String registrationServerIP, String masterIp, int registrationPort, NetworkHandler networkHandler) throws IOException { public RegistrationHandler(boolean isMaster, String registrationServerIP, String masterIp, int registrationPort, NetworkHandler networkHandler) throws IOException {
this.registrationServerIp = registrationServerIP; this.registrationServerIp = registrationServerIP;
this.follower_registration_port = 9876; this.follower_registration_port = 9876;
......
package hpdos_rdma_offloaded.handler; package hpdos_rdma_offloaded.handler;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.apache.log4j.Logger;
import com.ibm.disni.util.DiSNILogger;
import org.rocksdb.Options; import org.rocksdb.Options;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import hpdos_rdma_offloaded.protocol.Request; import hpdos_rdma_offloaded.protocol.Request;
...@@ -12,7 +16,8 @@ public class StorageHandler implements AutoCloseable{ ...@@ -12,7 +16,8 @@ public class StorageHandler implements AutoCloseable{
RocksDB rocksDB; RocksDB rocksDB;
Options rockDbOptions; Options rockDbOptions;
final static Logger logger = Logger.getLogger(StorageHandler.class);
final static Logger logger = LoggerFactory.getLogger("StorageHandler");
public StorageHandler(String dbpath) throws RocksDBException public StorageHandler(String dbpath) throws RocksDBException
{ {
logger.warn("Creating RocksDB"); logger.warn("Creating RocksDB");
......
...@@ -7,8 +7,10 @@ import com.ibm.darpc.DaRPCServerEndpoint; ...@@ -7,8 +7,10 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent; import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService; import com.ibm.darpc.DaRPCService;
import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import hpdos_rdma_offloaded.handler.StorageHandler; import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.protocol.AckType; import hpdos_rdma_offloaded.protocol.AckType;
...@@ -21,7 +23,7 @@ import hpdos_rdma_offloaded.protocol.RpcProtocol; ...@@ -21,7 +23,7 @@ import hpdos_rdma_offloaded.protocol.RpcProtocol;
public class FollowerMetadataService extends RpcProtocol implements DaRPCService<Request, Response>{ public class FollowerMetadataService extends RpcProtocol implements DaRPCService<Request, Response>{
StorageHandler storageHandler; StorageHandler storageHandler;
final static Logger logger = Logger.getLogger(FollowerMetadataService.class); final static Logger logger = LoggerFactory.getLogger("Metadata service");
DaRPCClientGroup<InvalidationRequest, InvalidationResponse> invalidationClientGroup; DaRPCClientGroup<InvalidationRequest, InvalidationResponse> invalidationClientGroup;
public FollowerMetadataService(StorageHandler storageHandler) throws Exception{ public FollowerMetadataService(StorageHandler storageHandler) throws Exception{
logger.debug("New MetadataServerService Created"); logger.debug("New MetadataServerService Created");
...@@ -72,7 +74,13 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService ...@@ -72,7 +74,13 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService
@Override @Override
public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint)
{ {
logger.debug("Received new connection "); try
{
logger.debug("Received new connection "+rpcClientEndpoint.getDstAddr());
}catch(Exception ex)
{
logger.error(ex.getMessage());
}
} }
@Override @Override
......
...@@ -11,8 +11,10 @@ import com.ibm.darpc.DaRPCServerEndpoint; ...@@ -11,8 +11,10 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent; import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService; import com.ibm.darpc.DaRPCService;
import org.apache.log4j.Logger;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//import hpdos_rdma_offloaded.ExperimentStatistics; //import hpdos_rdma_offloaded.ExperimentStatistics;
import hpdos_rdma_offloaded.handler.NetworkHandlerM; import hpdos_rdma_offloaded.handler.NetworkHandlerM;
...@@ -31,9 +33,8 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R ...@@ -31,9 +33,8 @@ public class MetadataServiceMaster extends RpcProtocol implements DaRPCService<R
NetworkHandlerM networkHandler; NetworkHandlerM networkHandler;
StorageHandler storageHandler; StorageHandler storageHandler;
DaRPCClientGroupM<InvalidationRequest, InvalidationResponse> invalidationClientGroup; DaRPCClientGroupM<InvalidationRequest, InvalidationResponse> invalidationClientGroup;
final static Logger logger = LoggerFactory.getLogger("MetadataService");
final static Logger logger = Logger.getLogger(MasterMetadataServerService.class);
public MetadataServiceMaster(NetworkHandlerM networkHandler,StorageHandler storageHandler) throws Exception{ public MetadataServiceMaster(NetworkHandlerM networkHandler,StorageHandler storageHandler) throws Exception{
logger.debug("New MetadataServerService Created"); logger.debug("New MetadataServerService Created");
this.networkHandler = networkHandler; this.networkHandler = networkHandler;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment