Commit f3dbffa4 authored by Paras Garg's avatar Paras Garg

Completed version 1.0.0 of HPDOS. Need to benchmark.

parent 61493bc7
...@@ -74,7 +74,7 @@ public class MetadataServer implements Runnable{ ...@@ -74,7 +74,7 @@ public class MetadataServer implements Runnable{
storageHandler = new StorageHandler(properties.getProperty("app.db_path")); storageHandler = new StorageHandler(properties.getProperty("app.db_path"));
networkHandler = new NetworkHandler(isMaster); networkHandler = new NetworkHandler(isMaster);
// metadataServerService = new MasterMetadataServerService(this.networkHandler,storageHandler); // metadataServerService = new MasterMetadataServerService(this.networkHandler,storageHandler);
registrationHandler = new RegistrationHandler(isMaster,host,registrationPort, networkHandler); registrationHandler = new RegistrationHandler(isMaster, host, registrationPort, networkHandler);
if(isMaster) if(isMaster)
{ {
......
...@@ -3,12 +3,17 @@ package hpdos_rdma_offloaded.handler; ...@@ -3,12 +3,17 @@ package hpdos_rdma_offloaded.handler;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.ClassNotFoundException; import java.lang.ClassNotFoundException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import hpdos_rdma_offloaded.lib.Follower;
public class RegistrationHandler { public class RegistrationHandler {
public ServerSocket server; public ServerSocket server;
public int port = 9876; public int port = 9876;
...@@ -41,7 +46,7 @@ public class RegistrationHandler { ...@@ -41,7 +46,7 @@ public class RegistrationHandler {
} }
private void sendRegistrationRequest(String message) throws IOException { private void sendRegistrationRequest(String message) throws IOException {
System.out.println("Sending a registration request"); System.out.println("Sending a registration request with message: "+message);
InetAddress host = InetAddress.getByName(this.registrationServerIp); InetAddress host = InetAddress.getByName(this.registrationServerIp);
Socket socket = null; Socket socket = null;
ObjectOutputStream oos = null; ObjectOutputStream oos = null;
...@@ -97,7 +102,14 @@ public class RegistrationHandler { ...@@ -97,7 +102,14 @@ public class RegistrationHandler {
while (true) { while (true) {
Socket socket = serverSocket.accept(); Socket socket = serverSocket.accept();
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()); ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
String ip = (String) ois.readObject(); String ip = (String) ois.readObject();
String ips = registrationServerIp + ";";
for (Follower follower : NetworkHandler.followers) {
ips = ips + follower.getIpAdress() + ";";
}
objectOutputStream.writeObject(ips);
NetworkHandler networkHandler = new NetworkHandler(); NetworkHandler networkHandler = new NetworkHandler();
networkHandler.connectToSal(ip); networkHandler.connectToSal(ip);
// clientProcessingPool.submit(new ClientTask(clientSocket)); // clientProcessingPool.submit(new ClientTask(clientSocket));
......
...@@ -28,7 +28,7 @@ public class ReplicationHandler { ...@@ -28,7 +28,7 @@ public class ReplicationHandler {
public static void replicateMetadata(Packet packet) throws InterruptedException, ExecutionException{ public static void replicateMetadata(Packet packet) throws InterruptedException, ExecutionException{
Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>(); Set<Callable<DaRPCFuture<Request, Response>>> callables = new HashSet<>();
for(Follower follower : followers){ for(Follower follower : NetworkHandler.followers){
callables.add(()->{ callables.add(()->{
Request request = new Request(); Request request = new Request();
Response response = new Response(); Response response = new Response();
......
package hpdos_rdma_offloaded.lib; package hpdos_rdma_offloaded.lib;
import java.io.Serializable;
import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.DaRPCStream;
import hpdos_rdma_offloaded.protocol.*; import hpdos_rdma_offloaded.protocol.*;
public class Follower { public class Follower implements Serializable {
private static final long serialVersionUID = 0L;
String ipAddress; String ipAddress;
String followerId; String followerId;
DaRPCStream<Request, Response> stream; DaRPCStream<Request, Response> stream;
......
...@@ -121,7 +121,8 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -121,7 +121,8 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
@Override @Override
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
System.out.println("Closing Connection"); System.out.println("Closing Connection");
// try // try
// { // {
// invalidationClientEpStreams.remove(rpcClientEndpoint.getDstAddr()); // invalidationClientEpStreams.remove(rpcClientEndpoint.getDstAddr());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment