Commit b2170bd2 authored by Pramod S's avatar Pramod S

Fixed a bug. BUG: Upon disconnection, the client stream was not removed.

parent 1df83a44
...@@ -35,10 +35,10 @@ import hpdos_rdma_offloaded.protocol.RpcProtocol; ...@@ -35,10 +35,10 @@ import hpdos_rdma_offloaded.protocol.RpcProtocol;
public class NetworkHandler { public class NetworkHandler {
private ExecutorService executorService; private ExecutorService executorService;
// public HashMap<String, DaRPCStream<Request, Response>> streams; // public HashMap<String, DaRPCStream<Request, Response>> streams;
public static List <DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationStreams = new ArrayList<>(); public static ConcurrentHashMap<String, DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationStreams = new ConcurrentHashMap<>();
public static List<Follower> followers = new ArrayList<>(); public static List<Follower> followers = new ArrayList<>();
boolean isMaster; boolean isMaster;
ConcurrentHashMap<SocketAddress,DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationClientEpStreams; // ConcurrentHashMap<SocketAddress,DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationClientEpStreams;
public NetworkHandler(boolean isMaster){ public NetworkHandler(boolean isMaster){
this.executorService = Executors.newFixedThreadPool(10); this.executorService = Executors.newFixedThreadPool(10);
...@@ -49,7 +49,7 @@ public class NetworkHandler { ...@@ -49,7 +49,7 @@ public class NetworkHandler {
public void setClientStreams(ConcurrentHashMap<SocketAddress,DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationClientEpStreams) public void setClientStreams(ConcurrentHashMap<SocketAddress,DaRPCStream<InvalidationRequest, InvalidationResponse>> invalidationClientEpStreams)
{ {
this.invalidationClientEpStreams = invalidationClientEpStreams; // this.invalidationClientEpStreams = invalidationClientEpStreams;
} }
public void connectToSal(String ip) throws Exception{ public void connectToSal(String ip) throws Exception{
...@@ -64,7 +64,7 @@ public class NetworkHandler { ...@@ -64,7 +64,7 @@ public class NetworkHandler {
DaRPCClientEndpoint<InvalidationRequest, InvalidationResponse> clientEp = group.createEndpoint(); DaRPCClientEndpoint<InvalidationRequest, InvalidationResponse> clientEp = group.createEndpoint();
clientEp.connect(address, 1000); clientEp.connect(address, 1000);
DaRPCStream<InvalidationRequest, InvalidationResponse> stream = clientEp.createStream(); DaRPCStream<InvalidationRequest, InvalidationResponse> stream = clientEp.createStream();
NetworkHandler.invalidationStreams.add(stream); NetworkHandler.invalidationStreams.put(ip, stream);
} }
public void connectToFollower(String uid , String ip) throws Exception { public void connectToFollower(String uid , String ip) throws Exception {
...@@ -185,7 +185,7 @@ public class NetworkHandler { ...@@ -185,7 +185,7 @@ public class NetworkHandler {
invalidationRequest.setKey(request.getKey()); invalidationRequest.setKey(request.getKey());
Set<Callable<DaRPCFuture<InvalidationRequest, InvalidationResponse>>> callables = new HashSet<>(); Set<Callable<DaRPCFuture<InvalidationRequest, InvalidationResponse>>> callables = new HashSet<>();
ArrayList<DaRPCFuture<InvalidationRequest,InvalidationResponse>> requestFutures = new ArrayList<>();; ArrayList<DaRPCFuture<InvalidationRequest,InvalidationResponse>> requestFutures = new ArrayList<>();;
for (DaRPCStream<InvalidationRequest, InvalidationResponse> invalidationStream : NetworkHandler.invalidationStreams) { for (DaRPCStream<InvalidationRequest, InvalidationResponse> invalidationStream : NetworkHandler.invalidationStreams.values()) {
callables.add(()->{ callables.add(()->{
InvalidationResponse response = new InvalidationResponse(); InvalidationResponse response = new InvalidationResponse();
DaRPCFuture<InvalidationRequest, InvalidationResponse> future = invalidationStream.request(invalidationRequest, response, false); DaRPCFuture<InvalidationRequest, InvalidationResponse> future = invalidationStream.request(invalidationRequest, response, false);
......
...@@ -121,8 +121,23 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer ...@@ -121,8 +121,23 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
@Override @Override
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
System.out.println("Closing Connection"); try {
SocketAddress socketAddress = rpcClientEndpoint.getDstAddr();
String ipAddress = socketAddress.toString();
int startIndex = ipAddress.indexOf("/") + 1;
int endIndex = ipAddress.indexOf(":");
ipAddress = ipAddress.substring(startIndex, endIndex);
System.out.println("Closing Connection for ip address: " + ipAddress );
NetworkHandler.invalidationStreams.remove(ipAddress);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// TODO : Remove the connection from the static lists stores in Network Handler // TODO : Remove the connection from the static lists stores in Network Handler
// NetworkHandler.invalidationStreams.remove(rpcClientEndpoint.)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment