Commit d64d2cc0 authored by Paras Garg's avatar Paras Garg

log4j implementation

parent 54f188c0
app.HOST=192.168.200.20
app.NO_OF_MASTERS=1
app.MASTER_HOST1=192.168.200.21
#app.MASTER_HOST2=192.168.200.41
app.MASTER_HOST1=192.168.200.20
#app.MASTER_HOST2=192.168.200.51
app.INVALIDATION_PORT=1922
app.CACHE_SIZE=1
#Setup properties
app.HOST=192.168.200.20
app.NO_OF_MASTERS=1
app.MASTER_HOST1=192.168.200.20
#app.MASTER_HOST2=192.168.200.41
app.INVALIDATION_PORT=1922
app.CACHE_SIZE=1
#Benchmarking properties
reads=0.1
writes=0.8
deletes=0.1
clients=1
requests_per_client=100000
requests_per_second=1000000
key_size=10
value_size=10
......@@ -39,8 +39,9 @@
apply plugin: "application"
apply plugin: "java-library"
mainClassName = "MetadataClient"
mainClassName = "StandaloneRDMAClient"
//mainClassName = "MetadataClient"
mainClassName = "BenchmarkingClient"
//mainClassName= "StandaloneRDMAClient"
version '1.0-SNAPSHOT'
repositories {
......@@ -56,6 +57,8 @@ dependencies {
implementation 'com.google.guava:guava:28.0-jre'
implementation 'com.ibm.darpc:darpc:1.9'
implementation 'com.github.ben-manes.caffeine:caffeine:3.0.4'
compile 'log4j:log4j:1.2.17'
compile 'org.slf4j:slf4j-log4j12:1.7.18'
// Use JUnit test framework
// testImplementation 'junit:junit:4.12'
}
\ No newline at end of file
}
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.log4j.Logger;
public class BenchmarkingClient {
public Properties properties;
public double runExperiment(String threadId, MetadataClient client) throws Exception{
Double sleepTime = 1000.0 * 1000.0 / BenchmarkingProperties.requestsPerSecond;
long sleepTimeMicros = Double.valueOf(sleepTime).longValue();
logger.debug(sleepTimeMicros);
Thread.sleep((long)(Math.random() * 500)); // randomizing start time to avoid spikes in request
Double requestsPerClient = BenchmarkingProperties.requestsPerClient;
double totalBracket = BenchmarkingProperties.reads
+ BenchmarkingProperties.writes
+ BenchmarkingProperties.deletes;
double writeBracket, readBracket, deleteBracket;
writeBracket = BenchmarkingProperties.writes * 1.0 / totalBracket;
readBracket = writeBracket + BenchmarkingProperties.reads / totalBracket;
deleteBracket = readBracket + BenchmarkingProperties.deletes / totalBracket;
// int temp = 5;
while (requestsPerClient > 0) {
double toss = Math.random();
Stopwatch stopwatch = Stopwatch.createUnstarted();
if (toss < writeBracket) {
byte[] key = new byte[BenchmarkingProperties.keySize];
byte[] value = new byte[BenchmarkingProperties.valueSize];
new Random().nextBytes(key);
new Random().nextBytes(value);
stopwatch.start();
client.networkHandler.put(key, value);
stopwatch.stop();
collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 100);
}
else if (toss < readBracket) {
byte[] key = new byte[BenchmarkingProperties.keySize];
new Random().nextBytes(key);
stopwatch.start();
client.networkHandler.get(key);
stopwatch.stop();
collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 101);
}
else if(toss < deleteBracket){
byte[] key = new byte[BenchmarkingProperties.keySize];
new Random().nextBytes(key);
stopwatch.start();
client.networkHandler.delete(key);
stopwatch.stop();
collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 102);
}
if (requestsPerClient % 10000 == 0) {
logger.debug("Ten thousand requests completed " + requestsPerClient);
}
requestsPerClient -- ;
// Stopwatch stopwatch = Stopwatch.createUnstarted();
// stopwatch.start();
//testWait(sleepTimeMicros);
// stopwatch.stop();
// logger.debug("Microseconds waited: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
}
return 0;
}
public void collectExperimentStatistics(long time, int type){
// logger.debug("Time: " + time);
if (type == 100) {
BenchmarkingProperties.writeCount ++;
BenchmarkingProperties.writeTime = BenchmarkingProperties.writeTime + time;
}
else if (type == 101) {
BenchmarkingProperties.readCount ++;
BenchmarkingProperties.readTime = BenchmarkingProperties.readTime + time;
}
else if(type == 102){
BenchmarkingProperties.deleteCount ++;
BenchmarkingProperties.deleteTime = BenchmarkingProperties.deleteTime + time;
}
}
public void printStatistics(){
logger.debug("Total writes: " + BenchmarkingProperties.writeCount);
logger.debug("Total reads: " + BenchmarkingProperties.readCount);
logger.debug("Total deletes: " + BenchmarkingProperties.deleteCount);
logger.debug("Total write time: " + BenchmarkingProperties.writeTime);
logger.debug("Total read time: " + BenchmarkingProperties.readTime);
logger.debug("Total delete time: " + BenchmarkingProperties.deleteTime);
logger.debug("Average write time in microseconds: " + (BenchmarkingProperties.writeTime / BenchmarkingProperties.writeCount));
logger.debug("Average read time in microseconds: " + (BenchmarkingProperties.readTime / BenchmarkingProperties.readCount));
logger.debug("Average delete time in microseconds: " + (BenchmarkingProperties.deleteTime / BenchmarkingProperties.deleteCount));
}
public void testWait(long microseconds){
final long INTERVAL = microseconds * 1000;
long start = System.nanoTime();
long end=0;
do{
end = System.nanoTime();
}while(start + INTERVAL >= end);
// logger.debug(end - start);
}
final static Logger logger = Logger.getLogger(BenchmarkingClient.class);
public static void main(String[] args) throws Exception {
BenchmarkingClient benchmarkingClient = new BenchmarkingClient();
logger.debug("Staring to benchmark...");
benchmarkingClient.properties= new Properties();
InputStream inputStream = new FileInputStream(args[0]);
benchmarkingClient.properties.load(inputStream);
BenchmarkingProperties.reads = Double.valueOf((String)benchmarkingClient.properties.get("reads"));
BenchmarkingProperties.writes = Double.valueOf((String)benchmarkingClient.properties.get("writes"));
BenchmarkingProperties.deletes = Double.valueOf((String)benchmarkingClient.properties.get("deletes"));
BenchmarkingProperties.clients = Integer.valueOf((String)benchmarkingClient.properties.get("clients"));
BenchmarkingProperties.requestsPerClient = Double.valueOf((String)benchmarkingClient.properties.get("requests_per_client"));
BenchmarkingProperties.requestsPerSecond = Double.valueOf((String)benchmarkingClient.properties.get("requests_per_second"));
BenchmarkingProperties.keySize = Integer.valueOf((String)benchmarkingClient.properties.get("key_size"));
BenchmarkingProperties.valueSize = Integer.valueOf((String)benchmarkingClient.properties.get("value_size"));
logger.debug("================= Properties loaded for benchmarking ================= ");
logger.debug("Percentage of reads: " + BenchmarkingProperties.reads * 100);
logger.debug("Percentage of writes: " + BenchmarkingProperties.writes * 100);
logger.debug("Percentage of deletes: " + BenchmarkingProperties.deletes * 100);
logger.debug("Clients to be spawned: " + BenchmarkingProperties.clients);
logger.debug("Requests per client: " + BenchmarkingProperties.requestsPerClient);
logger.debug("Requests per second: " + BenchmarkingProperties.requestsPerSecond);
logger.debug("Key Size: " + BenchmarkingProperties.keySize);
logger.debug("Value Size: " + BenchmarkingProperties.valueSize);
logger.debug("====================================================================== ");
// MetadataClient client = null;
MetadataClient client = new MetadataClient(benchmarkingClient.properties);
ExecutorService experimentExecutors = Executors.newFixedThreadPool(BenchmarkingProperties.clients);
Set<Callable<Double>> callables = new HashSet<>();
logger.debug("INITIALIZING EXPERIMENT...");
for (int i = 1; i <= BenchmarkingProperties.clients; i++) {
int finalI = i;
callables.add(() -> benchmarkingClient.runExperiment(Integer.toString(finalI), client));
}
Thread.sleep(5000);
logger.debug("STARTING EXPERIMENT...");
List<Future<Double>> futures = experimentExecutors.invokeAll(callables);
for (Future<Double> future: futures) {
future.get();
}
logger.debug("DONE...");
benchmarkingClient.printStatistics();
experimentExecutors.shutdown();
}
}
class BenchmarkingProperties {
static Double reads;
static Double writes;
static Double deletes;
static int clients;
static Double requestsPerClient;
static Double requestsPerSecond;
static int keySize;
static int valueSize;
// Result statistics
static int readCount;
static int writeCount;
static int deleteCount;
static long readTime = 0;
static long writeTime = 0 ;
static long deleteTime = 0;
}
......@@ -13,6 +13,7 @@ import com.ibm.darpc.DaRPCClientEndpoint;
import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCStream;
import org.apache.log4j.Logger;
import org.apache.log4j.helpers.SyslogQuietWriter;
import hpdos.cache.SalCache;
......@@ -31,10 +32,11 @@ public class MetadataClient{
// SalCache cache;
public Cache<String, String> cache;
NetworkHandler networkHandler;
final static Logger logger = Logger.getLogger(MetadataClient.class);
public MetadataClient(Properties properties) throws Exception
{
String invalidationServerIP = String.valueOf((String)properties.get("app.HOST"));
logger.debug("Invalidation server ip: " + invalidationServerIP);
List<String> metadataMasters = new ArrayList<>();
// String metadataMasterIp = String.valueOf((String)properties.get("app.MASTER_HOST1"));
int cacheSize = Integer.valueOf((String)properties.get("app.CACHE_SIZE"));
......@@ -56,7 +58,7 @@ public class MetadataClient{
stream = clientEp.createStream();
NetworkHandler.streams.put(metadataMasterIp, stream);
NetworkHandler.ips.add(metadataMasterIp);
System.out.println("Connected");
logger.debug("Connected");
}
this.networkHandler.connectToFollowers();
this.networkHandler.initializeLoadBalancer();
......
......@@ -22,6 +22,7 @@ import com.ibm.darpc.DaRPCStream;
import com.ibm.darpc.DaRPCClientGroup.RpcClientFactory;
import org.apache.commons.math3.util.Pair;
import org.apache.log4j.Logger;
import hpdos.cache.SalCache;
import hpdos.packet.Packet;
......@@ -33,7 +34,7 @@ import hpdos.protocol.RpcProtocol;
public class NetworkHandler {
final static Logger logger = Logger.getLogger(NetworkHandler.class);
public static LinkedHashMap<String, DaRPCStream<Request, Response>> streams = new LinkedHashMap<String, DaRPCStream<Request, Response>>();
public static List<String> ips = new ArrayList<>();
public static HashMap<String, List<String>> follower_ips = new HashMap<>();
......@@ -79,12 +80,12 @@ public class NetworkHandler {
public byte[] get(byte[] key) throws Exception
{
System.out.println("Getting From local cache");
logger.info("Getting From local cache");
// var value = this.cache.getIfPresent(key);
String value = this.cache.getIfPresent(new String(key));
if(value != null)
return value.getBytes(StandardCharsets.UTF_8);
System.out.println("Not found in cache, Getting from server");
logger.info("Not found in cache, Getting from server");
Request request= new Request();
request.setRequestType(RequestType.GET);
request.setKey(key);
......@@ -96,31 +97,37 @@ public class NetworkHandler {
int masterPartition = partition(key);
String ip = NetworkHandler.ips.get(masterPartition);
DaRPCStream<Request, Response> stream = null;
// Load balancing - TODO
// Load balancing
int val = loadBalancer.get(ip);
if (val == 0) {
stream = NetworkHandler.streams.get(ip);
val = val + 1 ;
loadBalancer.put(ip, val);
}
else
{
List<DaRPCStream<Request, Response>> temp = NetworkHandler.followerStreams.get(ip);
stream = temp.get(val - 1);
val = val + 1;
if (val - 1 == temp.size()) {
val = 0;
if (NetworkHandler.followerStreams.size() > 1 ) {
// logger.trace("Follower streams " + NetworkHandler.followerStreams.keySet());
if (val == 0) {
stream = NetworkHandler.streams.get(ip);
val = val + 1 ;
loadBalancer.put(ip, val);
}
else
{
List<DaRPCStream<Request, Response>> temp = NetworkHandler.followerStreams.get(ip);
stream = temp.get(val - 1);
val = val + 1;
if (val - 1 == temp.size()) {
val = 0;
}
logger.info("Putting val "+val);
loadBalancer.put(ip, val);
}
System.out.println("Putting val "+val);
loadBalancer.put(ip, val);
}
else{
stream = NetworkHandler.streams.get(ip);
}
stream.request(request, response, false).get();
if(response.getAck() == AckType.SUCCESS_WITH_VALUE){
System.out.println("Adding to cache");
logger.info("Adding to cache");
this.cache.put(new String(key), new String(response.getValue()));
return response.getValue();
}
......@@ -131,7 +138,7 @@ public class NetworkHandler {
public int put(byte[] key,byte[] value) throws Exception
{
System.out.println("Putting KV pair on server ....");
logger.info("Putting KV pair on server ....");
Request request= new Request();
request.setRequestType(RequestType.PUT);
request.setKey(key);
......@@ -148,15 +155,15 @@ public class NetworkHandler {
if(response.getAck() == AckType.SUCCESS){
this.cache.put(new String(key), new String(value));
System.out.println("Updating cache..");
logger.info("Updating cache..");
}
return response.getAck();
}
public int delete(byte[] key) throws Exception
{
System.out.println("Invalidating Local Cache and sending delete request to server");
// this.cache.invalidate(key);
logger.info("Invalidating Local Cache and sending delete request to server");
this.cache.invalidate(new String(key));
Request request= new Request();
request.setRequestType(RequestType.DELETE);
request.setKey(key);
......@@ -233,7 +240,7 @@ public class NetworkHandler {
}
public void connectToFollowers() throws Exception{
System.out.println("Beginning to connect to followers");
// logger.info("Beginning to connect to followers");
for (String key : NetworkHandler.follower_ips.keySet()) {
List<String> temp = follower_ips.get(key);
List<DaRPCStream<Request, Response>> tempStreams = new ArrayList<>();
......
......@@ -14,6 +14,8 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup;
import com.ibm.disni.RdmaServerEndpoint;
import org.apache.log4j.Logger;
import hpdos.handlers.NetworkHandler;
import hpdos.protocol.InvalidationRequest;
import hpdos.protocol.InvalidationResponse;
......@@ -21,6 +23,7 @@ import hpdos.services.InvalidationService;
public class InvalidationServer
{
final static Logger logger = Logger.getLogger(InvalidationServer.class);
Cache<String, String> cache;
RdmaServerEndpoint<DaRPCServerEndpoint<InvalidationRequest, InvalidationResponse>> serverEp;
public InvalidationServer(Cache<String, String> cache2,String invalidationServerIP, int invalidationServerPort, NetworkHandler networkHandler)throws Exception
......@@ -43,13 +46,13 @@ public class InvalidationServer
try{
int conns = numOfConnections;
System.out.println("Waiting for server to send connection request for invalidation");
logger.debug("Waiting for server to send connection request for invalidation");
while (conns>0) {
serverEp.accept();
conns -- ;
}
System.out.println("Got Connected for Invalidation");
logger.debug("Got Connected for Invalidation");
}catch(Exception e){
e.printStackTrace();
}
......@@ -57,12 +60,12 @@ public class InvalidationServer
var t= new Thread(runnable);
t.setName("Server Connection");
t.start();
System.out.println("started Invalidation server");
logger.debug("started Invalidation server");
}
public void sendInvalidationRegistrationRequest(String masterIpAddress, String hostIpAddress) throws IOException, ClassNotFoundException{
System.out.println("Sending invalidation registration request to ." + masterIpAddress);
// logger.info("Sending invalidation registration request to ." + masterIpAddress);
Socket socket = new Socket(masterIpAddress, 9875);
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(hostIpAddress);
......@@ -84,15 +87,15 @@ public class InvalidationServer
index++;
}
NetworkHandler.follower_ips.put(master, temp);
for (String string : NetworkHandler.follower_ips.keySet()) {
System.out.println("For master: " + string + " the followers are: ");
for (String string2 : NetworkHandler.follower_ips.get(string)) {
System.out.println(string2);
}
}
// for (String string : NetworkHandler.follower_ips.keySet()) {
// logger.info("For master: " + string + " the followers are: ");
// for (String string2 : NetworkHandler.follower_ips.get(string)) {
// logger.info(string2);
// }
// }
// for (Follower follower : followers) {
// System.out.println(follower.getIpAdress());
// logger.info(follower.getIpAdress());
// }
socket.close();
......
......@@ -7,6 +7,8 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService;
import org.apache.log4j.Logger;
import hpdos.handlers.NetworkHandler;
import hpdos.protocol.InvalidationRequest;
import hpdos.protocol.InvalidationResponse;
......@@ -15,13 +17,14 @@ import hpdos.protocol.InvalidationRpcProtocol;
public class InvalidationService extends InvalidationRpcProtocol implements DaRPCService<InvalidationRequest, InvalidationResponse>{
// Cache<String, String> cache;
NetworkHandler networkHandler;
final static Logger logger = Logger.getLogger(InvalidationService.class);
public InvalidationService(NetworkHandler networkHandler)
{
this.networkHandler = networkHandler;
}
@Override
public void processServerEvent(DaRPCServerEvent<InvalidationRequest, InvalidationResponse> event) throws IOException {
System.out.println("Got Invalidation Request");
// System.out.println("Got Invalidation Request");
InvalidationRequest request = event.getReceiveMessage();
InvalidationResponse response = event.getSendMessage();
try
......@@ -40,16 +43,16 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
public void open(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp)
{
System.out.println("Recieved New Connection for invalidation");
logger.debug("Recieved New Connection for invalidation");
try{
System.out.println(serverEp.getDstAddr());
logger.debug(serverEp.getDstAddr());
}catch(Exception e){}
}
public void close(DaRPCServerEndpoint<InvalidationRequest,InvalidationResponse> serverEp)
{
System.out.println("Closing Connection for invalidation");
logger.debug("Closing Connection for invalidation");
try{
System.out.println(serverEp.getDstAddr());
logger.debug(serverEp.getDstAddr());
}catch(Exception e){}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment