Commit a155b14c authored by Paras Garg's avatar Paras Garg

Simplified things

parent 397f695d
......@@ -228,29 +228,9 @@ public class BenchmarkingClient {
}
logger.debug("DONE...");
NetworkHandler networkHandler = new NetworkHandler();
networkHandler.printStatistics();
networkHandler.benchmarkingProperties.printStatistics();
experimentExecutors.shutdown();
}
}
class BenchmarkingProperties {
static Double reads;
static Double writes;
static Double deletes;
static int clients;
static long requestsPerClient;
static Double requestsPerSecond;
static int keySize;
static int valueSize;
// Result statistics
static int readCount;
static int writeCount;
static int deleteCount;
static long readTime = 0;
static long writeTime = 0 ;
static long deleteTime = 0;
static long timePerRequest = 0;
}
import java.lang.reflect.InaccessibleObjectException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.ibm.darpc.DaRPCClientEndpoint;
import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCStream;
import org.apache.log4j.Logger;
import org.apache.log4j.helpers.SyslogQuietWriter;
import hpdos.cache.SalCache;
import hpdos.handlers.NetworkHandler;
import hpdos.handlers.NetworkHandlerM;
import hpdos.invalidationServer.InvalidationServer;
import hpdos.protocol.AckType;
import hpdos.protocol.Request;
import hpdos.protocol.RequestType;
import hpdos.protocol.Response;
import hpdos.protocol.RpcProtocol;
import hpdos.protocol.ByteArray;
import hpdos.protocol.Property;
public class MetadataClient{
private DaRPCStream<Request, Response> stream;
private DaRPCClientEndpoint<Request, Response> clientEp;
public class MetadataClient
{
private InvalidationServer invalidationServer;
// SalCache cache;
public Cache<String, String> cache;
NetworkHandler networkHandler;
private NetworkHandlerM networkHandlerM;
public Cache<ByteArray, byte[]> cache;
final static Logger logger = Logger.getLogger(MetadataClient.class);
public MetadataClient(Properties properties) throws Exception
public MetadataClient(Property properties) throws Exception
{
String invalidationServerIP = String.valueOf((String)properties.get("app.HOST"));
String invalidationServerIP = properties.invalidationServerIP;
logger.debug("Invalidation server ip: " + invalidationServerIP);
List<String> metadataMasters = new ArrayList<>();
// String metadataMasterIp = String.valueOf((String)properties.get("app.MASTER_HOST1"));
int cacheSize = Integer.valueOf((String)properties.get("app.CACHE_SIZE"));
int numberOfMasters = Integer.valueOf((String)properties.get("app.NO_OF_MASTERS"));
int invalidationServerPort = Integer.valueOf((String)properties.get("app.INVALIDATION_PORT"));
this.networkHandler = new NetworkHandler(cacheSize, numberOfMasters);
this.invalidationServer = new InvalidationServer(cache,invalidationServerIP, invalidationServerPort, this.networkHandler);
this.invalidationServer.acceptInvalidationConnections(numberOfMasters);
for(int i=1;i<=numberOfMasters;i++){
metadataMasters.add(String.valueOf((String)properties.get("app.MASTER_HOST"+i)));
}
for (String metadataMasterIp : metadataMasters) {
this.invalidationServer.sendInvalidationRegistrationRequest(metadataMasterIp, invalidationServerIP);
RpcProtocol rpcProtocol = new RpcProtocol();
DaRPCClientGroup<Request, Response> group = DaRPCClientGroup.createClientGroup(rpcProtocol, 100, 0, 16, 16);
InetSocketAddress address = new InetSocketAddress(metadataMasterIp, 1920);
clientEp = group.createEndpoint();
clientEp.connect(address, 1000);
stream = clientEp.createStream();
NetworkHandler.streams.put(metadataMasterIp, stream);
NetworkHandler.ips.add(metadataMasterIp);
logger.debug("Connected");
}
this.networkHandler.connectToFollowers();
this.networkHandler.initializeLoadBalancer();
this.cache = Caffeine.newBuilder()
.expireAfterWrite(300, TimeUnit.SECONDS)
.maximumSize(cacheSize)
.build();
this.cache = Caffeine.newBuilder()
.expireAfterWrite(300, TimeUnit.SECONDS)
.maximumSize(properties.cacheSize)
.build();
this.invalidationServer = new InvalidationServer(properties.invalidationServerIP, properties.invalidationServerPort,cache);
this.invalidationServer.acceptInvalidationConnections(properties.numberOfMasters);
this.networkHandlerM = new NetworkHandlerM(properties);
}
public byte[] get(byte[] key) throws Exception
{
System.out.println("Getting From local cache");
// var value = this.cache.getIfPresent(key);
String value = this.cache.getIfPresent(new String(key));
logger.debug("Getting From local cache");
byte[] value = this.cache.getIfPresent(new ByteArray(key));
if(value != null)
return value.getBytes(StandardCharsets.UTF_8);
System.out.println("Not found in cache Getting from server");
Request request= new Request();
request.setRequestType(RequestType.GET);
request.setKey(key);
return value;
logger.debug("Not found in cache Getting from server");
Response response = new Response();
stream.request(request, response, false).get();
if(response.getAck() == AckType.SUCCESS_WITH_VALUE){
var futureResponse = networkHandlerM.get(key).get(100, TimeUnit.MILLISECONDS);
if(futureResponse.getAck() == AckType.SUCCESS_WITH_VALUE){
System.out.println("Adding to cache");
this.cache.put(new String(key), new String(response.getValue()));
return response.getValue();
this.cache.put(new ByteArray(key), futureResponse.getValue());
return futureResponse.getValue();
}
return null;
}
public int put(byte[] key,byte[] value) throws Exception
{
System.out.println("Putting KV pair on server ....");
Request request= new Request();
request.setRequestType(RequestType.PUT);
request.setKey(key);
request.setValue(value);
Response response = new Response();
stream.request(request, response, false).get();
var response = networkHandlerM.put(key,value).get(100, TimeUnit.MILLISECONDS);
logger.debug("Putting KV pair on server ....");
if(response.getAck() == AckType.SUCCESS){
this.cache.put(new String(key), new String(value));
this.cache.put(new ByteArray(key), value);
System.out.println("Updating cache..");
}
return response.getAck();
......@@ -114,14 +66,9 @@ public class MetadataClient{
public int delete(byte[] key) throws Exception
{
System.out.println("Invalidating Local Cache and sending delete request to server");
// this.cache.invalidate(key);
Request request= new Request();
request.setRequestType(RequestType.DELETE);
request.setKey(key);
Response response = new Response();
stream.request(request, response, false).get();
logger.debug("Invalidating Local Cache and sending delete request to server");
//this.cache.invalidate(key);
var response = this.networkHandlerM.delete(key).get(100,TimeUnit.MILLISECONDS);
return response.getAck();
}
}
\ No newline at end of file
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.NetworkInterface;
import java.util.Enumeration;
import java.util.Properties;
import java.util.Scanner;
import hpdos.protocol.AckType;
import hpdos.protocol.Property;
public class StandaloneRDMAClient {
//Donot delete this line
......@@ -14,9 +11,7 @@ public class StandaloneRDMAClient {
public static void main(String[] args) throws Exception {
Scanner scanner = new Scanner(System.in);
System.out.println("Enter Invalidation Server IP to bind");
Properties properties= new Properties();
InputStream inputStream = new FileInputStream(args[0]);
properties.load(inputStream);
Property properties = new Property(args[0]);
// String address = scanner.nextLine();
......@@ -36,7 +31,7 @@ public class StandaloneRDMAClient {
key[i] = (byte)inputkey.charAt(i);
}
byte[] value = client.networkHandler.get(key);
byte[] value = client.get(key);
System.out.println("Got Response ");
if(value != null)
......@@ -59,7 +54,7 @@ public class StandaloneRDMAClient {
{
value[i] = (byte)inputvalue.charAt(i);
}
int ack = client.networkHandler.put(key, value);
int ack = client.put(key, value);
System.out.println("Response got Ack " +ack);
if (ack == AckType.SUCCESS) {
System.out.println("Success");
......@@ -74,7 +69,7 @@ public class StandaloneRDMAClient {
{
key[i] = (byte)inputkey.charAt(i);
}
int ack = client.networkHandler.delete(key);
int ack = client.delete(key);
System.out.println("Response got Ack " +ack);
if (ack == AckType.SUCCESS) {
System.out.println("Success");
......
package hpdos.handlers;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCEndpoint;
import com.ibm.darpc.DaRPCFuture;
import com.ibm.darpc.DaRPCStream;
import hpdos.protocol.Property;
import hpdos.protocol.Request;
import hpdos.protocol.RequestType;
import hpdos.protocol.Response;
import hpdos.protocol.RpcProtocol;
public class NetworkHandlerM {
DaRPCClientGroup<Request, Response> group;
ArrayList<DaRPCStream<Request,Response>> streams;
ArrayList<DaRPCEndpoint<Request,Response>> eps;
int loadBalanceNode;
public NetworkHandlerM( Property properties) throws Exception
{
RpcProtocol rpcProtocol = new RpcProtocol();
group = DaRPCClientGroup.createClientGroup(rpcProtocol, 100, 0, 16, 16);
for(var ip : properties.masters)
{
InetSocketAddress address = new InetSocketAddress(ip, 1920);
var clientEp = group.createEndpoint();
clientEp.connect(address, 1000);
if(clientEp.isConnected())
{
eps.add(clientEp);
streams.add(clientEp.createStream());
}
}
}
public int balance(byte[] key)
{
loadBalanceNode++;
loadBalanceNode = loadBalanceNode & Integer.MAX_VALUE;
return loadBalanceNode% streams.size();
}
public DaRPCFuture<Request,Response> get(byte[] key) throws IOException
{
int id = balance(key);
Request request = new Request();
request.setRequestType(RequestType.GET);
request.setKey(key);
Response response = new Response();
var future = streams.get(id).request(request, response, false);
return future;
}
public DaRPCFuture<Request,Response> put(byte[] key,byte[] value) throws IOException
{
Request request = new Request();
request.setRequestType(RequestType.PUT);
request.setKey(key);
request.setValue(value);
var future = streams.get(0).request(request, new Response(), false);
return future;
}
public DaRPCFuture<Request,Response> delete(byte[] key) throws IOException
{
Request request = new Request();
request.setRequestType(RequestType.DELETE);
request.setKey(key);
var future = streams.get(0).request(request, new Response(), false);
return future;
}
}
......@@ -3,12 +3,11 @@ package hpdos.invalidationServer;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import hpdos.packet.*;
import com.github.benmanes.caffeine.cache.Cache;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup;
......@@ -17,6 +16,7 @@ import com.ibm.disni.RdmaServerEndpoint;
import org.apache.log4j.Logger;
import hpdos.handlers.NetworkHandler;
import hpdos.protocol.ByteArray;
import hpdos.protocol.InvalidationRequest;
import hpdos.protocol.InvalidationResponse;
import hpdos.services.InvalidationService;
......@@ -24,53 +24,39 @@ import hpdos.services.InvalidationService;
public class InvalidationServer
{
final static Logger logger = Logger.getLogger(InvalidationServer.class);
Cache<String, String> cache;
RdmaServerEndpoint<DaRPCServerEndpoint<InvalidationRequest, InvalidationResponse>> serverEp;
private long[] clusterAffinities;
public InvalidationServer(Cache<String, String> cache2,String invalidationServerIP, int invalidationServerPort, NetworkHandler networkHandler)throws Exception
{
this.cache = cache2;
long[] clusterAffinities = new long[1];
clusterAffinities[0] = 1L << 1 ;
public InvalidationServer(String invalidationServerIP, int invalidationServerPort,Cache<ByteArray,byte[]> cache)throws Exception
{
long[] clusterAffinities;
clusterAffinities = new long[1];
clusterAffinities[0] = 1L << 1 ;
// clusterAffinities = new long[4];
// for (int i = 0; i < 4; i++) {
// clusterAffinities[i] = 1L << i ;
// }
DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null;
InvalidationService service = new InvalidationService(networkHandler);
group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, false, 16, 16, 16, 4);
// for (int i = 0; i < 4; i++)
// {
// long cpu = 1L << i;
// clusterAffinities[i] = cpu;
// System.out.println(cpu);
// };
// DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null;
// InvalidationService service = new InvalidationService(networkHandler);
// group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 0, false, 16, 16, 32, 4);
// group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, 1000, 0, false, 16, 16, 16, 4);
serverEp = group.createServerEndpoint();
InetSocketAddress address = new InetSocketAddress(invalidationServerIP, invalidationServerPort);
serverEp.bind(address, 2);
DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null;
InvalidationService service = new InvalidationService(cache);
group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, false, 16, 16, 16, 4);
serverEp = group.createServerEndpoint();
InetSocketAddress address = new InetSocketAddress(invalidationServerIP, invalidationServerPort);
serverEp.bind(address, 2);
}
public void acceptInvalidationConnections(int numOfConnections) throws Exception
{
Runnable runnable = ()->
{
try{
try
{
int conns = numOfConnections;
logger.debug("Waiting for server to send connection request for invalidation");
while (conns>0) {
serverEp.accept();
conns -- ;
}
logger.debug("Got Connected for Invalidation");
}catch(Exception e){
}
catch(Exception e){
e.printStackTrace();
}
};
......@@ -78,10 +64,9 @@ public class InvalidationServer
t.setName("Server Connection");
t.start();
logger.debug("started Invalidation server");
}
public void sendInvalidationRegistrationRequest(String masterIpAddress, String hostIpAddress) throws IOException, ClassNotFoundException{
public void sendInvalidationRegistrationRequest(String masterIpAddress, String hostIpAddress,NetworkHandler networkHandler) throws IOException, ClassNotFoundException{
// logger.info("Sending invalidation registration request to ." + masterIpAddress);
Socket socket = new Socket(masterIpAddress, 9875);
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
......@@ -103,18 +88,16 @@ public class InvalidationServer
temp.add(ip);
index++;
}
NetworkHandler.follower_ips.put(master, temp);
networkHandler.follower_ips.put(master, temp);
// for (String string : NetworkHandler.follower_ips.keySet()) {
// logger.info("For master: " + string + " the followers are: ");
// for (String string2 : NetworkHandler.follower_ips.get(string)) {
// logger.info(string2);
// }
// logger.info("For master: " + string + " the followers are: ");
// for (String string2 : NetworkHandler.follower_ips.get(string)) {
// logger.info(string2);
// }
// }
// for (Follower follower : followers) {
// logger.info(follower.getIpAdress());
// }
socket.close();
}
......
package hpdos.protocol;
import org.apache.log4j.Logger;
public class BenchmarkingProperties {
public Double reads;
public Double writes;
public Double deletes;
public int clients;
public long requestsPerClient;
public Double requestsPerSecond;
public int keySize;
public int valueSize;
// Result statistics
public int readCount;
public int writeCount;
public int deleteCount;
public long readTime;
public long writeTime;
public long deleteTime;
public long timePerRequest;
final static Logger logger = Logger.getLogger(BenchmarkingProperties.class);
public void printStatistics(){
logger.debug("Total writes: " + writeCount);
logger.debug("Total reads: " + readCount);
logger.debug("Total deletes: " + deleteCount);
logger.debug("Total write time: " + writeTime);
logger.debug("Total read time: " + readTime);
logger.debug("Total delete time: " + deleteTime);
logger.debug("Average write time in microseconds: " + (writeTime / writeCount));
logger.debug("Average read time in microseconds: " + (readTime / readCount));
logger.debug("Average delete time in microseconds: " + (deleteTime / deleteCount));
// logger.debug("Average Time/Request : " + timePerRequest/(writeCount + readCount + deleteCount));
// long avgTimePerReq = timePerRequest/(writeCount + readCount + deleteCount);
// logger.debug("Actual Requests/second : " + (1000000/avgTimePerReq));
}
public void collectExperimentStatistics(long time, int type){
// logger.debug("Time: " + time);
if (type == 100) {
writeCount ++;
writeTime = writeTime + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
else if (type == 101) {
readCount ++;
readTime = readTime + time;
// actualReqPerSecond = actualReqPerSecond + time;
}
else if(type == 102){
deleteCount ++;
deleteTime = deleteTime + time;
// actualReqPerSecond = actualReqPerSecond + time;
}
}
}
\ No newline at end of file
package hpdos.protocol;
public class ByteArray {
public byte[] key;
public ByteArray(byte[] key)
{
this.key = key;
}
@Override
public boolean equals(Object other)
{
if( other == null || !(other instanceof ByteArray))
return false;
if(this.key == other)
return true;
ByteArray b = (ByteArray)other;
int length = b.key.length;
if (this.key.length != length)
return false;
for (int i=0; i<length; i++)
if (this.key[i] != b.key[i])
return false;
return true;
}
}
package hpdos.protocol;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class Property {
public String invalidationServerIP;
public int cacheSize;
public int numberOfMasters;
public int invalidationServerPort;
public String[] masters;
public Property(String path) throws IOException
{
Properties properties= new Properties();
InputStream inputStream = new FileInputStream(path);
properties.load(inputStream);
invalidationServerIP = properties.getProperty("app.HOST");
cacheSize = Integer.valueOf(properties.getProperty("app.CACHE_SIZE"));
numberOfMasters = Integer.valueOf(properties.getProperty("app.NO_OF_MASTERS"));
invalidationServerPort = Integer.valueOf(properties.getProperty("app.INVALIDATION_PORT"));
masters = new String[numberOfMasters];
for(int i=1;i<=numberOfMasters;i++)
{
masters[i] = properties.getProperty("app.MASTER_HOST"+i);
}
}
}
......@@ -9,28 +9,31 @@ import com.ibm.darpc.DaRPCService;
import org.apache.log4j.Logger;
import hpdos.handlers.NetworkHandler;
import hpdos.protocol.ByteArray;
import hpdos.protocol.InvalidationRequest;
import hpdos.protocol.InvalidationResponse;
import hpdos.protocol.InvalidationRpcProtocol;
public class InvalidationService extends InvalidationRpcProtocol implements DaRPCService<InvalidationRequest, InvalidationResponse>{
// Cache<String, String> cache;
NetworkHandler networkHandler;
public Cache<ByteArray, byte[]> cache;
final static Logger logger = Logger.getLogger(InvalidationService.class);
public InvalidationService(NetworkHandler networkHandler)
public InvalidationService(Cache<ByteArray,byte[]> cache)
{
this.networkHandler = networkHandler;
this.cache = cache;
}
@Override
public void processServerEvent(DaRPCServerEvent<InvalidationRequest, InvalidationResponse> event) throws IOException {
// System.out.println("Got Invalidation Request");
public void processServerEvent(DaRPCServerEvent<InvalidationRequest, InvalidationResponse> event) throws IOException
{
logger.debug("Got Invalidation Request");
InvalidationRequest request = event.getReceiveMessage();
InvalidationResponse response = event.getSendMessage();
try
{
byte[] key = request.getKey();
networkHandler.cache.invalidate(new String(key));
cache.invalidate(new ByteArray(key));
response.setAck(1);
}
catch(Exception e)
......@@ -48,6 +51,7 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
logger.debug(serverEp.getDstAddr());
}catch(Exception e){}
}
public void close(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp)
{
logger.debug("Closing Connection for invalidation");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment