Commit 514ff4ef authored by NILANJAN DAW's avatar NILANJAN DAW

Added a Client Loader cache to sit intercept network calls and serve data.

parent 20f865e8
<component name="CopyrightManager">
<copyright>
<option name="notice" value="Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay&#10;&#10;Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);&#10;you may not use this file except in compliance with the License.&#10;You may obtain a copy of the License at&#10;&#10; http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless required by applicable law or agreed to in writing, software&#10;distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the License for the specific language governing permissions and&#10;limitations under the License." />
<option name="myName" value="apache" />
</copyright>
</component>
\ No newline at end of file
<component name="CopyrightManager">
<settings>
<module2copyright>
<element module="All" copyright="apache" />
</module2copyright>
</settings>
</component>
\ No newline at end of file
app.name="HPDOS-Client" app.name="HPDOS-Client"
app.version="0.1.4" app.version="0.1.4"
app.mode=rps
app.thread_count=10
app.rps=1000
app.concurrency=5 app.concurrency=5
app.runtime=10 app.runtime=10
app.data_size=10 app.data_size=10
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* /*
* This file was generated by the Gradle 'init' task. * This file was generated by the Gradle 'init' task.
* *
...@@ -26,6 +42,8 @@ dependencies { ...@@ -26,6 +42,8 @@ dependencies {
// This dependency is used by the application. // This dependency is used by the application.
implementation 'com.google.guava:guava:29.0-jre' implementation 'com.google.guava:guava:29.0-jre'
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.15.6' implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.15.6'
// https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine
implementation group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.0.1'
implementation 'io.grpc:grpc-netty-shaded:1.36.0' implementation 'io.grpc:grpc-netty-shaded:1.36.0'
implementation 'io.grpc:grpc-protobuf:1.36.0' implementation 'io.grpc:grpc-protobuf:1.36.0'
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* /*
* This Java source file was generated by the Gradle 'init' task. * This Java source file was generated by the Gradle 'init' task.
*/ */
package HpdosClient; package HpdosClient;
import HpdosClient.MessageFormat.MessageConstants; import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.lib.StorageHandler;
import HpdosClient.lib.StorageModel; import HpdosClient.lib.StorageModel;
import HpdosClient.lib.StorageService; import HpdosClient.lib.StorageService;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import hpdos.grpc.*; import hpdos.grpc.Follower;
import io.grpc.ManagedChannel; import hpdos.grpc.Packet;
import io.grpc.ManagedChannelBuilder; import hpdos.grpc.Response;
import io.grpc.netty.shaded.io.netty.util.concurrent.FutureListener;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
...@@ -32,18 +49,25 @@ public class ClientRunner { ...@@ -32,18 +49,25 @@ public class ClientRunner {
private final Properties properties; private final Properties properties;
private final List<StorageModel> generatedPacket; private final List<StorageModel> generatedPacket;
private Semaphore limiter = null; private Semaphore limiter = null;
private final int cpuCount; private int threadCount;
private String mode;
private int rps;
AtomicDouble openCount;
public ClientRunner() { public ClientRunner() {
clientID = UUID.randomUUID().toString(); clientID = UUID.randomUUID().toString();
generatedPacket = Collections.synchronizedList(new ArrayList<>()); generatedPacket = Collections.synchronizedList(new ArrayList<>());
properties = new Properties(); properties = new Properties();
cpuCount = Runtime.getRuntime().availableProcessors(); threadCount = Runtime.getRuntime().availableProcessors();
try { try {
openCount = new AtomicDouble(0);
InputStream inputStream = new FileInputStream(propertiesFile); InputStream inputStream = new FileInputStream(propertiesFile);
this.properties.load(inputStream); this.properties.load(inputStream);
System.out.println((String) properties.get("app.concurrency")); System.out.println((String) properties.get("app.concurrency"));
concurrency = Integer.parseInt((String) properties.get("app.concurrency")); concurrency = Integer.parseInt((String) properties.get("app.concurrency"));
this.limiter = new Semaphore(concurrency / cpuCount); this.rps = Integer.parseInt((String) properties.get("app.rps"));
this.mode = (String) properties.get("app.mode");
this.threadCount = Integer.parseInt((String) properties.get("app.thread_count"));
this.limiter = new Semaphore(concurrency);
runtime = Integer.parseInt((String) properties.get("app.runtime")); runtime = Integer.parseInt((String) properties.get("app.runtime"));
this.cCreate = Integer.parseInt((String) properties.get("app.cycle_create")); this.cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
...@@ -88,7 +112,6 @@ public class ClientRunner { ...@@ -88,7 +112,6 @@ public class ClientRunner {
try { try {
Packet packet = res.get(); Packet packet = res.get();
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
// System.out.println("seed: " + response);
if (response.getStatus() == MessageConstants.STATUS_OK) { if (response.getStatus() == MessageConstants.STATUS_OK) {
model.updateData(response.getAck()); model.updateData(response.getAck());
generatedPacket.add(model); generatedPacket.add(model);
...@@ -101,9 +124,30 @@ public class ClientRunner { ...@@ -101,9 +124,30 @@ public class ClientRunner {
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
latch.await(); latch.await();
// for (int i = 0; i < 100; i++) {
// int index = (int) (Math.random() * generatedPacket.size());
// try {
// Stopwatch stopwatch = Stopwatch.createUnstarted();
// stopwatch.start();
// ListenableFuture<StorageModel> res = storageService.read(generatedPacket.get(index).getKey());
// res.addListener(() -> {
// stopwatch.stop();
// try {
// System.out.println("Time: " + stopwatch + " model: " + res.get());
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// stopwatch.reset();
// }, MoreExecutors.directExecutor());
// } catch (ExecutionException | IOException e) {
// System.out.println("resource not found");
// }
// }
// Thread.sleep(2000);
} }
public double runExperiment(StorageService storageService) throws InterruptedException {
public double runExperimentByConcurrency(StorageService storageService) throws InterruptedException, IOException, ExecutionException {
String value = createString(), updatedValue = createString(); String value = createString(), updatedValue = createString();
double totalBracket = this.cCreate + this.cRead + this.cUpdate + this.cDelete; double totalBracket = this.cCreate + this.cRead + this.cUpdate + this.cDelete;
...@@ -113,7 +157,6 @@ public class ClientRunner { ...@@ -113,7 +157,6 @@ public class ClientRunner {
readBracket = createBracket + this.cRead / totalBracket; readBracket = createBracket + this.cRead / totalBracket;
updateBracket = readBracket + this.cUpdate / totalBracket; updateBracket = readBracket + this.cUpdate / totalBracket;
deleteBracket = updateBracket + this.cDelete / totalBracket; deleteBracket = updateBracket + this.cDelete / totalBracket;
System.out.println("Starting experiment");
do { do {
limiter.acquire(); limiter.acquire();
double toss = Math.random(); double toss = Math.random();
...@@ -127,6 +170,7 @@ public class ClientRunner { ...@@ -127,6 +170,7 @@ public class ClientRunner {
res.addListener(() -> { res.addListener(() -> {
try { try {
Packet packet = res.get(); Packet packet = res.get();
this.createTime.add(System.currentTimeMillis() - createStart);
// System.out.println("packet " + packet); // System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) { if (response.getStatus() == MessageConstants.STATUS_OK) {
...@@ -139,18 +183,45 @@ public class ClientRunner { ...@@ -139,18 +183,45 @@ public class ClientRunner {
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
this.createTime.add(System.currentTimeMillis() - createStart);
limiter.release(); limiter.release();
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} else if (toss < readBracket) { } else if (toss < readBracket) {
// System.out.println("read"); // System.out.println("read");
int index; StorageModel sendPacket;
synchronized (generatedPacket) {
index = (int) (Math.random() * generatedPacket.size());
sendPacket = this.generatedPacket.get(index);
}
final long readStart = System.currentTimeMillis();
ListenableFuture<StorageModel> res = storageService.read(sendPacket.getKey());
res.addListener(() -> {
try {
StorageModel model = res.get();
readTime.add(System.currentTimeMillis() - readStart);
// System.out.println("packet " + packet);
synchronized (sendPacket) {
sendPacket.updateData(model);
}
//
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
limiter.release();
}, MoreExecutors.directExecutor());
} else if (toss < updateBracket) {
// System.out.println("update");
int index = (int) (Math.random() * generatedPacket.size()); int index = (int) (Math.random() * generatedPacket.size());
StorageModel sendPacket = this.generatedPacket.get(index); StorageModel sendPacket = this.generatedPacket.get(index);
final long readStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.read(sendPacket.getKey()); final long updateStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.update(sendPacket.getKey(), updatedValue,
sendPacket.getVersion());
res.addListener(() -> { res.addListener(() -> {
try { try {
Packet packet = res.get(); Packet packet = res.get();
updateTime.add(System.currentTimeMillis() - updateStart);
// System.out.println("packet " + packet); // System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) if (response.getStatus() == MessageConstants.STATUS_OK)
...@@ -162,23 +233,130 @@ public class ClientRunner { ...@@ -162,23 +233,130 @@ public class ClientRunner {
// else // else
// System.out.println("error packet " + response); // System.out.println("error packet " + response);
} }
limiter.release();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
readTime.add(System.currentTimeMillis() - readStart);
limiter.release();
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} else if (toss < updateBracket) { } else {
// System.out.println("update"); // System.out.println("delete");
int index = (int) (Math.random() * generatedPacket.size()); int index = (int) (Math.random() * generatedPacket.size());
StorageModel sendPacket = this.generatedPacket.get(index); StorageModel sendPacket = this.generatedPacket.get(index);
final long deleteStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.delete(sendPacket.getKey(), sendPacket.getVersion());
res.addListener(() -> {
try {
Packet packet = res.get();
deleteTime.add(System.currentTimeMillis() - deleteStart);
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK)
if (sendPacket.getKey().equals(response.getAck().getKey())) {
synchronized (generatedPacket) {
generatedPacket.remove(sendPacket);
}
}
// else
// System.out.println("error packet " + response);
}
limiter.release();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, MoreExecutors.directExecutor());
// deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
}
} while (!this.experimentEnded);
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) +
// "ms qps " + qps);
return 0;
}
public double runExperimentByRps(StorageService storageService) throws InterruptedException, ExecutionException {
String value = createString(), updatedValue = createString();
double totalBracket = this.cCreate + this.cRead + this.cUpdate + this.cDelete;
double createBracket, readBracket, updateBracket, deleteBracket;
createBracket = this.cCreate * 1.0 / totalBracket;
readBracket = createBracket + this.cRead / totalBracket;
updateBracket = readBracket + this.cUpdate / totalBracket;
deleteBracket = updateBracket + this.cDelete / totalBracket;
long sleepTime = (threadCount * 1000L) / this.rps;
Thread.sleep((long)(Math.random() * 500)); // randomizing start time to avoid spikes in request
do {
openCount.addAndGet(1);
double toss = Math.random();
if (toss < createBracket) {
// System.out.println("create");
String key = Integer.toString((int) (Math.random() * Integer.MAX_VALUE));
StorageModel model = new StorageModel(0, value.length(), key,
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, 0, value);
final long createStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.create(model.getKey(), model.getValue(), model.getAccessType());
res.addListener(() -> {
try {
Packet packet = res.get();
this.createTime.add(System.currentTimeMillis() - createStart);
// System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) {
model.updateData(response.getAck());
generatedPacket.add(model);
}
// else
// System.out.println("error packet " + response);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
openCount.addAndGet(-1);
}, MoreExecutors.directExecutor());
} else if (toss < readBracket) {
// System.out.println("read");
int index; StorageModel sendPacket;
synchronized (generatedPacket) {
index = (int) (Math.random() * generatedPacket.size());
sendPacket = this.generatedPacket.get(index);
}
final long readStart = System.currentTimeMillis();
try {
ListenableFuture<StorageModel> res = storageService.read(sendPacket.getKey());
res.addListener(() -> {
try {
StorageModel model = res.get();
readTime.add(System.currentTimeMillis() - readStart);
synchronized (sendPacket) {
sendPacket.updateData(model);
}
} catch (Exception e) {
System.out.println(e.getMessage() + " " + sendPacket.getKey());
}
openCount.addAndGet(-1);
}, MoreExecutors.directExecutor());
} catch (IOException e) {
e.printStackTrace();
}
} else if (toss < updateBracket) {
// System.out.println("update");
int index; StorageModel sendPacket;
synchronized (generatedPacket) {
index = (int) (Math.random() * generatedPacket.size());
sendPacket = this.generatedPacket.get(index);
}
final long updateStart = System.currentTimeMillis(); final long updateStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.update(sendPacket.getKey(), updatedValue, ListenableFuture<Packet> res = storageService.update(sendPacket.getKey(), updatedValue,
sendPacket.getVersion()); sendPacket.getVersion());
res.addListener(() -> { res.addListener(() -> {
try { try {
Packet packet = res.get(); Packet packet = res.get();
updateTime.add(System.currentTimeMillis() - updateStart);
// System.out.println("packet " + packet); // System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) if (response.getStatus() == MessageConstants.STATUS_OK)
...@@ -190,46 +368,53 @@ public class ClientRunner { ...@@ -190,46 +368,53 @@ public class ClientRunner {
// else // else
// System.out.println("error packet " + response); // System.out.println("error packet " + response);
} }
updateTime.add(System.currentTimeMillis() - updateStart);
limiter.release(); openCount.addAndGet(-1);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} else { } else {
// System.out.println("delete"); // System.out.println("delete");
int index = (int) (Math.random() * generatedPacket.size()); int index; StorageModel sendPacket;
StorageModel sendPacket = this.generatedPacket.get(index); synchronized (generatedPacket) {
index = (int) (Math.random() * generatedPacket.size());
sendPacket = this.generatedPacket.get(index);
}
final long deleteStart = System.currentTimeMillis(); final long deleteStart = System.currentTimeMillis();
ListenableFuture<Packet> res = storageService.delete(sendPacket.getKey(), sendPacket.getVersion()); ListenableFuture<Packet> res = storageService.delete(sendPacket.getKey(), sendPacket.getVersion());
res.addListener(() -> { res.addListener(() -> {
try { try {
Packet packet = res.get(); Packet packet = res.get();
deleteTime.add(System.currentTimeMillis() - deleteStart);
// System.out.println("packet " + packet); // System.out.println("packet " + packet);
for (Response response: packet.getResponseList()) { for (Response response: packet.getResponseList()) {
if (response.getStatus() == MessageConstants.STATUS_OK) if (response.getStatus() == MessageConstants.STATUS_OK)
if (sendPacket.getKey().equals(response.getAck().getKey())) { if (sendPacket.getKey().equals(response.getAck().getKey())) {
synchronized (sendPacket) { synchronized (generatedPacket) {
generatedPacket.remove(sendPacket); generatedPacket.remove(sendPacket);
} }
} }
// else // else
// System.out.println("error packet " + response); // System.out.println("error packet " + response);
} }
deleteTime.add(System.currentTimeMillis() - deleteStart); openCount.addAndGet(-1);
limiter.release();
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
// deleteTime.add(System.currentTimeMillis() - timestampDeleteStart); // deleteTime.add(System.currentTimeMillis() - timestampDeleteStart);
} }
while (openCount.get() > this.rps)
Thread.sleep(sleepTime);
Thread.sleep(sleepTime);
} while (!this.experimentEnded); } while (!this.experimentEnded);
// System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) + // System.out.println(id + "runtime " + (System.currentTimeMillis() - startTime) +
// "ms qps " + qps); // "ms qps " + qps);
return 0; return 0;
} }
private void timerService() { private void timerService() {
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start(); stopwatch.start();
...@@ -241,7 +426,7 @@ public class ClientRunner { ...@@ -241,7 +426,7 @@ public class ClientRunner {
timer.cancel(); timer.cancel();
timer.purge(); timer.purge();
} }
System.out.println("Experiment ran: " + stopwatch); System.out.println("Experiment ran: " + stopwatch + " openCount: " + openCount);
} }
}, 5000, 5000); }, 5000, 5000);
} }
...@@ -300,7 +485,7 @@ public class ClientRunner { ...@@ -300,7 +485,7 @@ public class ClientRunner {
storageService.initStorage(); storageService.initStorage();
System.out.println("storage initialised"); System.out.println("storage initialised");
clientRunner.seedServer(storageService, clientRunner.createString()); clientRunner.seedServer(storageService, clientRunner.createString());
System.out.println("server seeded" + clientRunner.generatedPacket); System.out.println("server seeded. Length: " + clientRunner.generatedPacket.size());
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
clientRunner.timerService(); clientRunner.timerService();
ExecutorService executorService = Executors.newSingleThreadExecutor(); ExecutorService executorService = Executors.newSingleThreadExecutor();
...@@ -313,10 +498,14 @@ public class ClientRunner { ...@@ -313,10 +498,14 @@ public class ClientRunner {
clientRunner.experimentEnded = true; clientRunner.experimentEnded = true;
System.out.println("Experiment ended"); System.out.println("Experiment ended");
}); });
ExecutorService experimentExecutors = Executors.newFixedThreadPool(clientRunner.cpuCount); ExecutorService experimentExecutors = Executors.newFixedThreadPool(clientRunner.threadCount);
Set<Callable<Double>> callables = new HashSet<>(); Set<Callable<Double>> callables = new HashSet<>();
for (int i = 0; i < clientRunner.cpuCount; i++) { System.out.println("Starting experiment");
callables.add(() -> clientRunner.runExperiment(storageService)); for (int i = 0; i < clientRunner.threadCount; i++) {
if (clientRunner.mode.equals("rps"))
callables.add(() -> clientRunner.runExperimentByRps(storageService));
else
callables.add(() -> clientRunner.runExperimentByConcurrency(storageService));
} }
List<Future<Double>> futures = experimentExecutors.invokeAll(callables); List<Future<Double>> futures = experimentExecutors.invokeAll(callables);
for (Future<Double> future: futures) { for (Future<Double> future: futures) {
...@@ -327,11 +516,12 @@ public class ClientRunner { ...@@ -327,11 +516,12 @@ public class ClientRunner {
Thread.sleep(2000); Thread.sleep(2000);
double totalRuntime = endTime - startTime; double totalRuntime = endTime - startTime;
clientRunner.printStatistics(totalRuntime); clientRunner.printStatistics(totalRuntime);
clientRunner.cleanupExperiment(storageService); clientRunner.cleanupExperiment(storageService);
System.out.println(storageService.getCache().stats());
executorService.shutdown(); executorService.shutdown();
boolean status = executorService.awaitTermination(1000, TimeUnit.MICROSECONDS); boolean status = executorService.awaitTermination(1000, TimeUnit.MICROSECONDS);
if (!status) if (!status)
executorService.shutdownNow(); executorService.shutdownNow();
experimentExecutors.shutdownNow();
} }
} }
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient; package HpdosClient;
public class ConfigConstants { public class ConfigConstants {
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.MessageFormat; package HpdosClient.MessageFormat;
public class MessageConstants { public class MessageConstants {
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.MessageFormat; package HpdosClient.MessageFormat;
import hpdos.grpc.Packet; import hpdos.grpc.Packet;
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.MessageFormat; package HpdosClient.MessageFormat;
import hpdos.grpc.*;
import hpdos.grpc.Ack;
import hpdos.grpc.Nack;
import hpdos.grpc.Packet;
import hpdos.grpc.Response;
import java.util.ArrayList; import java.util.ArrayList;
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.lib;
import com.google.common.util.concurrent.ListenableFuture;
import hpdos.grpc.Packet;
public class StorageHandler {
private boolean isResultReady;
private Packet packet;
private ListenableFuture<Packet> listener;
public StorageHandler(boolean isResultReady, Packet packet, ListenableFuture<Packet> listener) {
this.isResultReady = isResultReady;
this.packet = packet;
this.listener = listener;
}
public boolean isResultReady() {
return isResultReady;
}
public void setResultReady(boolean resultReady) {
isResultReady = resultReady;
}
public Packet getPacket() {
return packet;
}
public void setPacket(Packet packet) {
this.packet = packet;
}
public ListenableFuture<Packet> getListener() {
return listener;
}
public void setListener(ListenableFuture<Packet> listener) {
this.listener = listener;
}
}
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.lib; package HpdosClient.lib;
import hpdos.grpc.Ack; import hpdos.grpc.Ack;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
public class StorageModel { public class StorageModel {
private int version; private int version;
private int dataSize; private int dataSize;
...@@ -32,6 +45,13 @@ public class StorageModel { ...@@ -32,6 +45,13 @@ public class StorageModel {
this.crc = ack.getCrc(); this.crc = ack.getCrc();
} }
public void updateData(StorageModel model) {
this.version = model.getVersion();
this.dataSize = model.getDataSize();
this.value = model.getValue();
this.crc = model.getCrc();
}
public int getVersion() { public int getVersion() {
return version; return version;
} }
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package HpdosClient.lib; package HpdosClient.lib;
import HpdosClient.ConfigConstants; import HpdosClient.ConfigConstants;
import HpdosClient.MessageFormat.MessageConstants; import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.MessageFormat.RequestBuilder; import HpdosClient.MessageFormat.RequestBuilder;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import hpdos.grpc.*; import hpdos.grpc.*;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import java.util.*; import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
public class StorageService { public class StorageService {
...@@ -18,10 +42,42 @@ public class StorageService { ...@@ -18,10 +42,42 @@ public class StorageService {
private NetworkServiceGrpc.NetworkServiceFutureStub masterStub; private NetworkServiceGrpc.NetworkServiceFutureStub masterStub;
private final ArrayList<NetworkServiceGrpc.NetworkServiceFutureStub> stubs; private final ArrayList<NetworkServiceGrpc.NetworkServiceFutureStub> stubs;
private List<Follower> replicaSet; private List<Follower> replicaSet;
private final LoadingCache<String, StorageModel> cache;
public StorageService(String clientID) { public StorageService(String clientID) {
this.clientID = clientID; this.clientID = clientID;
this.stubs = new ArrayList<>(); this.stubs = new ArrayList<>();
this.cache = CacheBuilder
.newBuilder()
.maximumSize(10000)
.recordStats()
.build(new CacheLoader<>() {
@Override
public StorageModel load(@Nonnull String key) throws Exception {
int rnd = new Random().nextInt(stubs.size());
NetworkServiceGrpc.NetworkServiceFutureStub stub = stubs.get(rnd);
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ""));
Packet requestPacket = RequestBuilder.buildPacket(request);
ListenableFuture<Packet> readListener = stub.readMetadata(requestPacket);
Packet packet = readListener.get();
Response response = packet.getResponse(0);
if (response.getStatus() == MessageConstants.STATUS_OK) {
Ack ack = response.getAck();
return new StorageModel(ack.getVersion(), ack.getDataSize(), ack.getKey(),
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ack.getCrc(), ack.getValue());
} else {
throw new IOException("Resource not found");
}
}
});
}
public LoadingCache<String, StorageModel> getCache() {
return cache;
} }
public void retrieveFollowerList() { public void retrieveFollowerList() {
...@@ -47,6 +103,7 @@ public class StorageService { ...@@ -47,6 +103,7 @@ public class StorageService {
channels.add(masterChannel); channels.add(masterChannel);
masterStub = NetworkServiceGrpc.newFutureStub(this.masterChannel); masterStub = NetworkServiceGrpc.newFutureStub(this.masterChannel);
retrieveFollowerList(); retrieveFollowerList();
stubs.add(masterStub);
for (Follower follower: replicaSet) { for (Follower follower: replicaSet) {
ManagedChannel channel = ManagedChannelBuilder. ManagedChannel channel = ManagedChannelBuilder.
forAddress(follower.getIp(), follower.getPort()) forAddress(follower.getIp(), follower.getPort())
...@@ -67,32 +124,152 @@ public class StorageService { ...@@ -67,32 +124,152 @@ public class StorageService {
return this.masterStub.createMetadata(packet); return this.masterStub.createMetadata(packet);
} }
public ListenableFuture<Packet> read(String key) { public ListenableFuture<StorageModel> read(String key) throws ExecutionException, IOException {
int rnd = new Random().nextInt(this.stubs.size()); return new ListenableFuture<>() {
NetworkServiceGrpc.NetworkServiceFutureStub stub = this.stubs.get(rnd); StorageModel data = null;
ArrayList<Request> request = new ArrayList<>(); boolean isDone = false;
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ, @Override
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, "")); public void addListener(@Nonnull Runnable listener, @Nonnull Executor executor) {
Packet packet = RequestBuilder.buildPacket(request); executor.execute(() -> {
return stub.readMetadata(packet); try {
data = cache.get(key);
} catch (Exception e) {
data = null;
}
isDone = true;
listener.run();
});
}
@Override
public boolean cancel(boolean b) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isDone;
}
@Override
public StorageModel get() throws InterruptedException, ExecutionException {
return data;
}
@Override
public StorageModel get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return data;
}
};
} }
public ListenableFuture<Packet> update(String key, String value, int version) { public ListenableFuture<Packet> update(String key, String value, int version) {
ArrayList<Request> request = new ArrayList<>(); return new ListenableFuture<Packet>() {
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE, private Packet responsePacket = null;
version, value.length(), key, private boolean isDone = false;
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value)); @Override
Packet packet = RequestBuilder.buildPacket(request); public void addListener(Runnable listener, Executor executor) {
return this.masterStub.updateMetadata(packet); executor.execute(() -> {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE,
version, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, clientID, value));
Packet requestPacket = RequestBuilder.buildPacket(request);
ListenableFuture<Packet> res = masterStub.updateMetadata(requestPacket);
try {
this.responsePacket = res.get();
if (this.responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
this.isDone = true;
listener.run();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
@Override
public boolean cancel(boolean b) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return isDone;
}
@Override
public Packet get() throws InterruptedException, ExecutionException {
return this.responsePacket;
}
@Override
public Packet get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return this.responsePacket;
}
};
} }
public ListenableFuture<Packet> delete(String key, int version) { public ListenableFuture<Packet> delete(String key, int version) {
ArrayList<Request> request = new ArrayList<>(); return new ListenableFuture<Packet>() {
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE, private Packet responsePacket = null;
version, 0, key, private boolean isDone = false;
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, "")); @Override
Packet packet = RequestBuilder.buildPacket(request); public void addListener(Runnable listener, Executor executor) {
return this.masterStub.deleteMetadata(packet); executor.execute(() -> {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE,
version, 0, key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ""));
Packet requestPacket = RequestBuilder.buildPacket(request);
ListenableFuture<Packet> res = masterStub.deleteMetadata(requestPacket);
try {
this.responsePacket = res.get();
if (responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
this.isDone = true;
listener.run();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
@Override
public boolean cancel(boolean b) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return this.isDone;
}
@Override
public Packet get() throws InterruptedException, ExecutionException {
return this.responsePacket;
}
@Override
public Packet get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return this.responsePacket;
}
};
} }
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* /*
* This Java source file was generated by the Gradle 'init' task. * This Java source file was generated by the Gradle 'init' task.
*/ */
package HpdosClient; package HpdosClient;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assert.assertNotNull;
public class AppTest { public class AppTest {
@Test public void testAppHasAGreeting() { @Test public void testAppHasAGreeting() {
......
#
# Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip
......
#!/usr/bin/env sh #!/usr/bin/env sh
# #
# Copyright 2015 the original author or authors. # Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
# You may obtain a copy of the License at # You may obtain a copy of the License at
# #
# https://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* /*
* This file was generated by the Gradle 'init' task. * This file was generated by the Gradle 'init' task.
* *
......
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hpdos.lib; package hpdos.lib;
public interface StorageService { public interface StorageService {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment