Commit 63b4305d authored by Paras Garg's avatar Paras Garg

Added config

parent 731e788d
app.HOST=192.168.200.20 app.HOST=192.168.200.20
app.NO_OF_MASTERS=1 app.INVALIDATION_PORT=1921
app.NO_OF_MASTERS=2
app.MASTER_HOST1=192.168.200.20 app.MASTER_HOST1=192.168.200.20
app.MASTER_HOST2=192.168.200.40 app.MASTER_HOST2=192.168.200.40
app.INVALIDATION_PORT=1922 app.CACHE_SIZE=100
app.CACHE_SIZE=1
...@@ -3,28 +3,27 @@ import java.util.concurrent.TimeUnit; ...@@ -3,28 +3,27 @@ import java.util.concurrent.TimeUnit;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import hpdos.handlers.NetworkHandlerM; import hpdos.handlers.NetworkHandlerM;
import hpdos.invalidationServer.InvalidationServer;
import hpdos.protocol.AckType; import hpdos.protocol.AckType;
import hpdos.protocol.ByteArray; import hpdos.protocol.ByteArray;
import hpdos.protocol.Property; import hpdos.protocol.Property;
import hpdos.services.InvalidationServer;
public class MetadataClient public class MetadataClient
{ {
private InvalidationServer invalidationServer; private InvalidationServer invalidationServer;
private NetworkHandlerM networkHandlerM; private NetworkHandlerM networkHandlerM;
public Cache<ByteArray, byte[]> cache; public Cache<ByteArray, byte[]> cache;
final static Logger logger = Logger.getLogger(MetadataClient.class); final static Logger logger = Logger.getLogger(MetadataClient.class);
public MetadataClient(Property properties) throws Exception public MetadataClient(Property properties) throws Exception
{ {
String invalidationServerIP = properties.invalidationServerIP; BasicConfigurator.configure();
logger.info("Invalidation server ip: " + invalidationServerIP); logger.info("Invalidation server ip: " + properties.invalidationServerIP);
this.cache = Caffeine.newBuilder() this.cache = Caffeine.newBuilder()
.expireAfterWrite(300, TimeUnit.SECONDS) .expireAfterWrite(300, TimeUnit.SECONDS)
.maximumSize(properties.cacheSize) .maximumSize(properties.cacheSize)
...@@ -42,12 +41,11 @@ public class MetadataClient ...@@ -42,12 +41,11 @@ public class MetadataClient
if(value != null) if(value != null)
return value; return value;
logger.info("Not found in cache Getting from server"); logger.info("Not found in cache Getting from server");
//var futureResponse = networkHandlerM.get(key).get(1000, TimeUnit.MILLISECONDS);
var futureResponse = networkHandlerM.get(key).get(); var futureResponse = networkHandlerM.get(key).get(1000, TimeUnit.MILLISECONDS);
if(futureResponse.getAck() == AckType.SUCCESS_WITH_VALUE){ //var futureResponse = networkHandlerM.get(key).get();
System.out.println("Adding to cache"); if(futureResponse.getAck() == AckType.SUCCESS_WITH_VALUE)
{
this.cache.put(new ByteArray(key), futureResponse.getValue()); this.cache.put(new ByteArray(key), futureResponse.getValue());
return futureResponse.getValue(); return futureResponse.getValue();
} }
...@@ -56,9 +54,12 @@ public class MetadataClient ...@@ -56,9 +54,12 @@ public class MetadataClient
public int put(byte[] key,byte[] value) throws Exception public int put(byte[] key,byte[] value) throws Exception
{ {
//var response = networkHandlerM.put(key,value).get(1000, TimeUnit.MILLISECONDS); var response = networkHandlerM.put(key,value).get(1000, TimeUnit.MILLISECONDS);
var response = networkHandlerM.put(key,value).get(); //var response = networkHandlerM.put(key,value).get();
//this.cache.invalidate(new ByteArray(key));
logger.info("Putting KV pair on server ...."); logger.info("Putting KV pair on server ....");
if(response == null)
return -1;
if(response.getAck() == AckType.SUCCESS){ if(response.getAck() == AckType.SUCCESS){
this.cache.put(new ByteArray(key), value); this.cache.put(new ByteArray(key), value);
System.out.println("Updating cache.."); System.out.println("Updating cache..");
...@@ -69,9 +70,11 @@ public class MetadataClient ...@@ -69,9 +70,11 @@ public class MetadataClient
public int delete(byte[] key) throws Exception public int delete(byte[] key) throws Exception
{ {
logger.info("Invalidating Local Cache and sending delete request to server"); logger.info("Invalidating Local Cache and sending delete request to server");
//this.cache.invalidate(key); //this.cache.invalidate(new ByteArray(key));
// var response = this.networkHandlerM.delete(key).get(1000,TimeUnit.MILLISECONDS); var response = this.networkHandlerM.delete(key).get(1000,TimeUnit.MILLISECONDS);
var response = this.networkHandlerM.delete(key).get(); //var response = this.networkHandlerM.delete(key).get();
return response.getAck(); if(response != null)
return response.getAck();
return -1;
} }
} }
\ No newline at end of file
...@@ -11,9 +11,6 @@ public class StandaloneRDMAClient { ...@@ -11,9 +11,6 @@ public class StandaloneRDMAClient {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Scanner scanner = new Scanner(System.in); Scanner scanner = new Scanner(System.in);
Property properties = new Property(args[0]); Property properties = new Property(args[0]);
// String address = scanner.nextLine();
System.out.println("prepare");
MetadataClient client = new MetadataClient(properties); MetadataClient client = new MetadataClient(properties);
System.out.println("starting"); System.out.println("starting");
int option; int option;
...@@ -86,8 +83,6 @@ public class StandaloneRDMAClient { ...@@ -86,8 +83,6 @@ public class StandaloneRDMAClient {
long micros = stopwatch.elapsed(TimeUnit.MICROSECONDS); long micros = stopwatch.elapsed(TimeUnit.MICROSECONDS);
System.out.println("Request took: "+micros+" microseconds."); System.out.println("Request took: "+micros+" microseconds.");
*/ */
} }
} }
} }
...@@ -7,7 +7,6 @@ public class ByteArray { ...@@ -7,7 +7,6 @@ public class ByteArray {
public ByteArray(byte[] key) public ByteArray(byte[] key)
{ {
this.key = key; this.key = key;
System.out.println("new butearray");
h = 0; h = 0;
} }
@Override @Override
...@@ -25,9 +24,9 @@ public class ByteArray { ...@@ -25,9 +24,9 @@ public class ByteArray {
for (int i=0; i<length; i++) for (int i=0; i<length; i++)
if (this.key[i] != b.key[i]) if (this.key[i] != b.key[i])
return false; return false;
return true;
return true;
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
......
...@@ -5,9 +5,11 @@ import java.nio.ByteBuffer; ...@@ -5,9 +5,11 @@ import java.nio.ByteBuffer;
import com.ibm.darpc.DaRPCMessage; import com.ibm.darpc.DaRPCMessage;
public class InvalidationRequest implements DaRPCMessage { public class InvalidationRequest implements DaRPCMessage
{
public byte[] key; public byte[] key;
private static int SERIALIZED_SIZE = 128 ; private static int SERIALIZED_SIZE = 128 ;
@Override @Override
public int write(ByteBuffer buffer) throws IOException { public int write(ByteBuffer buffer) throws IOException {
buffer.put(key); buffer.put(key);
......
...@@ -5,7 +5,8 @@ import java.nio.ByteBuffer; ...@@ -5,7 +5,8 @@ import java.nio.ByteBuffer;
import com.ibm.darpc.DaRPCMessage; import com.ibm.darpc.DaRPCMessage;
public class InvalidationResponse implements DaRPCMessage{ public class InvalidationResponse implements DaRPCMessage
{
public static int SERIALIZED_SIZE = 4; public static int SERIALIZED_SIZE = 4;
private int ack; private int ack;
...@@ -27,9 +28,11 @@ public class InvalidationResponse implements DaRPCMessage{ ...@@ -27,9 +28,11 @@ public class InvalidationResponse implements DaRPCMessage{
return SERIALIZED_SIZE; return SERIALIZED_SIZE;
} }
public void setAck(int ack){ public void setAck(int ack){
this.ack = ack; this.ack = ack;
} }
public int getAck() public int getAck()
{ {
return this.ack; return this.ack;
......
...@@ -27,6 +27,5 @@ public class Property { ...@@ -27,6 +27,5 @@ public class Property {
{ {
masters[i-1] = properties.getProperty("app.MASTER_HOST"+i); masters[i-1] = properties.getProperty("app.MASTER_HOST"+i);
} }
} }
} }
...@@ -16,15 +16,12 @@ public class Request implements DaRPCMessage ...@@ -16,15 +16,12 @@ public class Request implements DaRPCMessage
@Override @Override
public int write(ByteBuffer buffer) throws IOException public int write(ByteBuffer buffer) throws IOException
{ {
// System.out.println("Request Write Method");
buffer.putInt(requestType); buffer.putInt(requestType);
buffer.put(key); buffer.put(key);
// System.out.println("length : "+key.length);
//if operation type is get and delete then value is not required //if operation type is get and delete then value is not required
if(requestType == RequestType.PUT) if(requestType == RequestType.PUT)
{ {
buffer.put(value); buffer.put(value);
// System.out.println(" "+value.length);
//size of key+value+operationType //size of key+value+operationType
return 4+key.length+value.length; return 4+key.length+value.length;
} }
...@@ -35,7 +32,6 @@ public class Request implements DaRPCMessage ...@@ -35,7 +32,6 @@ public class Request implements DaRPCMessage
@Override @Override
public void update(ByteBuffer buffer) throws IOException public void update(ByteBuffer buffer) throws IOException
{ {
// System.out.println("Request update method"+buffer.capacity());
requestType = buffer.getInt(); requestType = buffer.getInt();
if(key == null || key.length != 128) if(key == null || key.length != 128)
this.key = new byte[128]; this.key = new byte[128];
......
...@@ -5,6 +5,5 @@ public interface RequestType ...@@ -5,6 +5,5 @@ public interface RequestType
public static int GET =101; public static int GET =101;
public static int PUT =100; public static int PUT =100;
public static int DELETE =102; public static int DELETE =102;
public static int INVALIDATE = 103; public static int INVALIDATE = 103;
}; };
\ No newline at end of file
...@@ -14,11 +14,9 @@ public class Response implements DaRPCMessage{ ...@@ -14,11 +14,9 @@ public class Response implements DaRPCMessage{
@Override @Override
public int write(ByteBuffer buffer) throws IOException public int write(ByteBuffer buffer) throws IOException
{ {
// System.out.println("Response write Method");
buffer.putInt(ack); buffer.putInt(ack);
if(ack == AckType.SUCCESS_WITH_VALUE) if(ack == AckType.SUCCESS_WITH_VALUE)
{ {
// System.out.println("length "+value.length);
buffer.put(value); buffer.put(value);
return 4+value.length; return 4+value.length;
} }
...@@ -28,7 +26,6 @@ public class Response implements DaRPCMessage{ ...@@ -28,7 +26,6 @@ public class Response implements DaRPCMessage{
@Override @Override
public void update(ByteBuffer buffer) throws IOException public void update(ByteBuffer buffer) throws IOException
{ {
//System.out.println("Response update method "+buffer.limit());
ack = buffer.getInt(); ack = buffer.getInt();
if(ack == AckType.SUCCESS_WITH_VALUE) if(ack == AckType.SUCCESS_WITH_VALUE)
{ {
......
package hpdos.invalidationServer; package hpdos.services;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
...@@ -19,7 +19,6 @@ import hpdos.handlers.NetworkHandler; ...@@ -19,7 +19,6 @@ import hpdos.handlers.NetworkHandler;
import hpdos.protocol.ByteArray; import hpdos.protocol.ByteArray;
import hpdos.protocol.InvalidationRequest; import hpdos.protocol.InvalidationRequest;
import hpdos.protocol.InvalidationResponse; import hpdos.protocol.InvalidationResponse;
import hpdos.services.InvalidationService;
public class InvalidationServer public class InvalidationServer
{ {
...@@ -29,12 +28,7 @@ public class InvalidationServer ...@@ -29,12 +28,7 @@ public class InvalidationServer
{ {
long[] clusterAffinities; long[] clusterAffinities;
clusterAffinities = new long[1]; clusterAffinities = new long[1];
clusterAffinities[0] = 1L << 1 ; clusterAffinities[0] = 1L << 1 ;
// clusterAffinities = new long[4];
// for (int i = 0; i < 4; i++) {
// clusterAffinities[i] = 1L << i ;
// }
DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null; DaRPCServerGroup<InvalidationRequest, InvalidationResponse> group = null;
InvalidationService service = new InvalidationService(cache); InvalidationService service = new InvalidationService(cache);
group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, false, 16, 16, 16, 4); group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, false, 16, 16, 16, 4);
......
...@@ -18,7 +18,7 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP ...@@ -18,7 +18,7 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
public Cache<ByteArray, byte[]> cache; public Cache<ByteArray, byte[]> cache;
final static Logger logger = Logger.getLogger(InvalidationService.class); final static Logger logger = Logger.getLogger("InvalidationService");
public InvalidationService(Cache<ByteArray,byte[]> cache) public InvalidationService(Cache<ByteArray,byte[]> cache)
{ {
this.cache = cache; this.cache = cache;
...@@ -47,16 +47,24 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP ...@@ -47,16 +47,24 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
public void open(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp) public void open(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp)
{ {
logger.info("Recieved New Connection for invalidation"); logger.info("Recieved New Connection for invalidation");
try{ try
{
logger.info(serverEp.getDstAddr()); logger.info(serverEp.getDstAddr());
}catch(Exception e){} }catch(Exception ex)
{
logger.error(ex.getMessage());
}
} }
public void close(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp) public void close(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp)
{ {
logger.info("Closing Connection for invalidation"); logger.info("Closing Connection for invalidation");
try{ try
{
logger.info(serverEp.getDstAddr()); logger.info(serverEp.getDstAddr());
}catch(Exception e){} }catch(Exception ex)
{
logger.error(ex.getMessage());
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment