Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
H
hpdos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
SYNERG
hpdos
Commits
731e788d
Commit
731e788d
authored
Nov 29, 2021
by
Paras Garg
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
updated files
parent
a155b14c
Changes
11
Show whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
144 additions
and
123 deletions
+144
-123
code/hpdos_rdma_sal/app.config
code/hpdos_rdma_sal/app.config
+1
-1
code/hpdos_rdma_sal/build.gradle
code/hpdos_rdma_sal/build.gradle
+2
-2
code/hpdos_rdma_sal/src/main/java/BenchmarkingClient.java
code/hpdos_rdma_sal/src/main/java/BenchmarkingClient.java
+88
-84
code/hpdos_rdma_sal/src/main/java/MetadataClient.java
code/hpdos_rdma_sal/src/main/java/MetadataClient.java
+12
-9
code/hpdos_rdma_sal/src/main/java/StandaloneRDMAClient.java
code/hpdos_rdma_sal/src/main/java/StandaloneRDMAClient.java
+2
-3
code/hpdos_rdma_sal/src/main/java/hpdos/handlers/NetworkHandlerM.java
...dma_sal/src/main/java/hpdos/handlers/NetworkHandlerM.java
+2
-0
code/hpdos_rdma_sal/src/main/java/hpdos/invalidationServer/InvalidationServer.java
...ain/java/hpdos/invalidationServer/InvalidationServer.java
+3
-3
code/hpdos_rdma_sal/src/main/java/hpdos/protocol/BenchmarkingProperties.java
.../src/main/java/hpdos/protocol/BenchmarkingProperties.java
+12
-12
code/hpdos_rdma_sal/src/main/java/hpdos/protocol/ByteArray.java
...pdos_rdma_sal/src/main/java/hpdos/protocol/ByteArray.java
+16
-3
code/hpdos_rdma_sal/src/main/java/hpdos/protocol/Property.java
...hpdos_rdma_sal/src/main/java/hpdos/protocol/Property.java
+1
-1
code/hpdos_rdma_sal/src/main/java/hpdos/services/InvalidationService.java
...sal/src/main/java/hpdos/services/InvalidationService.java
+5
-5
No files found.
code/hpdos_rdma_sal/app.config
View file @
731e788d
app
.
HOST
=
192
.
168
.
200
.
20
app
.
HOST
=
192
.
168
.
200
.
20
app
.
NO_OF_MASTERS
=
1
app
.
NO_OF_MASTERS
=
1
app
.
MASTER_HOST1
=
192
.
168
.
200
.
20
app
.
MASTER_HOST1
=
192
.
168
.
200
.
20
#app.MASTER_HOST2=192.168.200.51
app
.
MASTER_HOST2
=
192
.
168
.
200
.
40
app
.
INVALIDATION_PORT
=
1922
app
.
INVALIDATION_PORT
=
1922
app
.
CACHE_SIZE
=
1
app
.
CACHE_SIZE
=
1
code/hpdos_rdma_sal/build.gradle
View file @
731e788d
...
@@ -40,8 +40,8 @@ apply plugin: "application"
...
@@ -40,8 +40,8 @@ apply plugin: "application"
apply
plugin:
"java-library"
apply
plugin:
"java-library"
//mainClassName = "MetadataClient"
//mainClassName = "MetadataClient"
mainClassName
=
"BenchmarkingClient"
//
mainClassName = "BenchmarkingClient"
//
mainClassName= "StandaloneRDMAClient"
mainClassName
=
"StandaloneRDMAClient"
version
'1.0-SNAPSHOT'
version
'1.0-SNAPSHOT'
repositories
{
repositories
{
...
...
code/hpdos_rdma_sal/src/main/java/BenchmarkingClient.java
View file @
731e788d
import
java.io.FileInputStream
;
import
java.io.FileInputStream
;
import
java.io.InputStream
;
import
java.io.InputStream
;
import
java.util.HashSet
;
import
java.util.HashSet
;
import
java.util.Iterator
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.List
;
...
@@ -15,30 +16,32 @@ import java.util.concurrent.Future;
...
@@ -15,30 +16,32 @@ import java.util.concurrent.Future;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeUnit
;
import
com.google.common.base.Stopwatch
;
import
com.google.common.base.Stopwatch
;
import
com.google.common.util.concurrent.ListeningExecutorService
;
import
com.google.common.util.concurrent.MoreExecutors
;
import
org.apache.log4j.Logger
;
import
org.apache.log4j.Logger
;
import
hpdos.handlers.NetworkHandler
;
import
hpdos.handlers.NetworkHandler
;
import
hpdos.protocol.BenchmarkingProperties
;
import
hpdos.protocol.Property
;
public
class
BenchmarkingClient
{
public
class
BenchmarkingClient
{
final
static
Logger
logger
=
Logger
.
getLogger
(
BenchmarkingClient
.
class
);
public
Properties
properties
;
public
Properties
properties
;
public
double
runExperiment
(
String
threadId
,
MetadataClient
client
)
throws
Exception
{
public
double
runExperiment
(
String
threadId
,
MetadataClient
client
,
BenchmarkingProperties
properties
)
throws
Exception
{
Double
sleepTime
=
1000.0
*
1000.0
/
BenchmarkingP
roperties
.
requestsPerSecond
;
Double
sleepTime
=
1000.0
*
1000.0
/
p
roperties
.
requestsPerSecond
;
long
sleepTimeMicros
=
Double
.
valueOf
(
sleepTime
).
longValue
();
long
sleepTimeMicros
=
Double
.
valueOf
(
sleepTime
).
longValue
();
logger
.
debug
(
sleepTimeMicros
);
logger
.
info
(
sleepTimeMicros
);
Thread
.
sleep
((
long
)(
Math
.
random
()
*
500
));
// randomizing start time to avoid spikes in request
Thread
.
sleep
((
long
)(
Math
.
random
()
*
500
));
// randomizing start time to avoid spikes in request
long
requestsPerClient
=
BenchmarkingP
roperties
.
requestsPerClient
;
long
requestsPerClient
=
p
roperties
.
requestsPerClient
;
double
totalBracket
=
BenchmarkingP
roperties
.
reads
double
totalBracket
=
p
roperties
.
reads
+
BenchmarkingP
roperties
.
writes
+
p
roperties
.
writes
+
BenchmarkingP
roperties
.
deletes
;
+
p
roperties
.
deletes
;
double
writeBracket
,
readBracket
,
deleteBracket
;
double
writeBracket
,
readBracket
,
deleteBracket
;
writeBracket
=
BenchmarkingP
roperties
.
writes
*
1.0
/
totalBracket
;
writeBracket
=
p
roperties
.
writes
*
1.0
/
totalBracket
;
readBracket
=
writeBracket
+
BenchmarkingP
roperties
.
reads
/
totalBracket
;
readBracket
=
writeBracket
+
p
roperties
.
reads
/
totalBracket
;
deleteBracket
=
readBracket
+
BenchmarkingP
roperties
.
deletes
/
totalBracket
;
deleteBracket
=
readBracket
+
p
roperties
.
deletes
/
totalBracket
;
// int temp = 5;
// int temp = 5;
ExecutorService
threadpool
=
Executors
.
newCachedThreadPool
();
ExecutorService
threadpool
=
Executors
.
newCachedThreadPool
();
// ListeningExecutorService service = MoreExecutors.listeningDecorator(threadpool);
// ListeningExecutorService service = MoreExecutors.listeningDecorator(threadpool);
...
@@ -52,15 +55,15 @@ public class BenchmarkingClient {
...
@@ -52,15 +55,15 @@ public class BenchmarkingClient {
if
(
toss
<
writeBracket
)
{
if
(
toss
<
writeBracket
)
{
byte
[]
key
=
new
byte
[
BenchmarkingP
roperties
.
keySize
];
byte
[]
key
=
new
byte
[
p
roperties
.
keySize
];
byte
[]
value
=
new
byte
[
BenchmarkingP
roperties
.
valueSize
];
byte
[]
value
=
new
byte
[
p
roperties
.
valueSize
];
new
Random
().
nextBytes
(
key
);
new
Random
().
nextBytes
(
key
);
new
Random
().
nextBytes
(
value
);
new
Random
().
nextBytes
(
value
);
// stopwatch.start();
// stopwatch.start();
long
writeStartTime
=
System
.
nanoTime
();
long
writeStartTime
=
System
.
nanoTime
();
CompletableFuture
<
Void
>
cf
=
CompletableFuture
.
runAsync
(()
->
{
CompletableFuture
<
Void
>
cf
=
CompletableFuture
.
runAsync
(()
->
{
try
{
try
{
client
.
networkHandler
.
put
(
key
,
value
);
client
.
put
(
key
,
value
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
}
...
@@ -69,47 +72,45 @@ public class BenchmarkingClient {
...
@@ -69,47 +72,45 @@ public class BenchmarkingClient {
});
});
responses
.
add
(
cf
);
responses
.
add
(
cf
);
// stopwatch.stop();
// stopwatch.stop();
}
}
else
if
(
toss
<
readBracket
)
{
else
if
(
toss
<
readBracket
)
{
byte
[]
key
=
new
byte
[
BenchmarkingP
roperties
.
keySize
];
byte
[]
key
=
new
byte
[
p
roperties
.
keySize
];
new
Random
().
nextBytes
(
key
);
new
Random
().
nextBytes
(
key
);
// stopwatch.start();
// stopwatch.start();
long
readStartTime
=
System
.
nanoTime
();
long
readStartTime
=
System
.
nanoTime
();
CompletableFuture
<
Void
>
cf
=
CompletableFuture
.
runAsync
(()
->
{
CompletableFuture
<
Void
>
cf
=
CompletableFuture
.
runAsync
(()
->
{
try
{
try
{
client
.
networkHandler
.
get
(
key
);
client
.
get
(
key
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
}
}).
whenComplete
((
r
,
t
)->{
}).
whenComplete
((
r
,
t
)->{
collectExperimentStatistics
((
System
.
nanoTime
()
-
readStartTime
)/
1000
,
101
);
collectExperimentStatistics
((
System
.
nanoTime
()
-
readStartTime
)/
1000
,
101
,
properties
);
});
});
responses
.
add
(
cf
);
responses
.
add
(
cf
);
// stopwatch.stop();
// stopwatch.stop();
// collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 101);
// collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 101);
}
}
else
if
(
toss
<
deleteBracket
){
else
if
(
toss
<
deleteBracket
){
byte
[]
key
=
new
byte
[
BenchmarkingP
roperties
.
keySize
];
byte
[]
key
=
new
byte
[
p
roperties
.
keySize
];
new
Random
().
nextBytes
(
key
);
new
Random
().
nextBytes
(
key
);
// stopwatch.start();
// stopwatch.start();
long
deleteStartTime
=
System
.
nanoTime
();
long
deleteStartTime
=
System
.
nanoTime
();
CompletableFuture
<
Void
>
cf
=
CompletableFuture
.
runAsync
(()
->
{
CompletableFuture
<
Void
>
cf
=
CompletableFuture
.
runAsync
(()
->
{
try
{
try
{
client
.
networkHandler
.
delete
(
key
);
client
.
delete
(
key
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
}
}).
whenComplete
((
r
,
t
)->{
}).
whenComplete
((
r
,
t
)->{
collectExperimentStatistics
((
System
.
nanoTime
()
-
deleteStartTime
)/
1000
,
102
);
collectExperimentStatistics
((
System
.
nanoTime
()
-
deleteStartTime
)/
1000
,
102
,
properties
);
});
});
responses
.
add
(
cf
);
responses
.
add
(
cf
);
// stopwatch.stop();
// stopwatch.stop();
// collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 102);
// collectExperimentStatistics(stopwatch.elapsed(TimeUnit.MICROSECONDS), 102);
}
}
if
(
requestsPerClient
%
10000
==
0
)
{
if
(
requestsPerClient
%
10000
==
0
)
{
logger
.
debug
(
"Ten thousand requests completed "
+
requestsPerClient
);
logger
.
info
(
"Ten thousand requests completed "
+
requestsPerClient
);
}
}
requestsPerClient
--
;
requestsPerClient
--
;
...
@@ -119,9 +120,9 @@ public class BenchmarkingClient {
...
@@ -119,9 +120,9 @@ public class BenchmarkingClient {
// stopwatch.start();
// stopwatch.start();
testWait
(
sleepTimeMicros
);
testWait
(
sleepTimeMicros
);
long
endTime
=
System
.
nanoTime
();
long
endTime
=
System
.
nanoTime
();
BenchmarkingProperties
.
timePerRequest
=
BenchmarkingP
roperties
.
timePerRequest
+
((
endTime
-
startTime
)/
1000
);
properties
.
timePerRequest
=
p
roperties
.
timePerRequest
+
((
endTime
-
startTime
)/
1000
);
// stopwatch.stop();
// stopwatch.stop();
// logger.
debug
("Microseconds waited: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
// logger.
info
("Microseconds waited: " + stopwatch.elapsed(TimeUnit.MICROSECONDS));
}
}
Iterator
iteratorValues
=
responses
.
iterator
();
Iterator
iteratorValues
=
responses
.
iterator
();
long
startTime
=
System
.
nanoTime
();
long
startTime
=
System
.
nanoTime
();
...
@@ -130,107 +131,110 @@ public class BenchmarkingClient {
...
@@ -130,107 +131,110 @@ public class BenchmarkingClient {
res
.
get
(
10
,
TimeUnit
.
SECONDS
);
res
.
get
(
10
,
TimeUnit
.
SECONDS
);
}
}
long
endTime
=
System
.
nanoTime
();
long
endTime
=
System
.
nanoTime
();
logger
.
debug
(
"Average request latency: "
+
((
endTime
-
startTime
)/
1000
)/(
BenchmarkingP
roperties
.
requestsPerClient
));
logger
.
info
(
"Average request latency: "
+
((
endTime
-
startTime
)/
1000
)/(
p
roperties
.
requestsPerClient
));
return
0
;
return
0
;
}
}
public
static
void
collectExperimentStatistics
(
long
time
,
int
type
){
public
static
void
collectExperimentStatistics
(
long
time
,
int
type
,
BenchmarkingProperties
properties
){
// logger.
debug
("Time: " + time);
// logger.
info
("Time: " + time);
if
(
type
==
100
)
{
if
(
type
==
100
)
{
BenchmarkingP
roperties
.
writeCount
++;
p
roperties
.
writeCount
++;
BenchmarkingProperties
.
writeTime
=
BenchmarkingP
roperties
.
writeTime
+
time
;
properties
.
writeTime
=
p
roperties
.
writeTime
+
time
;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
}
else
if
(
type
==
101
)
{
else
if
(
type
==
101
)
{
BenchmarkingP
roperties
.
readCount
++;
p
roperties
.
readCount
++;
BenchmarkingProperties
.
readTime
=
BenchmarkingP
roperties
.
readTime
+
time
;
properties
.
readTime
=
p
roperties
.
readTime
+
time
;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
}
else
if
(
type
==
102
){
else
if
(
type
==
102
){
BenchmarkingP
roperties
.
deleteCount
++;
p
roperties
.
deleteCount
++;
BenchmarkingProperties
.
deleteTime
=
BenchmarkingP
roperties
.
deleteTime
+
time
;
properties
.
deleteTime
=
p
roperties
.
deleteTime
+
time
;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
// BenchmarkingProperties.actualReqPerSecond = BenchmarkingProperties.actualReqPerSecond + time;
}
}
}
}
public
void
printStatistics
(){
public
void
printStatistics
(
BenchmarkingProperties
properties
){
logger
.
debug
(
"Total writes: "
+
BenchmarkingP
roperties
.
writeCount
);
logger
.
info
(
"Total writes: "
+
p
roperties
.
writeCount
);
logger
.
debug
(
"Total reads: "
+
BenchmarkingP
roperties
.
readCount
);
logger
.
info
(
"Total reads: "
+
p
roperties
.
readCount
);
logger
.
debug
(
"Total deletes: "
+
BenchmarkingP
roperties
.
deleteCount
);
logger
.
info
(
"Total deletes: "
+
p
roperties
.
deleteCount
);
logger
.
debug
(
"Total write time: "
+
BenchmarkingP
roperties
.
writeTime
);
logger
.
info
(
"Total write time: "
+
p
roperties
.
writeTime
);
logger
.
debug
(
"Total read time: "
+
BenchmarkingP
roperties
.
readTime
);
logger
.
info
(
"Total read time: "
+
p
roperties
.
readTime
);
logger
.
debug
(
"Total delete time: "
+
BenchmarkingP
roperties
.
deleteTime
);
logger
.
info
(
"Total delete time: "
+
p
roperties
.
deleteTime
);
logger
.
debug
(
"Average write time in microseconds: "
+
(
BenchmarkingProperties
.
writeTime
/
BenchmarkingP
roperties
.
writeCount
));
logger
.
info
(
"Average write time in microseconds: "
+
(
properties
.
writeTime
/
p
roperties
.
writeCount
));
logger
.
debug
(
"Average read time in microseconds: "
+
(
BenchmarkingProperties
.
readTime
/
BenchmarkingP
roperties
.
readCount
));
logger
.
info
(
"Average read time in microseconds: "
+
(
properties
.
readTime
/
p
roperties
.
readCount
));
logger
.
debug
(
"Average delete time in microseconds: "
+
(
BenchmarkingProperties
.
deleteTime
/
BenchmarkingP
roperties
.
deleteCount
));
logger
.
info
(
"Average delete time in microseconds: "
+
(
properties
.
deleteTime
/
p
roperties
.
deleteCount
));
logger
.
debug
(
"Average Time/Request : "
+
BenchmarkingProperties
.
timePerRequest
/(
BenchmarkingProperties
.
writeCount
+
BenchmarkingProperties
.
readCount
+
BenchmarkingP
roperties
.
deleteCount
));
logger
.
info
(
"Average Time/Request : "
+
properties
.
timePerRequest
/(
properties
.
writeCount
+
properties
.
readCount
+
p
roperties
.
deleteCount
));
long
avgTimePerReq
=
BenchmarkingProperties
.
timePerRequest
/(
BenchmarkingProperties
.
writeCount
+
BenchmarkingProperties
.
readCount
+
BenchmarkingP
roperties
.
deleteCount
);
long
avgTimePerReq
=
properties
.
timePerRequest
/(
properties
.
writeCount
+
properties
.
readCount
+
p
roperties
.
deleteCount
);
logger
.
debug
(
"Actual Requests/second : "
+
(
1000000
/
avgTimePerReq
));
logger
.
info
(
"Actual Requests/second : "
+
(
1000000
/
avgTimePerReq
));
}
}
public
void
testWait
(
long
microseconds
){
public
void
testWait
(
long
microseconds
){
// logger.
debug
("Sleeping for " + microseconds + " microseconds.");
// logger.
info
("Sleeping for " + microseconds + " microseconds.");
final
long
INTERVAL
=
microseconds
*
1000
;
final
long
INTERVAL
=
microseconds
*
1000
;
long
start
=
System
.
nanoTime
();
long
start
=
System
.
nanoTime
();
long
end
=
0
;
long
end
=
0
;
do
{
do
{
end
=
System
.
nanoTime
();
end
=
System
.
nanoTime
();
}
while
(
start
+
INTERVAL
>=
end
);
}
while
(
start
+
INTERVAL
>=
end
);
// logger.
debug
(end - start);
// logger.
info
(end - start);
}
}
final
static
Logger
logger
=
Logger
.
getLogger
(
BenchmarkingClient
.
class
);
public
static
void
main
(
String
[]
args
)
throws
Exception
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
BenchmarkingClient
benchmarkingClient
=
new
BenchmarkingClient
();
BenchmarkingClient
benchmarkingClient
=
new
BenchmarkingClient
();
logger
.
debug
(
"Staring to benchmark..."
);
logger
.
info
(
"Staring to benchmark..."
);
benchmarkingClient
.
properties
=
new
Properties
();
benchmarkingClient
.
properties
=
new
Properties
();
InputStream
inputStream
=
new
FileInputStream
(
args
[
0
]);
InputStream
inputStream
=
new
FileInputStream
(
args
[
0
]);
benchmarkingClient
.
properties
.
load
(
inputStream
);
benchmarkingClient
.
properties
.
load
(
inputStream
);
BenchmarkingProperties
.
reads
=
Double
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"reads"
));
BenchmarkingProperties
properties
=
new
BenchmarkingProperties
();
BenchmarkingProperties
.
writes
=
Double
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"writes"
));
BenchmarkingProperties
.
deletes
=
Double
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"deletes"
));
properties
.
reads
=
Double
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"reads"
));
BenchmarkingProperties
.
clients
=
Integer
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"clients"
));
properties
.
writes
=
Double
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"writes"
));
BenchmarkingProperties
.
requestsPerClient
=
Long
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"requests_per_client"
));
properties
.
deletes
=
Double
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"deletes"
));
BenchmarkingProperties
.
requestsPerSecond
=
Double
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"requests_per_second"
));
properties
.
clients
=
Integer
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"clients"
));
BenchmarkingProperties
.
keySize
=
Integer
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"key_size"
));
properties
.
requestsPerClient
=
Long
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"requests_per_client"
));
BenchmarkingProperties
.
valueSize
=
Integer
.
valueOf
((
String
)
benchmarkingClient
.
properties
.
get
(
"value_size"
));
properties
.
requestsPerSecond
=
Double
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"requests_per_second"
));
properties
.
keySize
=
Integer
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"key_size"
));
logger
.
debug
(
"================= Properties loaded for benchmarking ================= "
);
properties
.
valueSize
=
Integer
.
valueOf
(
benchmarkingClient
.
properties
.
getProperty
(
"value_size"
));
logger
.
debug
(
"Percentage of reads: "
+
BenchmarkingProperties
.
reads
*
100
);
logger
.
debug
(
"Percentage of writes: "
+
BenchmarkingProperties
.
writes
*
100
);
logger
.
info
(
"================= Properties loaded for benchmarking ================= "
);
logger
.
debug
(
"Percentage of deletes: "
+
BenchmarkingProperties
.
deletes
*
100
);
logger
.
info
(
"Percentage of reads: "
+
properties
.
reads
*
100
);
logger
.
debug
(
"Clients to be spawned: "
+
BenchmarkingProperties
.
clients
);
logger
.
info
(
"Percentage of writes: "
+
properties
.
writes
*
100
);
logger
.
debug
(
"Requests per client: "
+
BenchmarkingProperties
.
requestsPerClient
);
logger
.
info
(
"Percentage of deletes: "
+
properties
.
deletes
*
100
);
logger
.
debug
(
"Requests per second: "
+
BenchmarkingProperties
.
requestsPerSecond
);
logger
.
info
(
"Clients to be spawned: "
+
properties
.
clients
);
logger
.
debug
(
"Key Size: "
+
BenchmarkingProperties
.
keySize
);
logger
.
info
(
"Requests per client: "
+
properties
.
requestsPerClient
);
logger
.
debug
(
"Value Size: "
+
BenchmarkingProperties
.
valueSize
);
logger
.
info
(
"Requests per second: "
+
properties
.
requestsPerSecond
);
logger
.
debug
(
"====================================================================== "
);
logger
.
info
(
"Key Size: "
+
properties
.
keySize
);
logger
.
info
(
"Value Size: "
+
properties
.
valueSize
);
logger
.
info
(
"====================================================================== "
);
// MetadataClient client = null;
// MetadataClient client = null;
MetadataClient
client
=
new
MetadataClient
(
benchmarkingClient
.
properties
);
Property
metaProperties
=
new
Property
(
"./app.config"
);
ExecutorService
experimentExecutors
=
Executors
.
newFixedThreadPool
(
BenchmarkingProperties
.
clients
);
MetadataClient
client
=
new
MetadataClient
(
metaProperties
);
ExecutorService
experimentExecutors
=
Executors
.
newFixedThreadPool
(
properties
.
clients
);
Set
<
Callable
<
Double
>>
callables
=
new
HashSet
<>();
Set
<
Callable
<
Double
>>
callables
=
new
HashSet
<>();
logger
.
debug
(
"INITIALIZING EXPERIMENT..."
);
logger
.
info
(
"INITIALIZING EXPERIMENT..."
);
for
(
int
i
=
1
;
i
<=
BenchmarkingP
roperties
.
clients
;
i
++)
{
for
(
int
i
=
1
;
i
<=
p
roperties
.
clients
;
i
++)
{
int
finalI
=
i
;
int
finalI
=
i
;
callables
.
add
(()
->
benchmarkingClient
.
runExperiment
(
Integer
.
toString
(
finalI
),
client
));
callables
.
add
(()
->
benchmarkingClient
.
runExperiment
(
Integer
.
toString
(
finalI
),
client
,
properties
));
}
}
Thread
.
sleep
(
5000
);
Thread
.
sleep
(
5000
);
logger
.
debug
(
"STARTING EXPERIMENT..."
);
logger
.
info
(
"STARTING EXPERIMENT..."
);
List
<
Future
<
Double
>>
futures
=
experimentExecutors
.
invokeAll
(
callables
);
List
<
Future
<
Double
>>
futures
=
experimentExecutors
.
invokeAll
(
callables
);
for
(
Future
<
Double
>
future:
futures
)
{
for
(
Future
<
Double
>
future:
futures
)
{
future
.
get
();
future
.
get
();
}
}
logger
.
debug
(
"DONE..."
);
logger
.
info
(
"DONE..."
);
NetworkHandler
networkHandler
=
new
NetworkHandler
();
NetworkHandler
networkHandler
=
new
NetworkHandler
();
networkHandler
.
benchmarkingProperties
.
printStatistics
();
networkHandler
.
benchmarkingProperties
.
printStatistics
();
experimentExecutors
.
shutdown
();
experimentExecutors
.
shutdown
();
}
}
}
}
\ No newline at end of file
code/hpdos_rdma_sal/src/main/java/MetadataClient.java
View file @
731e788d
...
@@ -23,7 +23,7 @@ public class MetadataClient
...
@@ -23,7 +23,7 @@ public class MetadataClient
public
MetadataClient
(
Property
properties
)
throws
Exception
public
MetadataClient
(
Property
properties
)
throws
Exception
{
{
String
invalidationServerIP
=
properties
.
invalidationServerIP
;
String
invalidationServerIP
=
properties
.
invalidationServerIP
;
logger
.
debug
(
"Invalidation server ip: "
+
invalidationServerIP
);
logger
.
info
(
"Invalidation server ip: "
+
invalidationServerIP
);
this
.
cache
=
Caffeine
.
newBuilder
()
this
.
cache
=
Caffeine
.
newBuilder
()
.
expireAfterWrite
(
300
,
TimeUnit
.
SECONDS
)
.
expireAfterWrite
(
300
,
TimeUnit
.
SECONDS
)
...
@@ -37,14 +37,15 @@ public class MetadataClient
...
@@ -37,14 +37,15 @@ public class MetadataClient
public
byte
[]
get
(
byte
[]
key
)
throws
Exception
public
byte
[]
get
(
byte
[]
key
)
throws
Exception
{
{
logger
.
debug
(
"Getting From local cache"
);
logger
.
info
(
"Getting From local cache"
);
byte
[]
value
=
this
.
cache
.
getIfPresent
(
new
ByteArray
(
key
));
byte
[]
value
=
this
.
cache
.
getIfPresent
(
new
ByteArray
(
key
));
if
(
value
!=
null
)
if
(
value
!=
null
)
return
value
;
return
value
;
logger
.
debug
(
"Not found in cache Getting from server"
);
logger
.
info
(
"Not found in cache Getting from server"
);
var
futureResponse
=
networkHandlerM
.
get
(
key
).
get
(
1
00
,
TimeUnit
.
MILLISECONDS
);
//var futureResponse = networkHandlerM.get(key).get(10
00, TimeUnit.MILLISECONDS);
var
futureResponse
=
networkHandlerM
.
get
(
key
).
get
();
if
(
futureResponse
.
getAck
()
==
AckType
.
SUCCESS_WITH_VALUE
){
if
(
futureResponse
.
getAck
()
==
AckType
.
SUCCESS_WITH_VALUE
){
System
.
out
.
println
(
"Adding to cache"
);
System
.
out
.
println
(
"Adding to cache"
);
this
.
cache
.
put
(
new
ByteArray
(
key
),
futureResponse
.
getValue
());
this
.
cache
.
put
(
new
ByteArray
(
key
),
futureResponse
.
getValue
());
...
@@ -55,8 +56,9 @@ public class MetadataClient
...
@@ -55,8 +56,9 @@ public class MetadataClient
public
int
put
(
byte
[]
key
,
byte
[]
value
)
throws
Exception
public
int
put
(
byte
[]
key
,
byte
[]
value
)
throws
Exception
{
{
var
response
=
networkHandlerM
.
put
(
key
,
value
).
get
(
100
,
TimeUnit
.
MILLISECONDS
);
//var response = networkHandlerM.put(key,value).get(1000, TimeUnit.MILLISECONDS);
logger
.
debug
(
"Putting KV pair on server ...."
);
var
response
=
networkHandlerM
.
put
(
key
,
value
).
get
();
logger
.
info
(
"Putting KV pair on server ...."
);
if
(
response
.
getAck
()
==
AckType
.
SUCCESS
){
if
(
response
.
getAck
()
==
AckType
.
SUCCESS
){
this
.
cache
.
put
(
new
ByteArray
(
key
),
value
);
this
.
cache
.
put
(
new
ByteArray
(
key
),
value
);
System
.
out
.
println
(
"Updating cache.."
);
System
.
out
.
println
(
"Updating cache.."
);
...
@@ -66,9 +68,10 @@ public class MetadataClient
...
@@ -66,9 +68,10 @@ public class MetadataClient
public
int
delete
(
byte
[]
key
)
throws
Exception
public
int
delete
(
byte
[]
key
)
throws
Exception
{
{
logger
.
debug
(
"Invalidating Local Cache and sending delete request to server"
);
logger
.
info
(
"Invalidating Local Cache and sending delete request to server"
);
//this.cache.invalidate(key);
//this.cache.invalidate(key);
var
response
=
this
.
networkHandlerM
.
delete
(
key
).
get
(
100
,
TimeUnit
.
MILLISECONDS
);
// var response = this.networkHandlerM.delete(key).get(1000,TimeUnit.MILLISECONDS);
var
response
=
this
.
networkHandlerM
.
delete
(
key
).
get
();
return
response
.
getAck
();
return
response
.
getAck
();
}
}
}
}
\ No newline at end of file
code/hpdos_rdma_sal/src/main/java/StandaloneRDMAClient.java
View file @
731e788d
...
@@ -10,13 +10,12 @@ public class StandaloneRDMAClient {
...
@@ -10,13 +10,12 @@ public class StandaloneRDMAClient {
public
static
void
main
(
String
[]
args
)
throws
Exception
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
Scanner
scanner
=
new
Scanner
(
System
.
in
);
Scanner
scanner
=
new
Scanner
(
System
.
in
);
System
.
out
.
println
(
"Enter Invalidation Server IP to bind"
);
Property
properties
=
new
Property
(
args
[
0
]);
Property
properties
=
new
Property
(
args
[
0
]);
// String address = scanner.nextLine();
// String address = scanner.nextLine();
System
.
out
.
println
(
"prepare"
);
MetadataClient
client
=
new
MetadataClient
(
properties
);
MetadataClient
client
=
new
MetadataClient
(
properties
);
System
.
out
.
println
(
"starting"
);
int
option
;
int
option
;
while
(
true
)
{
while
(
true
)
{
System
.
out
.
println
(
"Enter 1. for read and 2 for write 3 for delete: "
);
System
.
out
.
println
(
"Enter 1. for read and 2 for write 3 for delete: "
);
...
...
code/hpdos_rdma_sal/src/main/java/hpdos/handlers/NetworkHandlerM.java
View file @
731e788d
...
@@ -24,6 +24,8 @@ public class NetworkHandlerM {
...
@@ -24,6 +24,8 @@ public class NetworkHandlerM {
public
NetworkHandlerM
(
Property
properties
)
throws
Exception
public
NetworkHandlerM
(
Property
properties
)
throws
Exception
{
{
streams
=
new
ArrayList
<>();
eps
=
new
ArrayList
<>();
RpcProtocol
rpcProtocol
=
new
RpcProtocol
();
RpcProtocol
rpcProtocol
=
new
RpcProtocol
();
group
=
DaRPCClientGroup
.
createClientGroup
(
rpcProtocol
,
100
,
0
,
16
,
16
);
group
=
DaRPCClientGroup
.
createClientGroup
(
rpcProtocol
,
100
,
0
,
16
,
16
);
for
(
var
ip
:
properties
.
masters
)
for
(
var
ip
:
properties
.
masters
)
...
...
code/hpdos_rdma_sal/src/main/java/hpdos/invalidationServer/InvalidationServer.java
View file @
731e788d
...
@@ -49,12 +49,12 @@ public class InvalidationServer
...
@@ -49,12 +49,12 @@ public class InvalidationServer
try
try
{
{
int
conns
=
numOfConnections
;
int
conns
=
numOfConnections
;
logger
.
debug
(
"Waiting for server to send connection request for invalidation"
);
logger
.
info
(
"Waiting for server to send connection request for invalidation"
);
while
(
conns
>
0
)
{
while
(
conns
>
0
)
{
serverEp
.
accept
();
serverEp
.
accept
();
conns
--
;
conns
--
;
}
}
logger
.
debug
(
"Got Connected for Invalidation"
);
logger
.
info
(
"Got Connected for Invalidation"
);
}
}
catch
(
Exception
e
){
catch
(
Exception
e
){
e
.
printStackTrace
();
e
.
printStackTrace
();
...
@@ -63,7 +63,7 @@ public class InvalidationServer
...
@@ -63,7 +63,7 @@ public class InvalidationServer
var
t
=
new
Thread
(
runnable
);
var
t
=
new
Thread
(
runnable
);
t
.
setName
(
"Server Connection"
);
t
.
setName
(
"Server Connection"
);
t
.
start
();
t
.
start
();
logger
.
debug
(
"started Invalidation server"
);
logger
.
info
(
"started Invalidation server"
);
}
}
public
void
sendInvalidationRegistrationRequest
(
String
masterIpAddress
,
String
hostIpAddress
,
NetworkHandler
networkHandler
)
throws
IOException
,
ClassNotFoundException
{
public
void
sendInvalidationRegistrationRequest
(
String
masterIpAddress
,
String
hostIpAddress
,
NetworkHandler
networkHandler
)
throws
IOException
,
ClassNotFoundException
{
...
...
code/hpdos_rdma_sal/src/main/java/hpdos/protocol/BenchmarkingProperties.java
View file @
731e788d
...
@@ -25,24 +25,24 @@ public class BenchmarkingProperties {
...
@@ -25,24 +25,24 @@ public class BenchmarkingProperties {
final
static
Logger
logger
=
Logger
.
getLogger
(
BenchmarkingProperties
.
class
);
final
static
Logger
logger
=
Logger
.
getLogger
(
BenchmarkingProperties
.
class
);
public
void
printStatistics
(){
public
void
printStatistics
(){
logger
.
debug
(
"Total writes: "
+
writeCount
);
logger
.
info
(
"Total writes: "
+
writeCount
);
logger
.
debug
(
"Total reads: "
+
readCount
);
logger
.
info
(
"Total reads: "
+
readCount
);
logger
.
debug
(
"Total deletes: "
+
deleteCount
);
logger
.
info
(
"Total deletes: "
+
deleteCount
);
logger
.
debug
(
"Total write time: "
+
writeTime
);
logger
.
info
(
"Total write time: "
+
writeTime
);
logger
.
debug
(
"Total read time: "
+
readTime
);
logger
.
info
(
"Total read time: "
+
readTime
);
logger
.
debug
(
"Total delete time: "
+
deleteTime
);
logger
.
info
(
"Total delete time: "
+
deleteTime
);
logger
.
debug
(
"Average write time in microseconds: "
+
(
writeTime
/
writeCount
));
logger
.
info
(
"Average write time in microseconds: "
+
(
writeTime
/
writeCount
));
logger
.
debug
(
"Average read time in microseconds: "
+
(
readTime
/
readCount
));
logger
.
info
(
"Average read time in microseconds: "
+
(
readTime
/
readCount
));
logger
.
debug
(
"Average delete time in microseconds: "
+
(
deleteTime
/
deleteCount
));
logger
.
info
(
"Average delete time in microseconds: "
+
(
deleteTime
/
deleteCount
));
// logger.
debug
("Average Time/Request : " + timePerRequest/(writeCount + readCount + deleteCount));
// logger.
info
("Average Time/Request : " + timePerRequest/(writeCount + readCount + deleteCount));
// long avgTimePerReq = timePerRequest/(writeCount + readCount + deleteCount);
// long avgTimePerReq = timePerRequest/(writeCount + readCount + deleteCount);
// logger.
debug
("Actual Requests/second : " + (1000000/avgTimePerReq));
// logger.
info
("Actual Requests/second : " + (1000000/avgTimePerReq));
}
}
public
void
collectExperimentStatistics
(
long
time
,
int
type
){
public
void
collectExperimentStatistics
(
long
time
,
int
type
){
// logger.
debug
("Time: " + time);
// logger.
info
("Time: " + time);
if
(
type
==
100
)
{
if
(
type
==
100
)
{
writeCount
++;
writeCount
++;
writeTime
=
writeTime
+
time
;
writeTime
=
writeTime
+
time
;
...
...
code/hpdos_rdma_sal/src/main/java/hpdos/protocol/ByteArray.java
View file @
731e788d
...
@@ -3,9 +3,12 @@ package hpdos.protocol;
...
@@ -3,9 +3,12 @@ package hpdos.protocol;
public
class
ByteArray
{
public
class
ByteArray
{
public
byte
[]
key
;
public
byte
[]
key
;
private
int
h
;
public
ByteArray
(
byte
[]
key
)
public
ByteArray
(
byte
[]
key
)
{
{
this
.
key
=
key
;
this
.
key
=
key
;
System
.
out
.
println
(
"new butearray"
);
h
=
0
;
}
}
@Override
@Override
public
boolean
equals
(
Object
other
)
public
boolean
equals
(
Object
other
)
...
@@ -14,18 +17,28 @@ public class ByteArray {
...
@@ -14,18 +17,28 @@ public class ByteArray {
return
false
;
return
false
;
if
(
this
.
key
==
other
)
if
(
this
.
key
==
other
)
return
true
;
return
true
;
ByteArray
b
=
(
ByteArray
)
other
;
ByteArray
b
=
(
ByteArray
)
other
;
int
length
=
b
.
key
.
length
;
int
length
=
b
.
key
.
length
;
if
(
this
.
key
.
length
!=
length
)
if
(
this
.
key
.
length
!=
length
)
return
false
;
return
false
;
for
(
int
i
=
0
;
i
<
length
;
i
++)
for
(
int
i
=
0
;
i
<
length
;
i
++)
if
(
this
.
key
[
i
]
!=
b
.
key
[
i
])
if
(
this
.
key
[
i
]
!=
b
.
key
[
i
])
return
false
;
return
false
;
return
true
;
return
true
;
}
}
@Override
public
int
hashCode
()
{
if
(
h
==
0
&&
key
.
length
>
0
)
{
for
(
int
i
=
0
;
i
<
key
.
length
;
i
++)
{
h
=
31
*
h
+
key
[
i
];
}
}
return
h
;
}
}
}
code/hpdos_rdma_sal/src/main/java/hpdos/protocol/Property.java
View file @
731e788d
...
@@ -25,7 +25,7 @@ public class Property {
...
@@ -25,7 +25,7 @@ public class Property {
masters
=
new
String
[
numberOfMasters
];
masters
=
new
String
[
numberOfMasters
];
for
(
int
i
=
1
;
i
<=
numberOfMasters
;
i
++)
for
(
int
i
=
1
;
i
<=
numberOfMasters
;
i
++)
{
{
masters
[
i
]
=
properties
.
getProperty
(
"app.MASTER_HOST"
+
i
);
masters
[
i
-
1
]
=
properties
.
getProperty
(
"app.MASTER_HOST"
+
i
);
}
}
}
}
...
...
code/hpdos_rdma_sal/src/main/java/hpdos/services/InvalidationService.java
View file @
731e788d
...
@@ -27,7 +27,7 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
...
@@ -27,7 +27,7 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
@Override
@Override
public
void
processServerEvent
(
DaRPCServerEvent
<
InvalidationRequest
,
InvalidationResponse
>
event
)
throws
IOException
public
void
processServerEvent
(
DaRPCServerEvent
<
InvalidationRequest
,
InvalidationResponse
>
event
)
throws
IOException
{
{
logger
.
debug
(
"Got Invalidation Request"
);
logger
.
info
(
"Got Invalidation Request"
);
InvalidationRequest
request
=
event
.
getReceiveMessage
();
InvalidationRequest
request
=
event
.
getReceiveMessage
();
InvalidationResponse
response
=
event
.
getSendMessage
();
InvalidationResponse
response
=
event
.
getSendMessage
();
try
try
...
@@ -46,17 +46,17 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
...
@@ -46,17 +46,17 @@ public class InvalidationService extends InvalidationRpcProtocol implements DaRP
public
void
open
(
DaRPCServerEndpoint
<
InvalidationRequest
,
InvalidationResponse
>
serverEp
)
public
void
open
(
DaRPCServerEndpoint
<
InvalidationRequest
,
InvalidationResponse
>
serverEp
)
{
{
logger
.
debug
(
"Recieved New Connection for invalidation"
);
logger
.
info
(
"Recieved New Connection for invalidation"
);
try
{
try
{
logger
.
debug
(
serverEp
.
getDstAddr
());
logger
.
info
(
serverEp
.
getDstAddr
());
}
catch
(
Exception
e
){}
}
catch
(
Exception
e
){}
}
}
public
void
close
(
DaRPCServerEndpoint
<
InvalidationRequest
,
InvalidationResponse
>
serverEp
)
public
void
close
(
DaRPCServerEndpoint
<
InvalidationRequest
,
InvalidationResponse
>
serverEp
)
{
{
logger
.
debug
(
"Closing Connection for invalidation"
);
logger
.
info
(
"Closing Connection for invalidation"
);
try
{
try
{
logger
.
debug
(
serverEp
.
getDstAddr
());
logger
.
info
(
serverEp
.
getDstAddr
());
}
catch
(
Exception
e
){}
}
catch
(
Exception
e
){}
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment