Commit 96f46ba8 authored by Paras Garg's avatar Paras Garg

Fix

parent b6c7e9b4
app.HOST=192.168.200.20
app.NO_OF_MASTERS=2
app.MASTER_HOST1=192.168.200.20
app.MASTER_HOST2=192.168.200.40
app.INVALIDATION_PORT=1922
app.CACHE_SIZE=1
import java.lang.reflect.InaccessibleObjectException; import java.lang.reflect.InaccessibleObjectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
...@@ -9,6 +13,8 @@ import com.ibm.darpc.DaRPCClientEndpoint; ...@@ -9,6 +13,8 @@ import com.ibm.darpc.DaRPCClientEndpoint;
import com.ibm.darpc.DaRPCClientGroup; import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.DaRPCStream;
import org.apache.log4j.helpers.SyslogQuietWriter;
import hpdos.cache.SalCache; import hpdos.cache.SalCache;
import hpdos.handlers.NetworkHandler; import hpdos.handlers.NetworkHandler;
import hpdos.invalidationServer.InvalidationServer; import hpdos.invalidationServer.InvalidationServer;
...@@ -22,39 +28,71 @@ public class MetadataClient{ ...@@ -22,39 +28,71 @@ public class MetadataClient{
private DaRPCStream<Request, Response> stream; private DaRPCStream<Request, Response> stream;
private DaRPCClientEndpoint<Request, Response> clientEp; private DaRPCClientEndpoint<Request, Response> clientEp;
private InvalidationServer invalidationServer; private InvalidationServer invalidationServer;
public MetadataClient() throws Exception // SalCache cache;
public Cache<String, String> cache;
NetworkHandler networkHandler;
public MetadataClient(Properties properties) throws Exception
{ {
Cache<byte[],byte[]> cache = Caffeine.newBuilder() String invalidationServerIP = String.valueOf((String)properties.get("app.HOST"));
List<String> metadataMasters = new ArrayList<>();
// String metadataMasterIp = String.valueOf((String)properties.get("app.MASTER_HOST1"));
int cacheSize = Integer.valueOf((String)properties.get("app.CACHE_SIZE"));
int numberOfMasters = Integer.valueOf((String)properties.get("app.NO_OF_MASTERS"));
this.networkHandler = new NetworkHandler(cacheSize, numberOfMasters);
this.invalidationServer = new InvalidationServer(cache,invalidationServerIP, this.networkHandler);
this.invalidationServer.acceptInvalidationConnections(numberOfMasters);
for(int i=1;i<=numberOfMasters;i++){
metadataMasters.add(String.valueOf((String)properties.get("app.MASTER_HOST"+i)));
}
for (String metadataMasterIp : metadataMasters) {
this.invalidationServer.sendInvalidationRegistrationRequest(metadataMasterIp, invalidationServerIP);
RpcProtocol rpcProtocol = new RpcProtocol();
DaRPCClientGroup<Request, Response> group = DaRPCClientGroup.createClientGroup(rpcProtocol, 100, 0, 16, 16);
InetSocketAddress address = new InetSocketAddress(metadataMasterIp, 1920);
clientEp = group.createEndpoint();
clientEp.connect(address, 1000);
stream = clientEp.createStream();
NetworkHandler.streams.put(metadataMasterIp, stream);
NetworkHandler.ips.add(metadataMasterIp);
System.out.println("Connected");
}
this.cache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.SECONDS) .expireAfterWrite(30, TimeUnit.SECONDS)
.maximumSize(10_000) .maximumSize(cacheSize)
.build(); .build();
invalidationServer = new InvalidationServer(cache);
invalidationServer.acceptSingleConnection();
System.out.println("started Invalidation server");
RpcProtocol rpcProtocol = new RpcProtocol();
DaRPCClientGroup<Request, Response> group = DaRPCClientGroup.createClientGroup(rpcProtocol, 100, 0, 16, 16);
InetSocketAddress address = new InetSocketAddress("192.168.200.20", 1920);
clientEp = group.createEndpoint();
clientEp.connect(address, 1000);
stream = clientEp.createStream();
} }
public byte[] get(byte[] key) throws Exception public byte[] get(byte[] key) throws Exception
{ {
System.out.println("Getting From local cache");
// var value = this.cache.getIfPresent(key);
String value = this.cache.getIfPresent(new String(key));
if(value != null)
return value.getBytes(StandardCharsets.UTF_8);
System.out.println("Not found in cache Getting from server");
Request request= new Request(); Request request= new Request();
request.setRequestType(RequestType.GET); request.setRequestType(RequestType.GET);
request.setKey(key); request.setKey(key);
Response response = new Response(); Response response = new Response();
stream.request(request, response, false).get(); stream.request(request, response, false).get();
if(response.getAck() == AckType.SUCCESS_WITH_VALUE) if(response.getAck() == AckType.SUCCESS_WITH_VALUE){
System.out.println("Adding to cache");
this.cache.put(new String(key), new String(response.getValue()));
return response.getValue(); return response.getValue();
}
return null; return null;
} }
public int put(byte[] key,byte[] value) throws Exception public int put(byte[] key,byte[] value) throws Exception
{ {
System.out.println("Putting KV pair on server ....");
Request request= new Request(); Request request= new Request();
request.setRequestType(RequestType.PUT); request.setRequestType(RequestType.PUT);
request.setKey(key); request.setKey(key);
...@@ -62,11 +100,17 @@ public class MetadataClient{ ...@@ -62,11 +100,17 @@ public class MetadataClient{
Response response = new Response(); Response response = new Response();
stream.request(request, response, false).get(); stream.request(request, response, false).get();
if(response.getAck() == AckType.SUCCESS){
this.cache.put(new String(key), new String(value));
System.out.println("Updating cache..");
}
return response.getAck(); return response.getAck();
} }
public int delete(byte[] key) throws Exception public int delete(byte[] key) throws Exception
{ {
System.out.println("Invalidating Local Cache and sending delete request to server");
// this.cache.invalidate(key);
Request request= new Request(); Request request= new Request();
request.setRequestType(RequestType.DELETE); request.setRequestType(RequestType.DELETE);
request.setKey(key); request.setKey(key);
......
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.NetworkInterface;
import java.util.Enumeration;
import java.util.Properties;
import java.util.Scanner; import java.util.Scanner;
import hpdos.protocol.AckType; import hpdos.protocol.AckType;
public class StandaloneRDMAClient { public class StandaloneRDMAClient {
//Donot delete this line
//Used to find which core process is using
//while true; do echo -ne "`ps -o pid,psr,comm -p <pid>`"; done
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
MetadataClient client = new MetadataClient();
Scanner scanner = new Scanner(System.in); Scanner scanner = new Scanner(System.in);
System.out.println("Enter Invalidation Server IP to bind");
Properties properties= new Properties();
InputStream inputStream = new FileInputStream(args[0]);
properties.load(inputStream);
// String address = scanner.nextLine();
MetadataClient client = new MetadataClient(properties);
int option; int option;
while (true) { while (true) {
System.out.println("Enter 1. for read and 2 for write 3 for delete: "); System.out.println("Enter 1. for read and 2 for write 3 for delete: ");
...@@ -21,7 +36,9 @@ public class StandaloneRDMAClient { ...@@ -21,7 +36,9 @@ public class StandaloneRDMAClient {
key[i] = (byte)inputkey.charAt(i); key[i] = (byte)inputkey.charAt(i);
} }
byte[] value = client.get(key); byte[] value = client.networkHandler.get(key);
System.out.println("Got Response ");
if(value != null) if(value != null)
System.out.println(new String(value)); System.out.println(new String(value));
} }
...@@ -42,8 +59,8 @@ public class StandaloneRDMAClient { ...@@ -42,8 +59,8 @@ public class StandaloneRDMAClient {
{ {
value[i] = (byte)inputvalue.charAt(i); value[i] = (byte)inputvalue.charAt(i);
} }
int ack = client.put(key, value); int ack = client.networkHandler.put(key, value);
System.out.println("Response Ack got " +ack); System.out.println("Response got Ack " +ack);
if (ack == AckType.SUCCESS) { if (ack == AckType.SUCCESS) {
System.out.println("Success"); System.out.println("Success");
} }
...@@ -57,8 +74,8 @@ public class StandaloneRDMAClient { ...@@ -57,8 +74,8 @@ public class StandaloneRDMAClient {
{ {
key[i] = (byte)inputkey.charAt(i); key[i] = (byte)inputkey.charAt(i);
} }
int ack = client.delete(key); int ack = client.networkHandler.delete(key);
System.out.println("Response Ack got " +ack); System.out.println("Response got Ack " +ack);
if (ack == AckType.SUCCESS) { if (ack == AckType.SUCCESS) {
System.out.println("Success"); System.out.println("Success");
} }
......
...@@ -6,12 +6,8 @@ import com.github.benmanes.caffeine.cache.Cache; ...@@ -6,12 +6,8 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
public class SalCache { public class SalCache {
public static Cache<byte[], byte[]> cache; public static Cache<String, String> cache =Caffeine.newBuilder()
public SalCache() .expireAfterWrite(30, TimeUnit.SECONDS)
{ .maximumSize(10)
SalCache.cache = Caffeine.newBuilder() .build();
.expireAfterWrite(30, TimeUnit.SECONDS)
.maximumSize(10_000)
.build();
}
} }
package hpdos.invalidationServer; package hpdos.invalidationServer;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.ibm.darpc.DaRPCServerEndpoint; import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup; import com.ibm.darpc.DaRPCServerGroup;
import com.ibm.disni.RdmaServerEndpoint; import com.ibm.disni.RdmaServerEndpoint;
import hpdos.handlers.NetworkHandler;
import hpdos.protocol.InvalidationRequest; import hpdos.protocol.InvalidationRequest;
import hpdos.protocol.InvalidationResponse; import hpdos.protocol.InvalidationResponse;
import hpdos.services.InvalidationService; import hpdos.services.InvalidationService;
public class InvalidationServer public class InvalidationServer
{ {
Cache<byte[], byte[]> cache; Cache<String, String> cache;
RdmaServerEndpoint<DaRPCServerEndpoint<InvalidationRequest, InvalidationResponse>> serverEp; RdmaServerEndpoint<DaRPCServerEndpoint<InvalidationRequest, InvalidationResponse>> serverEp;
public InvalidationServer(Cache<byte[], byte[]> cache)throws Exception public InvalidationServer(Cache<String, String> cache2,String invalidationServerIP, NetworkHandler networkHandler)throws Exception
{ {
this.cache = cache; this.cache = cache2;
long[] clusterAffinities = new long[1]; long[] clusterAffinities = new long[1];
clusterAffinities[0] = 1<<1; clusterAffinities[0] = 1<<1;
DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null; DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null;
InvalidationService service = new InvalidationService(this.cache); InvalidationService service = new InvalidationService(networkHandler);
group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, true, 2, 2, 2, 1); group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, true, 2, 2, 2, 1);
serverEp = group.createServerEndpoint(); serverEp = group.createServerEndpoint();
InetSocketAddress address = new InetSocketAddress("192.168.200.20", 1921); InetSocketAddress address = new InetSocketAddress(invalidationServerIP, 1922);
serverEp.bind(address, 100); serverEp.bind(address, 2);
} }
public void acceptSingleConnection() throws Exception public void acceptInvalidationConnections(int numOfConnections) throws Exception
{ {
System.out.println("Waiting for server to send connection request for invalidation");
Runnable runnable = ()-> Runnable runnable = ()->
{ {
try{ try{
serverEp.accept(); int conns = numOfConnections;
System.out.println("Waiting for server to send connection request for invalidation");
while (conns>0) {
serverEp.accept();
conns -- ;
}
System.out.println("Got Connected for Invalidation");
}catch(Exception e){ }catch(Exception e){
e.printStackTrace(); e.printStackTrace();
} }
}; };
new Thread(runnable).start(); var t= new Thread(runnable);
t.setName("Server Connection");
System.out.println("Got Connected for Invalidation"); t.start();
System.out.println("started Invalidation server");
}
public void sendInvalidationRegistrationRequest(String masterIpAddress, String hostIpAddress) throws IOException{
System.out.println("Sending invalidation registration request.");
Socket socket = new Socket(masterIpAddress, 9875);
ObjectOutputStream oos = null;
oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(hostIpAddress);
socket.close();
} }
} }
...@@ -6,5 +6,7 @@ public interface AckType ...@@ -6,5 +6,7 @@ public interface AckType
public static int NOTFOUND = 1; public static int NOTFOUND = 1;
public static int NOTALLOWED = 2; public static int NOTALLOWED = 2;
public static int DBFAILED = 3; public static int DBFAILED = 3;
public static int SUCCESS_WITH_VALUE = 4; public static int FAILED = 4;
public static int SUCCESS_WITH_VALUE = 5;
public static int INVALID_REQUEST = 6;
} }
...@@ -16,15 +16,15 @@ public class Request implements DaRPCMessage ...@@ -16,15 +16,15 @@ public class Request implements DaRPCMessage
@Override @Override
public int write(ByteBuffer buffer) throws IOException public int write(ByteBuffer buffer) throws IOException
{ {
System.out.println("Request Write Method"); // System.out.println("Request Write Method");
buffer.putInt(requestType); buffer.putInt(requestType);
buffer.put(key); buffer.put(key);
System.out.println("length : "+key.length); // System.out.println("length : "+key.length);
//if operation type is get and delete then value is not required //if operation type is get and delete then value is not required
if(requestType == RequestType.PUT) if(requestType == RequestType.PUT)
{ {
buffer.put(value); buffer.put(value);
System.out.println(" "+value.length); // System.out.println(" "+value.length);
//size of key+value+operationType //size of key+value+operationType
return 4+key.length+value.length; return 4+key.length+value.length;
} }
...@@ -35,7 +35,7 @@ public class Request implements DaRPCMessage ...@@ -35,7 +35,7 @@ public class Request implements DaRPCMessage
@Override @Override
public void update(ByteBuffer buffer) throws IOException public void update(ByteBuffer buffer) throws IOException
{ {
System.out.println("Request update method"+buffer.capacity()); // System.out.println("Request update method"+buffer.capacity());
requestType = buffer.getInt(); requestType = buffer.getInt();
if(key == null || key.length != 128) if(key == null || key.length != 128)
this.key = new byte[128]; this.key = new byte[128];
......
...@@ -14,11 +14,11 @@ public class Response implements DaRPCMessage{ ...@@ -14,11 +14,11 @@ public class Response implements DaRPCMessage{
@Override @Override
public int write(ByteBuffer buffer) throws IOException public int write(ByteBuffer buffer) throws IOException
{ {
System.out.println("Response write Method"); // System.out.println("Response write Method");
buffer.putInt(ack); buffer.putInt(ack);
if(ack == AckType.SUCCESS_WITH_VALUE) if(ack == AckType.SUCCESS_WITH_VALUE)
{ {
System.out.println("length "+value.length); // System.out.println("length "+value.length);
buffer.put(value); buffer.put(value);
return 4+value.length; return 4+value.length;
} }
...@@ -28,7 +28,7 @@ public class Response implements DaRPCMessage{ ...@@ -28,7 +28,7 @@ public class Response implements DaRPCMessage{
@Override @Override
public void update(ByteBuffer buffer) throws IOException public void update(ByteBuffer buffer) throws IOException
{ {
System.out.println("Response update method "+buffer.limit()); //System.out.println("Response update method "+buffer.limit());
ack = buffer.getInt(); ack = buffer.getInt();
if(ack == AckType.SUCCESS_WITH_VALUE) if(ack == AckType.SUCCESS_WITH_VALUE)
{ {
......
...@@ -7,15 +7,17 @@ import com.ibm.darpc.DaRPCServerEndpoint; ...@@ -7,15 +7,17 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent; import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService; import com.ibm.darpc.DaRPCService;
import hpdos.handlers.NetworkHandler;
import hpdos.protocol.InvalidationRequest; import hpdos.protocol.InvalidationRequest;
import hpdos.protocol.InvalidationResponse; import hpdos.protocol.InvalidationResponse;
import hpdos.protocol.InvalidationRpcProtocol; import hpdos.protocol.InvalidationRpcProtocol;
public class InvalidationService extends InvalidationRpcProtocol implements DaRPCService<InvalidationRequest, InvalidationResponse>{ public class InvalidationService extends InvalidationRpcProtocol implements DaRPCService<InvalidationRequest, InvalidationResponse>{
Cache<byte[], byte[]> cache; // Cache<String, String> cache;
public InvalidationService(Cache<byte[], byte[]> cache) NetworkHandler networkHandler;
public InvalidationService(NetworkHandler networkHandler)
{ {
this.cache = cache; this.networkHandler = networkHandler;
} }
@Override @Override
public void processServerEvent(DaRPCServerEvent<InvalidationRequest, InvalidationResponse> event) throws IOException { public void processServerEvent(DaRPCServerEvent<InvalidationRequest, InvalidationResponse> event) throws IOException {
...@@ -25,7 +27,7 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP ...@@ -25,7 +27,7 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
try try
{ {
byte[] key = request.getKey(); byte[] key = request.getKey();
cache.invalidate(key); networkHandler.cache.invalidate(new String(key));
response.setAck(1); response.setAck(1);
} }
catch(Exception e) catch(Exception e)
...@@ -38,10 +40,16 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP ...@@ -38,10 +40,16 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
public void open(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp) public void open(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp)
{ {
System.out.println("Recieved New Connection for invalidation");
try{
System.out.println(serverEp.getDstAddr());
}catch(Exception e){}
} }
public void close(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp) public void close(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp)
{ {
System.out.println("Closing Connection for invalidation");
try{
System.out.println(serverEp.getDstAddr());
}catch(Exception e){}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment