Commit d7f831b2 authored by NILANJAN DAW's avatar NILANJAN DAW

Async mode StorageService made cache flag aware. Asyc Storage no more sharing stubs.

parent fff1436f
app.name="HPDOS-Client"
app.version="0.1.4"
app.cache=1
app.thread_count=10
app.rps=1000
app.runtime=10
......
......@@ -53,7 +53,7 @@ dependencies {
application {
// Define the main class for the application.
mainClass = 'HpdosClient.ClientRunnerSync'
mainClass = 'HpdosClient.ClientRunner'
}
sourceSets {
......
......@@ -38,6 +38,7 @@ public class ClientRunnerSync {
public boolean experimentEnded = false;
private Queue<Long> createTime, updateTime, readTime, deleteTime;
private final Properties properties;
private boolean cacheEnabled;
public ClientRunnerSync() {
clientID = UUID.randomUUID().toString();
......@@ -46,6 +47,7 @@ public class ClientRunnerSync {
InputStream inputStream = new FileInputStream(propertiesFile);
properties.load(inputStream);
parallelCount = Integer.parseInt((String) properties.get("app.thread_count"));
cacheEnabled = properties.get("app.cache").equals("1");
runtime = Integer.parseInt((String) properties.get("app.runtime"));
cCreate = Integer.parseInt((String) properties.get("app.cycle_create"));
cRead = Integer.parseInt((String) properties.get("app.cycle_read"));
......@@ -79,7 +81,7 @@ public class ClientRunnerSync {
return new String(data);
}
public double runExperiment(String id, long experimentStartTime) {
StorageServiceSync storageService = new StorageServiceSync(this.clientID);
StorageServiceSync storageService = new StorageServiceSync(this.clientID, this.cacheEnabled);
storageService.initStorage();
String value = createString(), updatedValue = createString();
for (;;) {
......@@ -112,6 +114,7 @@ public class ClientRunnerSync {
if ((currentTime - experimentStartTime) >= runtime * 1000L)
break;
}
if (this.cacheEnabled)
System.out.println(storageService.getCache().stats());
storageService.cleanup();
return 0;
......@@ -174,6 +177,7 @@ public class ClientRunnerSync {
ClientRunnerSync clientRunner = new ClientRunnerSync();
System.out.println(clientRunner.getGreeting());
System.out.println("Using Sync Server. Thread count: " + parallelCount + " runtime: " + runtime + "s");
System.out.println("cache status: " + clientRunner.cacheEnabled);
ExecutorService executorService = Executors.newFixedThreadPool(parallelCount);
Thread.sleep(1000); // let things settle down a bit
Set<Callable<Double>> callables = new HashSet<>();
......
......@@ -38,6 +38,16 @@ public class StorageModel {
this.crc = crc;
}
public StorageModel(Ack ack, String owner, int accessType) {
this.version = ack.getVersion();
this.dataSize = ack.getDataSize();
this.value = ack.getValue();
this.crc = ack.getCrc();
this.key = ack.getKey();
this.accessType = accessType;
this.owner = owner;
}
public void updateData(Ack ack) {
this.version = ack.getVersion();
this.dataSize = ack.getDataSize();
......
......@@ -39,14 +39,14 @@ public class StorageService {
private final String clientID;
private ManagedChannel masterChannel;
private ArrayList<ManagedChannel> channels;
private NetworkServiceGrpc.NetworkServiceFutureStub masterStub;
private final ArrayList<NetworkServiceGrpc.NetworkServiceFutureStub> stubs;
private List<Follower> replicaSet;
private final LoadingCache<String, StorageModel> cache;
public StorageService(String clientID) {
private List<Follower> replicaSet;
private LoadingCache<String, StorageModel> cache = null;
private final boolean cacheEnabled;
public StorageService(String clientID, boolean cacheEnabled) {
this.clientID = clientID;
this.stubs = new ArrayList<>();
this.cacheEnabled = cacheEnabled;
if (this.cacheEnabled)
this.cache = CacheBuilder
.newBuilder()
.maximumSize(10000)
......@@ -54,12 +54,14 @@ public class StorageService {
.build(new CacheLoader<>() {
@Override
public StorageModel load(@Nonnull String key) throws Exception {
int rnd = new Random().nextInt(stubs.size());
NetworkServiceGrpc.NetworkServiceFutureStub stub = stubs.get(rnd);
int rnd = new Random().nextInt(channels.size());
NetworkServiceGrpc.NetworkServiceFutureStub stub = NetworkServiceGrpc
.newFutureStub(channels.get(rnd));
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ""));
0, 0, key, 0,
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ""));
Packet requestPacket = RequestBuilder.buildPacket(request);
ListenableFuture<Packet> readListener = stub.readMetadata(requestPacket);
Packet packet = readListener.get();
......@@ -84,9 +86,6 @@ public class StorageService {
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(this.masterChannel);
ResponseList responseList = stub.getReadReplicaList(null);
this.replicaSet = responseList.getFollowerList();
// for (Follower follower: this.replicaSet) {
// System.out.println(follower);
// }
}
public void cleanup() {
......@@ -101,17 +100,15 @@ public class StorageService {
.build();
channels = new ArrayList<>();
channels.add(masterChannel);
masterStub = NetworkServiceGrpc.newFutureStub(this.masterChannel);
retrieveFollowerList();
stubs.add(masterStub);
for (Follower follower: replicaSet) {
ManagedChannel channel = ManagedChannelBuilder.
forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.add(channel);
NetworkServiceGrpc.NetworkServiceFutureStub stub = NetworkServiceGrpc.newFutureStub(channel);
stubs.add(stub);
}
}
......@@ -121,10 +118,12 @@ public class StorageService {
0, value.length(), key,
0, accessType, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
return this.masterStub.createMetadata(packet);
NetworkServiceGrpc.NetworkServiceFutureStub masterStub = NetworkServiceGrpc
.newFutureStub(this.masterChannel);
return masterStub.createMetadata(packet);
}
public ListenableFuture<StorageModel> read(String key) throws ExecutionException, IOException {
public ListenableFuture<StorageModel> read(String key) throws IOException {
return new ListenableFuture<>() {
StorageModel data = null;
boolean isDone = false;
......@@ -132,7 +131,30 @@ public class StorageService {
public void addListener(@Nonnull Runnable listener, @Nonnull Executor executor) {
executor.execute(() -> {
try {
if (cacheEnabled) {
data = cache.get(key);
} else {
int rnd = new Random().nextInt(channels.size());
NetworkServiceGrpc.NetworkServiceFutureStub stub = NetworkServiceGrpc
.newFutureStub(channels.get(rnd));
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0,
MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ""));
Packet requestPacket = RequestBuilder.buildPacket(request);
ListenableFuture<Packet> readListener = stub.readMetadata(requestPacket);
Packet packet = readListener.get();
Response response = packet.getResponse(0);
if (response.getStatus() == MessageConstants.STATUS_OK) {
Ack ack = response.getAck();
data = new StorageModel(ack.getVersion(), ack.getDataSize(), ack.getKey(),
MessageConstants.METADATA_ACCESS_PRIVATE, clientID,
ack.getCrc(), ack.getValue());
} else {
throw new IOException("Resource not found");
}
}
} catch (Exception e) {
data = null;
}
......@@ -162,28 +184,31 @@ public class StorageService {
}
@Override
public StorageModel get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
public StorageModel get(long l, @Nonnull TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return data;
}
};
}
public ListenableFuture<Packet> update(String key, String value, int version) {
return new ListenableFuture<Packet>() {
return new ListenableFuture<>() {
private Packet responsePacket = null;
private boolean isDone = false;
@Override
public void addListener(Runnable listener, Executor executor) {
public void addListener(@Nonnull Runnable listener, @Nonnull Executor executor) {
executor.execute(() -> {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE,
version, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, clientID, value));
Packet requestPacket = RequestBuilder.buildPacket(request);
NetworkServiceGrpc.NetworkServiceFutureStub masterStub = NetworkServiceGrpc
.newFutureStub(masterChannel);
ListenableFuture<Packet> res = masterStub.updateMetadata(requestPacket);
try {
this.responsePacket = res.get();
if (this.responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
if (cacheEnabled && this.responsePacket
.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
this.isDone = true;
listener.run();
......@@ -214,28 +239,31 @@ public class StorageService {
}
@Override
public Packet get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
public Packet get(long l, @Nonnull TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return this.responsePacket;
}
};
}
public ListenableFuture<Packet> delete(String key, int version) {
return new ListenableFuture<Packet>() {
return new ListenableFuture<>() {
private Packet responsePacket = null;
private boolean isDone = false;
@Override
public void addListener(Runnable listener, Executor executor) {
public void addListener(@Nonnull Runnable listener, @Nonnull Executor executor) {
executor.execute(() -> {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE,
version, 0, key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, clientID, ""));
Packet requestPacket = RequestBuilder.buildPacket(request);
NetworkServiceGrpc.NetworkServiceFutureStub masterStub = NetworkServiceGrpc
.newFutureStub(masterChannel);
ListenableFuture<Packet> res = masterStub.deleteMetadata(requestPacket);
try {
this.responsePacket = res.get();
if (responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
if (cacheEnabled && responsePacket
.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
this.isDone = true;
listener.run();
......@@ -266,7 +294,7 @@ public class StorageService {
}
@Override
public Packet get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
public Packet get(long l, @Nonnull TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return this.responsePacket;
}
};
......
......@@ -28,10 +28,7 @@ import io.grpc.ManagedChannelBuilder;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class StorageServiceSync {
......@@ -40,10 +37,13 @@ public class StorageServiceSync {
private ManagedChannel masterChannel;
private ArrayList<ManagedChannel> channels;
private List<Follower> replicaSet;
private final LoadingCache<String, StorageModel> cache;
private boolean cacheEnabled;
private LoadingCache<String, StorageModel> cache = null;
public StorageServiceSync(String clientID) {
public StorageServiceSync(String clientID, boolean cacheEnabled) {
this.clientID = clientID;
this.cacheEnabled = cacheEnabled;
if (this.cacheEnabled)
this.cache = CacheBuilder
.newBuilder()
.maximumSize(10000)
......@@ -120,6 +120,7 @@ public class StorageServiceSync {
// read back the metadata
public Map.Entry<StorageModel, Long> read(String key) {
if (cacheEnabled) {
long timestampReadStart = System.currentTimeMillis();
try {
StorageModel model = cache.get(key);
......@@ -128,6 +129,22 @@ public class StorageServiceSync {
e.printStackTrace();
}
return new AbstractMap.SimpleEntry<>(null, timestampReadStart);
} else {
int rnd = new Random().nextInt(this.channels.size());
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(channels.get(rnd));
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
long timestampReadStart = System.currentTimeMillis();
Packet responsePacket = stub.readMetadata(packet);
if (responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK) {
StorageModel model = new StorageModel(responsePacket.getResponse(0).getAck(),
this.clientID, MessageConstants.METADATA_ACCESS_PRIVATE);
return new AbstractMap.SimpleEntry<>(model, timestampReadStart);
}
return new AbstractMap.SimpleEntry<>(null, timestampReadStart);
}
}
public long update(String key, String value, int version) {
......@@ -139,7 +156,8 @@ public class StorageServiceSync {
long timestampCreateStart = System.currentTimeMillis();
NetworkServiceGrpc.NetworkServiceBlockingStub masterStub = NetworkServiceGrpc.newBlockingStub(masterChannel);
Packet responsePacket = masterStub.updateMetadata(packet);
if (responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
if (this.cacheEnabled &&
responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
return timestampCreateStart;
}
......@@ -153,9 +171,9 @@ public class StorageServiceSync {
long timestampCreateStart = System.currentTimeMillis();
NetworkServiceGrpc.NetworkServiceBlockingStub masterStub = NetworkServiceGrpc.newBlockingStub(masterChannel);
Packet responsePacket = masterStub.deleteMetadata(packet);
if (responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
if (this.cacheEnabled &&
responsePacket.getResponse(0).getStatus() == MessageConstants.STATUS_OK)
cache.invalidate(key);
return timestampCreateStart;
}
}
/*
* Copyright 2021 Nilanjan Daw, Synerg Lab, Department of CSE, IIT Bombay
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package HpdosClient;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
public class AppTest {
@Test public void testAppHasAGreeting() {
ClientRunner classUnderTest = new ClientRunner();
assertNotNull("app should have a greeting", classUnderTest.getGreeting());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment