Commit 3a2d9462 authored by NILANJAN DAW's avatar NILANJAN DAW

trying out distributed mode

parent 421e3afe
...@@ -32,16 +32,18 @@ public class ClientRunner { ...@@ -32,16 +32,18 @@ public class ClientRunner {
private final Properties properties; private final Properties properties;
private final List<StorageModel> generatedPacket; private final List<StorageModel> generatedPacket;
private Semaphore limiter = null; private Semaphore limiter = null;
private final int cpuCount;
public ClientRunner() { public ClientRunner() {
clientID = UUID.randomUUID().toString(); clientID = UUID.randomUUID().toString();
generatedPacket = Collections.synchronizedList(new ArrayList<>()); generatedPacket = Collections.synchronizedList(new ArrayList<>());
properties = new Properties(); properties = new Properties();
cpuCount = Runtime.getRuntime().availableProcessors();
try { try {
InputStream inputStream = new FileInputStream(propertiesFile); InputStream inputStream = new FileInputStream(propertiesFile);
this.properties.load(inputStream); this.properties.load(inputStream);
System.out.println((String) properties.get("app.concurrency")); System.out.println((String) properties.get("app.concurrency"));
concurrency = Integer.parseInt((String) properties.get("app.concurrency")); concurrency = Integer.parseInt((String) properties.get("app.concurrency"));
this.limiter = new Semaphore(concurrency); this.limiter = new Semaphore(concurrency / cpuCount);
runtime = Integer.parseInt((String) properties.get("app.runtime")); runtime = Integer.parseInt((String) properties.get("app.runtime"));
this.cCreate = Integer.parseInt((String) properties.get("app.cycle_create")); this.cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
...@@ -152,9 +154,7 @@ public class ClientRunner { ...@@ -152,9 +154,7 @@ public class ClientRunner {
// System.out.println("packet " + packet); // System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) if (response.getStatus() == MessageConstants.STATUS_OK)
if (!sendPacket.getKey().equals(response.getAck().getKey())) if (sendPacket.getKey().equals(response.getAck().getKey())) {
System.out.println("read kand ho gaya");
else {
synchronized (sendPacket) { synchronized (sendPacket) {
sendPacket.updateData(response.getAck()); sendPacket.updateData(response.getAck());
} }
...@@ -182,9 +182,7 @@ public class ClientRunner { ...@@ -182,9 +182,7 @@ public class ClientRunner {
// System.out.println("packet " + packet); // System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) if (response.getStatus() == MessageConstants.STATUS_OK)
if (!sendPacket.getKey().equals(response.getAck().getKey())) if (sendPacket.getKey().equals(response.getAck().getKey())) {
System.out.println("update kand ho gaya");
else {
synchronized (sendPacket) { synchronized (sendPacket) {
sendPacket.updateData(response.getAck()); sendPacket.updateData(response.getAck());
} }
...@@ -210,9 +208,7 @@ public class ClientRunner { ...@@ -210,9 +208,7 @@ public class ClientRunner {
// System.out.println("packet " + packet); // System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) if (response.getStatus() == MessageConstants.STATUS_OK)
if (!sendPacket.getKey().equals(response.getAck().getKey())) if (sendPacket.getKey().equals(response.getAck().getKey())) {
System.out.println("delete kand ho gaya");
else {
synchronized (sendPacket) { synchronized (sendPacket) {
generatedPacket.remove(sendPacket); generatedPacket.remove(sendPacket);
} }
...@@ -296,7 +292,7 @@ public class ClientRunner { ...@@ -296,7 +292,7 @@ public class ClientRunner {
latch.await(); latch.await();
storageService.cleanup(); storageService.cleanup();
} }
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException, ExecutionException {
propertiesFile = args[0]; propertiesFile = args[0];
ClientRunner clientRunner = new ClientRunner(); ClientRunner clientRunner = new ClientRunner();
System.out.println(clientRunner.getGreeting()); System.out.println(clientRunner.getGreeting());
...@@ -317,7 +313,15 @@ public class ClientRunner { ...@@ -317,7 +313,15 @@ public class ClientRunner {
clientRunner.experimentEnded = true; clientRunner.experimentEnded = true;
System.out.println("Experiment ended"); System.out.println("Experiment ended");
}); });
clientRunner.runExperiment(storageService); ExecutorService experimentExecutors = Executors.newFixedThreadPool(clientRunner.cpuCount);
Set<Callable<Double>> callables = new HashSet<>();
for (int i = 0; i < clientRunner.cpuCount; i++) {
callables.add(() -> clientRunner.runExperiment(storageService));
}
List<Future<Double>> futures = experimentExecutors.invokeAll(callables);
for (Future<Double> future: futures) {
future.get();
}
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
System.out.println("Waiting for system to settle down"); System.out.println("Waiting for system to settle down");
Thread.sleep(2000); Thread.sleep(2000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment