Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
H
hpdos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
SYNERG
hpdos
Commits
24ff629b
Commit
24ff629b
authored
Oct 25, 2021
by
Pramod S
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Added log4j
parent
b2170bd2
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
76 additions
and
55 deletions
+76
-55
code/hpdos_rdma_offloaded/app.config
code/hpdos_rdma_offloaded/app.config
+6
-5
code/hpdos_rdma_offloaded/app_follower.config
code/hpdos_rdma_offloaded/app_follower.config
+2
-1
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/MetadataServer.java
...ed/src/main/java/hpdos_rdma_offloaded/MetadataServer.java
+12
-7
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/handler/NetworkHandler.java
...ain/java/hpdos_rdma_offloaded/handler/NetworkHandler.java
+14
-12
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/handler/RegistrationHandler.java
...ava/hpdos_rdma_offloaded/handler/RegistrationHandler.java
+18
-13
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/handler/StorageHandler.java
...ain/java/hpdos_rdma_offloaded/handler/StorageHandler.java
+6
-5
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/service/FollowerMetadataService.java
...hpdos_rdma_offloaded/service/FollowerMetadataService.java
+8
-6
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/service/MasterMetadataServerService.java
...s_rdma_offloaded/service/MasterMetadataServerService.java
+10
-6
No files found.
code/hpdos_rdma_offloaded/app.config
View file @
24ff629b
app
.
HOST
=
192
.
168
.
200
.
20
app
.
MASTER
=
true
app
.
MASTER_IP
=
NA
#this is how to pair threads to cpu cores
app
.
cpu_affinity
=
2
app
.
cpu_affinity
=
4
//
No
of
thread
for
each
connectin
app
.
rdma_cluster_size
=
1
app
.
rdma_receive_queue
=
16
app
.
rdma_send_queue
=
16
app
.
rdma_cluster_size
=
4
app
.
rdma_receive_queue
=
32
app
.
rdma_send_queue
=
32
app
.
rdma_polling
=
false
app
.
rdma_max_inline
=
0
app
.
rdma_server_port
=
1920
...
...
@@ -17,4 +18,4 @@ app.db_path=/tmp/rocks
app
.
follower_registration_port
=
9876
app
.
sal_registration_port
=
9875
#
Below
properties
are
for
followers
only
.
\ No newline at end of file
#Below properties are for followers only.
code/hpdos_rdma_offloaded/app_follower.config
View file @
24ff629b
app
.
HOST
=
192
.
168
.
200
.
20
app
.
MASTER
=
false
app
.
MASTER_IP
=
192
.
168
.
200
.
41
app
.
cpu_affinity
=
4
app
.
rdma_cluster_size
=
3
app
.
rdma_receive_queue
=
16
...
...
@@ -15,4 +16,4 @@ app.db_path=/tmp/rocks
app
.
follower_registration_port
=
9876
app
.
sal_registration_port
=
9875
#
Below
properties
are
for
followers
only
.
\ No newline at end of file
#Below properties are for followers only.
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/MetadataServer.java
View file @
24ff629b
...
...
@@ -12,6 +12,7 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import
com.ibm.darpc.DaRPCServerGroup
;
import
com.ibm.disni.RdmaServerEndpoint
;
import
org.apache.log4j.Logger
;
import
org.rocksdb.RocksDB
;
import
org.rocksdb.RocksDBException
;
...
...
@@ -25,6 +26,7 @@ import hpdos_rdma_offloaded.service.MasterMetadataServerService;
public
class
MetadataServer
implements
Runnable
{
private
String
host
;
private
String
masterIp
;
private
int
poolsize
;
private
int
recvQueue
;
private
int
sendQueue
;
...
...
@@ -43,7 +45,7 @@ public class MetadataServer implements Runnable{
RegistrationHandler
registrationHandler
=
null
;
public
final
static
Properties
properties
=
new
Properties
();
String
propertyPath
;
final
static
Logger
logger
=
Logger
.
getLogger
(
MetadataServer
.
class
);
public
MetadataServer
(
String
propertyPath
)
throws
Exception
,
RocksDBException
{
this
.
propertyPath
=
propertyPath
;
InputStream
inputStream
=
new
FileInputStream
(
propertyPath
);
...
...
@@ -60,6 +62,7 @@ public class MetadataServer implements Runnable{
maxinline
=
Integer
.
valueOf
(
properties
.
getProperty
(
"app.rdma_max_inline"
));
rdma_port
=
Integer
.
valueOf
(
properties
.
getProperty
(
"app.rdma_server_port"
));
polling
=
Boolean
.
valueOf
(
properties
.
getProperty
(
"app.rdma_polling"
));
masterIp
=
String
.
valueOf
(
properties
.
getProperty
(
"app.MASTER_IP"
));
int
registrationPort
=
Integer
.
valueOf
(
properties
.
getProperty
(
"app.follower_registration_port"
));
clusterAffinities
=
new
long
[
poolsize
];
...
...
@@ -67,23 +70,25 @@ public class MetadataServer implements Runnable{
{
long
cpu
=
1L
<<
i
;
clusterAffinities
[
i
]
=
cpu
;
System
.
out
.
println
(
cpu
);
//
System.out.println(cpu);
}
isMaster
=
Boolean
.
valueOf
(
properties
.
getProperty
(
"app.MASTER"
));
storageHandler
=
new
StorageHandler
(
properties
.
getProperty
(
"app.db_path"
));
networkHandler
=
new
NetworkHandler
(
isMaster
);
// metadataServerService = new MasterMetadataServerService(this.networkHandler,storageHandler);
registrationHandler
=
new
RegistrationHandler
(
isMaster
,
host
,
registrationPort
,
networkHandler
);
if
(
isMaster
)
{
System
.
out
.
println
(
"STARTING MASTER SERVICES."
);
logger
.
debug
(
"STARTING MASTER SERVICES."
);
registrationHandler
=
new
RegistrationHandler
(
isMaster
,
host
,
masterIp
,
registrationPort
,
networkHandler
);
metadataServerService
=
new
MasterMetadataServerService
(
this
.
networkHandler
,
storageHandler
);
}
else
{
System
.
out
.
println
(
"STARTING FOLLOWER SERVICES."
);
logger
.
debug
(
"STARTING FOLLOWER SERVICES."
);
registrationHandler
=
new
RegistrationHandler
(
isMaster
,
host
,
masterIp
,
registrationPort
,
networkHandler
);
followerMetadataServerService
=
new
FollowerMetadataService
(
storageHandler
);
}
...
...
@@ -107,9 +112,9 @@ public class MetadataServer implements Runnable{
while
(
true
){
try
{
// System.out.println
("Listening to RDMA requests a, IP: " + host + " , PORT: " + rdma_port);
logger
.
debug
(
"Listening to RDMA requests a, IP: "
+
host
+
" , PORT: "
+
rdma_port
);
serverEp
.
accept
();
// System.out.println
("Accepted connection.");
logger
.
debug
(
"Accepted connection."
);
}
catch
(
IOException
e
)
{
...
...
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/handler/NetworkHandler.java
View file @
24ff629b
...
...
@@ -21,6 +21,7 @@ import com.ibm.darpc.DaRPCClientGroup;
import
com.ibm.darpc.DaRPCFuture
;
import
com.ibm.darpc.DaRPCStream
;
import
org.apache.log4j.Logger
;
import
org.rocksdb.RocksDBException
;
import
hpdos_rdma_offloaded.lib.Follower
;
...
...
@@ -33,6 +34,7 @@ import hpdos_rdma_offloaded.protocol.Response;
import
hpdos_rdma_offloaded.protocol.RpcProtocol
;
public
class
NetworkHandler
{
final
static
Logger
logger
=
Logger
.
getLogger
(
NetworkHandler
.
class
);
private
ExecutorService
executorService
;
// public HashMap<String, DaRPCStream<Request, Response>> streams;
public
static
ConcurrentHashMap
<
String
,
DaRPCStream
<
InvalidationRequest
,
InvalidationResponse
>>
invalidationStreams
=
new
ConcurrentHashMap
<>();
...
...
@@ -54,7 +56,7 @@ public class NetworkHandler {
public
void
connectToSal
(
String
ip
)
throws
Exception
{
Thread
.
sleep
(
3000
);
System
.
out
.
println
(
"Making rdma connection to client for invalidation request."
);
logger
.
debug
(
"Making rdma connection to client for invalidation request."
);
// Follower follower = new Follower();
// follower.setFollowerId(uid);
// follower.setIpAddress(ip);
...
...
@@ -69,7 +71,7 @@ public class NetworkHandler {
public
void
connectToFollower
(
String
uid
,
String
ip
)
throws
Exception
{
Thread
.
sleep
(
3000
);
System
.
out
.
println
(
"Making rdma connection to follower."
);
logger
.
debug
(
"Making rdma connection to follower."
);
Follower
follower
=
new
Follower
();
follower
.
setFollowerId
(
uid
);
follower
.
setIpAddress
(
ip
);
...
...
@@ -86,10 +88,10 @@ public class NetworkHandler {
// Change the Futre to CompletableFuture to achieve correct asynchronousness
public
void
create
(
Packet
packet
)
throws
InterruptedException
,
ExecutionException
,
RocksDBException
{
System
.
out
.
println
(
"Received create request for key/value: "
+
new
String
(
packet
.
getKey
())
+
"/"
+
new
String
(
packet
.
getValue
()));
logger
.
info
(
"Received create request for key/value: "
+
new
String
(
packet
.
getKey
())
+
"/"
+
new
String
(
packet
.
getValue
()));
if
(
this
.
isMaster
)
{
System
.
out
.
println
(
"Starting replication"
);
logger
.
info
(
"Starting replication"
);
Future
<
Response
>
futureReplication
=
this
.
executorService
.
submit
(()->{
Response
replicationResponse
=
new
Response
();
// Write code to replicate the data to other nics
...
...
@@ -97,15 +99,15 @@ public class NetworkHandler {
return
replicationResponse
;
});
Response
response
=
futureReplication
.
get
();
System
.
out
.
println
(
"Replication complete"
);
logger
.
info
(
"Replication complete"
);
}
}
public
void
read
(
Packet
packet
)
throws
RocksDBException
,
InterruptedException
,
ExecutionException
{
System
.
out
.
println
(
"Received read request for key/value: "
+
new
String
(
packet
.
getKey
()));
logger
.
info
(
"Received read request for key/value: "
+
new
String
(
packet
.
getKey
()));
if
(
this
.
isMaster
)
{
System
.
out
.
println
(
"Starting replication"
);
//
System.out.println("Starting replication");
Future
<
Response
>
futureReplication
=
this
.
executorService
.
submit
(()->{
Response
replicationResponse
=
new
Response
();
// Write code to replicate the data to other nics
...
...
@@ -113,7 +115,6 @@ public class NetworkHandler {
return
replicationResponse
;
});
Response
response
=
futureReplication
.
get
();
System
.
out
.
println
(
"Replicating complete Ack "
+
response
.
getAck
());
}
}
...
...
@@ -134,7 +135,7 @@ public class NetworkHandler {
});
response
=
futureReplication
.
get
();
futureInvalidation
.
get
();
System
.
out
.
println
(
"Replicating complete Ack "
+
response
.
getAck
());
logger
.
info
(
"Replicating complete Ack "
+
response
.
getAck
());
}
}
...
...
@@ -143,7 +144,7 @@ public class NetworkHandler {
public
void
delete
(
Packet
packet
)
throws
RocksDBException
,
InterruptedException
,
ExecutionException
{
if
(
this
.
isMaster
)
{
System
.
out
.
println
(
"Starting replication"
);
logger
.
info
(
"Starting replication"
);
Future
<
Response
>
futureReplication
=
this
.
executorService
.
submit
(()->{
Response
replicationResponse
=
new
Response
();
// Write code to replicate the data to other nics
...
...
@@ -151,7 +152,7 @@ public class NetworkHandler {
return
replicationResponse
;
});
Response
response
=
futureReplication
.
get
();
System
.
out
.
println
(
"Replicating complete "
+
response
.
getAck
());
logger
.
info
(
"Replicating complete "
+
response
.
getAck
());
}
}
...
...
@@ -180,7 +181,7 @@ public class NetworkHandler {
*/
// Pramod method
ExecutorService
executorService
=
Executors
.
newFixedThreadPool
(
10
);
System
.
out
.
println
(
"Sending Invalidation Request"
);
logger
.
info
(
"Sending Invalidation Request"
);
InvalidationRequest
invalidationRequest
=
new
InvalidationRequest
();
invalidationRequest
.
setKey
(
request
.
getKey
());
Set
<
Callable
<
DaRPCFuture
<
InvalidationRequest
,
InvalidationResponse
>>>
callables
=
new
HashSet
<>();
...
...
@@ -197,6 +198,7 @@ public class NetworkHandler {
for
(
Future
<
DaRPCFuture
<
InvalidationRequest
,
InvalidationResponse
>>
future:
futures
)
{
InvalidationResponse
response
=
future
.
get
().
get
();
}
executorService
.
shutdown
();
}
public
boolean
consumeReplicationRequestFutures
(
ArrayList
<
DaRPCFuture
<
InvalidationRequest
,
InvalidationResponse
>>
requestFutures
)
throws
InterruptedException
,
ExecutionException
{
...
...
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/handler/RegistrationHandler.java
View file @
24ff629b
...
...
@@ -12,6 +12,8 @@ import java.util.ArrayList;
import
java.util.List
;
import
java.util.UUID
;
import
org.apache.log4j.Logger
;
import
hpdos_rdma_offloaded.lib.Follower
;
public
class
RegistrationHandler
{
...
...
@@ -19,11 +21,12 @@ public class RegistrationHandler {
public
int
port
=
9876
;
boolean
isMaster
;
String
registrationServerIp
;
String
masterIp
;
int
follower_registration_port
;
int
sal_registration_port
;
NetworkHandler
networkHandler
;
public
RegistrationHandler
(
boolean
isMaster
,
String
registrationServerIP
,
int
registrationPort
,
NetworkHandler
networkHandler
)
throws
IOException
{
final
static
Logger
logger
=
Logger
.
getLogger
(
RegistrationHandler
.
class
);
public
RegistrationHandler
(
boolean
isMaster
,
String
registrationServerIP
,
String
masterIp
,
int
registrationPort
,
NetworkHandler
networkHandler
)
throws
IOException
{
this
.
registrationServerIp
=
registrationServerIP
;
this
.
follower_registration_port
=
9876
;
this
.
sal_registration_port
=
9875
;
...
...
@@ -38,21 +41,23 @@ public class RegistrationHandler {
// Follower follower = new Follower();
// follower.setFollowerId( UUID.randomUUID().toString());
// follower.setIpAddress("192.168.200.20");
this
.
masterIp
=
masterIp
;
String
followerUUID
=
UUID
.
randomUUID
().
toString
();
String
followerIp
=
registrationServerIP
;
String
message
=
followerUUID
+
";"
+
followerIp
;
sendRegistrationRequest
(
message
);
sendRegistrationRequest
(
m
asterIp
,
m
essage
);
}
}
private
void
sendRegistrationRequest
(
String
message
)
throws
IOException
{
System
.
out
.
println
(
"Sending a registration request with message: "
+
message
);
private
void
sendRegistrationRequest
(
String
m
asterIp
,
String
m
essage
)
throws
IOException
{
logger
.
debug
(
"Sending a registration request with message: "
+
message
);
InetAddress
host
=
InetAddress
.
getByName
(
this
.
registrationServerIp
);
Socket
socket
=
null
;
ObjectOutputStream
oos
=
null
;
socket
=
new
Socket
(
this
.
registrationServerIp
,
9876
);
socket
=
new
Socket
(
masterIp
,
9876
);
oos
=
new
ObjectOutputStream
(
socket
.
getOutputStream
());
oos
.
writeObject
(
message
);
socket
.
close
();
}
public
void
startFollowerRegistrationHandlerServer
(){
...
...
@@ -64,20 +69,20 @@ public class RegistrationHandler {
{
InetAddress
addr
=
InetAddress
.
getByName
(
registrationServerIp
);
ServerSocket
serverSocket
=
new
ServerSocket
(
follower_registration_port
,
50
,
addr
);
System
.
out
.
println
(
"Started the follower registration service..."
);
logger
.
debug
(
"Started the follower registration service..."
);
while
(
true
)
{
Socket
socket
=
serverSocket
.
accept
();
System
.
out
.
println
(
"Accepted connection"
);
logger
.
debug
(
"Accepted connection"
);
ObjectInputStream
ois
=
new
ObjectInputStream
(
socket
.
getInputStream
());
String
message
=
(
String
)
ois
.
readObject
();
String
arr
[]
=
message
.
split
(
";"
);
String
followerUUID
=
arr
[
0
];
String
followerIp
=
arr
[
1
];
System
.
out
.
println
(
"Got follower ip: "
+
followerIp
+
" , with UUID of: "
+
followerUUID
);
logger
.
debug
(
"Got follower ip: "
+
followerIp
+
" , with UUID of: "
+
followerUUID
);
NetworkHandler
networkHandler
=
new
NetworkHandler
();
networkHandler
.
connectToFollower
(
followerUUID
,
followerIp
);
System
.
out
.
println
(
"RDMA connection establised"
);
NetworkHandler
networkHandler
=
new
NetworkHandler
();
networkHandler
.
connectToFollower
(
followerUUID
,
followerIp
);
logger
.
debug
(
"RDMA connection establised"
);
// Add the follower details to the respective class/component
// clientProcessingPool.submit(new ClientTask(clientSocket));
}
...
...
@@ -98,7 +103,7 @@ public class RegistrationHandler {
try
{
InetAddress
addr
=
InetAddress
.
getByName
(
registrationServerIp
);
ServerSocket
serverSocket
=
new
ServerSocket
(
sal_registration_port
,
50
,
addr
);
System
.
out
.
println
(
"Started the SAL registration service..."
);
logger
.
debug
(
"Started the SAL registration service..."
);
while
(
true
)
{
Socket
socket
=
serverSocket
.
accept
();
ObjectInputStream
ois
=
new
ObjectInputStream
(
socket
.
getInputStream
());
...
...
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/handler/StorageHandler.java
View file @
24ff629b
package
hpdos_rdma_offloaded.handler
;
import
org.rocksdb.RocksDB
;
import
org.apache.log4j.Logger
;
import
org.rocksdb.Options
;
import
org.rocksdb.RocksDBException
;
...
...
@@ -11,27 +12,27 @@ public class StorageHandler implements AutoCloseable{
RocksDB
rocksDB
;
Options
rockDbOptions
;
final
static
Logger
logger
=
Logger
.
getLogger
(
StorageHandler
.
class
);
public
StorageHandler
(
String
dbpath
)
throws
RocksDBException
{
System
.
out
.
println
(
"Creating RocksDB"
);
logger
.
debug
(
"Creating RocksDB"
);
this
.
rockDbOptions
=
new
Options
();
rockDbOptions
.
setCreateIfMissing
(
true
);
this
.
rocksDB
=
RocksDB
.
open
(
rockDbOptions
,
dbpath
);
System
.
out
.
println
(
"Created RocksDB"
);
logger
.
debug
(
"Created RocksDB"
);
}
public
void
close
()
{
rocksDB
.
close
();
rockDbOptions
.
close
();
System
.
out
.
println
(
"Closing RocksDB instance"
);
logger
.
debug
(
"Closing RocksDB instance"
);
}
public
void
create
(
byte
[]
key
,
byte
[]
value
)
throws
RocksDBException
{
rocksDB
.
put
(
key
,
value
);
}
public
void
create
(
Request
request
)
throws
RocksDBException
{
System
.
out
.
println
(
"Writing to local"
);
logger
.
info
(
"Writing to local"
);
rocksDB
.
put
(
request
.
getKey
(),
request
.
getValue
());
}
public
byte
[]
read
(
byte
[]
key
)
throws
RocksDBException
...
...
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/service/FollowerMetadataService.java
View file @
24ff629b
...
...
@@ -7,6 +7,7 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import
com.ibm.darpc.DaRPCServerEvent
;
import
com.ibm.darpc.DaRPCService
;
import
org.apache.log4j.Logger
;
import
org.rocksdb.RocksDBException
;
import
hpdos_rdma_offloaded.handler.StorageHandler
;
...
...
@@ -21,9 +22,10 @@ import hpdos_rdma_offloaded.protocol.RpcProtocol;
public
class
FollowerMetadataService
extends
RpcProtocol
implements
DaRPCService
<
Request
,
Response
>{
StorageHandler
storageHandler
;
final
static
Logger
logger
=
Logger
.
getLogger
(
FollowerMetadataService
.
class
);
DaRPCClientGroup
<
InvalidationRequest
,
InvalidationResponse
>
invalidationClientGroup
;
public
FollowerMetadataService
(
StorageHandler
storageHandler
)
throws
Exception
{
System
.
out
.
println
(
"New MetadataServerService Created"
);
logger
.
debug
(
"New MetadataServerService Created"
);
this
.
storageHandler
=
storageHandler
;
}
...
...
@@ -32,10 +34,10 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService
Request
request
=
event
.
getReceiveMessage
();
Response
response
=
event
.
getSendMessage
();
System
.
out
.
print
(
"Received "
+
request
.
getRequestType
()+
" Request for key: "
+
new
String
(
request
.
getKey
()));
logger
.
info
(
"Received "
+
request
.
getRequestType
()+
" Request for key: "
+
new
String
(
request
.
getKey
()));
if
(
request
.
getValue
()
!=
null
)
System
.
out
.
print
(
" value : "
+
new
String
(
request
.
getValue
())
);
System
.
out
.
println
();
logger
.
info
(
" value : "
+
new
String
(
request
.
getValue
())
);
//
System.out.println();
try
{
if
(
request
.
getRequestType
()
==
RequestType
.
PUT
)
{
...
...
@@ -73,12 +75,12 @@ public class FollowerMetadataService extends RpcProtocol implements DaRPCService
@Override
public
void
open
(
DaRPCServerEndpoint
<
Request
,
Response
>
rpcClientEndpoint
)
{
System
.
out
.
print
(
"Received new connection "
);
logger
.
debug
(
"Received new connection "
);
}
@Override
public
void
close
(
DaRPCServerEndpoint
<
Request
,
Response
>
rpcClientEndpoint
)
{
System
.
out
.
println
(
"Closing Connection"
);
logger
.
debug
(
"Closing Connection"
);
}
}
code/hpdos_rdma_offloaded/src/main/java/hpdos_rdma_offloaded/service/MasterMetadataServerService.java
View file @
24ff629b
...
...
@@ -13,6 +13,7 @@ import com.ibm.darpc.DaRPCServerEvent;
import
com.ibm.darpc.DaRPCService
;
import
com.ibm.darpc.DaRPCStream
;
import
org.apache.log4j.Logger
;
import
org.rocksdb.RocksDBException
;
import
hpdos_rdma_offloaded.handler.NetworkHandler
;
...
...
@@ -33,8 +34,10 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
ConcurrentHashMap
<
SocketAddress
,
DaRPCStream
<
InvalidationRequest
,
InvalidationResponse
>>
invalidationClientEpStreams
;
ConcurrentHashMap
<
SocketAddress
,
DaRPCClientEndpoint
<
InvalidationRequest
,
InvalidationResponse
>>
invalidationClientEps
;
DaRPCClientGroup
<
InvalidationRequest
,
InvalidationResponse
>
invalidationClientGroup
;
final
static
Logger
logger
=
Logger
.
getLogger
(
MasterMetadataServerService
.
class
);
public
MasterMetadataServerService
(
NetworkHandler
networkHandler
,
StorageHandler
storageHandler
)
throws
Exception
{
System
.
out
.
println
(
"New MetadataServerService Created"
);
logger
.
debug
(
"New MetadataServerService Created"
);
this
.
networkHandler
=
networkHandler
;
this
.
storageHandler
=
storageHandler
;
this
.
invalidationClientEpStreams
=
new
ConcurrentHashMap
<>();
...
...
@@ -49,10 +52,9 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
Request
request
=
event
.
getReceiveMessage
();
Response
response
=
event
.
getSendMessage
();
Packet
packet
=
new
Packet
(
request
.
getRequestType
(),
request
.
getKey
(),
request
.
getValue
());
System
.
out
.
print
(
"Received "
+
request
.
getRequestType
()+
" Request for key: "
+
new
String
(
request
.
getKey
()));
logger
.
info
(
"Received "
+
request
.
getRequestType
()+
" Request for key: "
+
new
String
(
request
.
getKey
()));
if
(
request
.
getValue
()
!=
null
)
System
.
out
.
print
(
" value : "
+
new
String
(
request
.
getValue
())
);
System
.
out
.
println
();
logger
.
info
(
" value : "
+
new
String
(
request
.
getValue
()));
try
{
if
(
request
.
getRequestType
()
==
RequestType
.
PUT
)
{
this
.
storageHandler
.
create
(
request
);
...
...
@@ -79,6 +81,8 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
}
else
if
(
request
.
getRequestType
()
==
RequestType
.
DELETE
){
this
.
storageHandler
.
delete
(
request
);
this
.
networkHandler
.
delete
(
packet
);
this
.
networkHandler
.
sendInvalidateRequest
(
request
);
// var replicationRequestFutures = this.networkHandler.sendReplicateRequest(request);
//Yet to consume isDone
// var isDone = this.networkHandler.consumeReplicationRequestFutures(replicationRequestFutures);
...
...
@@ -97,7 +101,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
@Override
public
void
open
(
DaRPCServerEndpoint
<
Request
,
Response
>
rpcClientEndpoint
)
{
System
.
out
.
print
(
"Received new connection "
);
logger
.
debug
(
"Received new connection "
);
// try{
// System.out.println(rpcClientEndpoint.getDstAddr()+" "+rpcClientEndpoint.getSrcAddr());
// String clientIP = ((InetSocketAddress)rpcClientEndpoint.getDstAddr()).getHostName();
...
...
@@ -129,7 +133,7 @@ public class MasterMetadataServerService extends RpcProtocol implements DaRPCSer
ipAddress
=
ipAddress
.
substring
(
startIndex
,
endIndex
);
System
.
out
.
println
(
"Closing Connection for ip address: "
+
ipAddress
);
logger
.
debug
(
"Closing Connection for ip address: "
+
ipAddress
);
NetworkHandler
.
invalidationStreams
.
remove
(
ipAddress
);
}
catch
(
IOException
e
)
{
// TODO Auto-generated catch block
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment