Commit 54f188c0 authored by Paras Garg's avatar Paras Garg

Completed version 1.0.0 of HPDOS. Benchmarking remaining.

parent 96f46ba8
app.HOST=192.168.200.20
app.NO_OF_MASTERS=2
app.MASTER_HOST1=192.168.200.20
app.MASTER_HOST2=192.168.200.40
app.NO_OF_MASTERS=1
app.MASTER_HOST1=192.168.200.21
#app.MASTER_HOST2=192.168.200.41
app.INVALIDATION_PORT=1922
app.CACHE_SIZE=1
......@@ -39,8 +39,9 @@ public class MetadataClient{
// String metadataMasterIp = String.valueOf((String)properties.get("app.MASTER_HOST1"));
int cacheSize = Integer.valueOf((String)properties.get("app.CACHE_SIZE"));
int numberOfMasters = Integer.valueOf((String)properties.get("app.NO_OF_MASTERS"));
int invalidationServerPort = Integer.valueOf((String)properties.get("app.INVALIDATION_PORT"));
this.networkHandler = new NetworkHandler(cacheSize, numberOfMasters);
this.invalidationServer = new InvalidationServer(cache,invalidationServerIP, this.networkHandler);
this.invalidationServer = new InvalidationServer(cache,invalidationServerIP, invalidationServerPort, this.networkHandler);
this.invalidationServer.acceptInvalidationConnections(numberOfMasters);
for(int i=1;i<=numberOfMasters;i++){
metadataMasters.add(String.valueOf((String)properties.get("app.MASTER_HOST"+i)));
......@@ -57,8 +58,10 @@ public class MetadataClient{
NetworkHandler.ips.add(metadataMasterIp);
System.out.println("Connected");
}
this.networkHandler.connectToFollowers();
this.networkHandler.initializeLoadBalancer();
this.cache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.SECONDS)
.expireAfterWrite(300, TimeUnit.SECONDS)
.maximumSize(cacheSize)
.build();
......
......@@ -2,6 +2,7 @@ package hpdos.handlers;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
......@@ -34,7 +35,11 @@ import hpdos.protocol.RpcProtocol;
public class NetworkHandler {
public static LinkedHashMap<String, DaRPCStream<Request, Response>> streams = new LinkedHashMap<String, DaRPCStream<Request, Response>>();
public static LinkedList<String> ips = new LinkedList<>();
public static List<String> ips = new ArrayList<>();
public static HashMap<String, List<String>> follower_ips = new HashMap<>();
public static HashMap<String, List<DaRPCStream<Request, Response>>> followerStreams = new HashMap<>();
public HashMap<String, Integer> loadBalancer = new HashMap<>();
public int numberOfMasters;
public Cache<String, String> cache;
......@@ -90,10 +95,28 @@ public class NetworkHandler {
// Partitioning
int masterPartition = partition(key);
String ip = NetworkHandler.ips.get(masterPartition);
DaRPCStream<Request, Response> stream = null;
// Load balancing - TODO
int val = loadBalancer.get(ip);
if (val == 0) {
stream = NetworkHandler.streams.get(ip);
val = val + 1 ;
loadBalancer.put(ip, val);
}
else
{
DaRPCStream<Request, Response> stream = NetworkHandler.streams.get(ip);
List<DaRPCStream<Request, Response>> temp = NetworkHandler.followerStreams.get(ip);
stream = temp.get(val - 1);
val = val + 1;
if (val - 1 == temp.size()) {
val = 0;
}
System.out.println("Putting val "+val);
loadBalancer.put(ip, val);
}
stream.request(request, response, false).get();
if(response.getAck() == AckType.SUCCESS_WITH_VALUE){
......@@ -119,7 +142,6 @@ public class NetworkHandler {
int masterPartition = partition(key);
String ip = NetworkHandler.ips.get(masterPartition);
// Load balancing - TODO
DaRPCStream<Request, Response> stream = NetworkHandler.streams.get(ip);
stream.request(request, response, false).get();
......@@ -200,9 +222,38 @@ public class NetworkHandler {
public int partition(byte[] key)
{
String str_key = new String(key).trim();
int hash = 7;
hash = 31 * hash + (key == null ? 0 : key.hashCode());
hash = 31 * hash + (str_key == null ? 0 : str_key.hashCode());
// System.out.println("key: " + str_key);
// System.out.println("key size" + str_key.length());
// System.out.println("number of masters: " + numberOfMasters);
// System.out.println("Calculated hash: "+ hash % numberOfMasters );
return hash % numberOfMasters;
}
public void connectToFollowers() throws Exception{
System.out.println("Beginning to connect to followers");
for (String key : NetworkHandler.follower_ips.keySet()) {
List<String> temp = follower_ips.get(key);
List<DaRPCStream<Request, Response>> tempStreams = new ArrayList<>();
for (String ip : temp) {
RpcProtocol rpcProtocol = new RpcProtocol();
DaRPCClientGroup<Request, Response> group = DaRPCClientGroup.createClientGroup(rpcProtocol, 100, 0, 16, 16);
InetSocketAddress address = new InetSocketAddress(ip, 1921);
DaRPCClientEndpoint<Request, Response> clientEp = group.createEndpoint();
clientEp.connect(address, 1000);
DaRPCStream<Request, Response> followerStream = clientEp.createStream();
tempStreams.add(followerStream);
}
NetworkHandler.followerStreams.put(key, tempStreams);
}
}
public void initializeLoadBalancer() {
for (String ip : NetworkHandler.followerStreams.keySet()) {
this.loadBalancer.put(ip, 0);
}
}
}
package hpdos.invalidationServer;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import hpdos.packet.*;
import com.github.benmanes.caffeine.cache.Cache;
import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup;
......@@ -20,7 +23,7 @@ public class InvalidationServer
{
Cache<String, String> cache;
RdmaServerEndpoint<DaRPCServerEndpoint<InvalidationRequest, InvalidationResponse>> serverEp;
public InvalidationServer(Cache<String, String> cache2,String invalidationServerIP, NetworkHandler networkHandler)throws Exception
public InvalidationServer(Cache<String, String> cache2,String invalidationServerIP, int invalidationServerPort, NetworkHandler networkHandler)throws Exception
{
this.cache = cache2;
long[] clusterAffinities = new long[1];
......@@ -30,7 +33,7 @@ public class InvalidationServer
group = DaRPCServerGroup.createServerGroup(service, clusterAffinities, -1, 1, true, 2, 2, 2, 1);
serverEp = group.createServerEndpoint();
InetSocketAddress address = new InetSocketAddress(invalidationServerIP, 1922);
InetSocketAddress address = new InetSocketAddress(invalidationServerIP, invalidationServerPort);
serverEp.bind(address, 2);
}
public void acceptInvalidationConnections(int numOfConnections) throws Exception
......@@ -58,12 +61,40 @@ public class InvalidationServer
}
public void sendInvalidationRegistrationRequest(String masterIpAddress, String hostIpAddress) throws IOException{
System.out.println("Sending invalidation registration request.");
public void sendInvalidationRegistrationRequest(String masterIpAddress, String hostIpAddress) throws IOException, ClassNotFoundException{
System.out.println("Sending invalidation registration request to ." + masterIpAddress);
Socket socket = new Socket(masterIpAddress, 9875);
ObjectOutputStream oos = null;
oos = new ObjectOutputStream(socket.getOutputStream());
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(hostIpAddress);
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
// List<Follower> followers = (List<Follower>) ois.readObject();
String ips = (String) ois.readObject();
String[] follower_ips = ips.split(";");
String master = follower_ips[0];
List<String> temp = new ArrayList<>();
int index = 0;
for (String ip : follower_ips) {
if (index == 0) {
index++;
continue;
}
temp.add(ip);
index++;
}
NetworkHandler.follower_ips.put(master, temp);
for (String string : NetworkHandler.follower_ips.keySet()) {
System.out.println("For master: " + string + " the followers are: ");
for (String string2 : NetworkHandler.follower_ips.get(string)) {
System.out.println(string2);
}
}
// for (Follower follower : followers) {
// System.out.println(follower.getIpAdress());
// }
socket.close();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment