Commit 6852c7fd authored by NILANJAN DAW's avatar NILANJAN DAW

MDS now configurable via app.configs

Fixed data block size bug for the client
parent 66bddc63
app.name="HPDOS-Client" app.name="HPDOS-Client"
app.version="0.1.4" app.version="0.1.4"
app.thread_count=4 app.thread_count=4
app.runtime=30 app.runtime=1
app.data_size=10 app.data_size=10
app.data_conversion_factor="B" app.data_conversion_factor=B
app.private_ratio=0.8 app.private_ratio=0.8
app.cycle_create=1 app.cycle_create=1
app.cycle_read=4 app.cycle_read=4
......
...@@ -17,8 +17,8 @@ import java.util.*; ...@@ -17,8 +17,8 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
public class ClientRunner { public class ClientRunner {
public static int parallelCount = 10; public static int parallelCount;
public static int runtime = 0; public static int runtime;
private final String clientID; private final String clientID;
public static String propertiesFile; public static String propertiesFile;
private int cCreate, cRead, cUpdate, cDelete; private int cCreate, cRead, cUpdate, cDelete;
...@@ -26,6 +26,7 @@ public class ClientRunner { ...@@ -26,6 +26,7 @@ public class ClientRunner {
private List<Follower> replicaSet; private List<Follower> replicaSet;
private Queue<Long> createTime, updateTime, readTime, deleteTime; private Queue<Long> createTime, updateTime, readTime, deleteTime;
private final Properties properties; private final Properties properties;
public ClientRunner() { public ClientRunner() {
clientID = UUID.randomUUID().toString(); clientID = UUID.randomUUID().toString();
properties = new Properties(); properties = new Properties();
...@@ -100,8 +101,8 @@ public class ClientRunner { ...@@ -100,8 +101,8 @@ public class ClientRunner {
} }
private String createString() { private String createString() {
int dataSize = Integer.parseInt((String) properties.get("app.data_size")); double dataSize = Double.parseDouble((String) properties.get("app.data_size"));
dataSize /= 2; // Java strings are 2B long dataSize /= 2.0; // Java strings are 2B long
String conversionFactor = (String) properties.get("app.data_conversion_factor"); String conversionFactor = (String) properties.get("app.data_conversion_factor");
int multiplier = 1; int multiplier = 1;
switch (conversionFactor) { switch (conversionFactor) {
...@@ -109,7 +110,7 @@ public class ClientRunner { ...@@ -109,7 +110,7 @@ public class ClientRunner {
case "M": multiplier *= 1000; case "M": multiplier *= 1000;
case "K": multiplier *= 1000; case "K": multiplier *= 1000;
} }
char[] data = new char[dataSize * multiplier]; char[] data = new char[(int)(dataSize * multiplier)];
return new String(data); return new String(data);
} }
public double runExperiment(String id, long experimentStartTime) { public double runExperiment(String id, long experimentStartTime) {
......
app.name=HPDOS-Server
app.version=0.1.6
app.REPLICATION_TYPE=async
app.REPLICATOR_THREAD_POOL_SIZE=140
...@@ -5,10 +5,10 @@ public class ConfigConstants { ...@@ -5,10 +5,10 @@ public class ConfigConstants {
public static final int PORT = 8080; public static final int PORT = 8080;
public static final int HEARTBEAT_INTERVAL = 500; public static final int HEARTBEAT_INTERVAL = 500;
public static final int REPLICATION_TIMEOUT = 5000; public static final int REPLICATION_TIMEOUT = 5000;
public static final String replicationAsync = "async";
// Backend types 300-399 // Backend types 300-399
public static final int BACKEND_IN_MEMORY = 300; public static final int BACKEND_IN_MEMORY = 300;
public static final int LSM_BACKEND = 301; public static final int LSM_BACKEND = 301;
public static final int BINARY_TREE_BACKEND = 302; public static final int BINARY_TREE_BACKEND = 302;
public static final int REPLICATOR_THREAD_POOL_SIZE = 12; public static final int REPLICATOR_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
} }
...@@ -17,11 +17,10 @@ import io.grpc.ManagedChannelBuilder; ...@@ -17,11 +17,10 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.io.InputStream;
import java.util.Timer; import java.util.*;
import java.util.TimerTask;
import java.util.UUID;
public class MetadataServer { public class MetadataServer {
private Server server; private Server server;
...@@ -32,13 +31,17 @@ public class MetadataServer { ...@@ -32,13 +31,17 @@ public class MetadataServer {
private final String host; private final String host;
private IOHandler ioHandler; private IOHandler ioHandler;
private ReplicationService replicationService; private ReplicationService replicationService;
private final Properties properties;
public MetadataServer() { public MetadataServer(String propertiesFile) throws IOException {
this.followers = new HashMap<>(); this.followers = new HashMap<>();
this.serverID = UUID.randomUUID().toString(); this.serverID = UUID.randomUUID().toString();
this.port = 10000 + (int)(Math.random() * 40000); this.port = 10000 + (int)(Math.random() * 40000);
this.host = "localhost"; this.host = "localhost";
this.replicationService = null; this.replicationService = null;
this.properties = new Properties();
InputStream inputStream = new FileInputStream(propertiesFile);
properties.load(inputStream);
} }
public String getGreeting() { public String getGreeting() {
...@@ -139,7 +142,14 @@ public class MetadataServer { ...@@ -139,7 +142,14 @@ public class MetadataServer {
} }
} }
public static void main(String[] args) { public static void main(String[] args) {
MetadataServer metaDataServer = new MetadataServer();
MetadataServer metaDataServer = null;
try {
metaDataServer = new MetadataServer(args[0]);
} catch (IOException e) {
System.out.println("Config file missing or unreadable");
System.exit(-1);
}
System.out.println(metaDataServer.getGreeting()); System.out.println(metaDataServer.getGreeting());
System.out.println("Starting Metadata service"); System.out.println("Starting Metadata service");
...@@ -154,7 +164,8 @@ public class MetadataServer { ...@@ -154,7 +164,8 @@ public class MetadataServer {
System.out.println("Searching for MetadataMaster"); System.out.println("Searching for MetadataMaster");
metaDataServer.announceToMaster(); metaDataServer.announceToMaster();
if (metaDataServer.isMaster) { if (metaDataServer.isMaster) {
metaDataServer.replicationService = new InlineReplicationService(metaDataServer.followers); metaDataServer.replicationService = new InlineReplicationService(
metaDataServer.followers, metaDataServer.properties);
System.out.println("Started master replication module"); System.out.println("Started master replication module");
boolean status = metaDataServer.startMasterServices(); boolean status = metaDataServer.startMasterServices();
System.out.println("Master ID: " + metaDataServer.serverID); System.out.println("Master ID: " + metaDataServer.serverID);
......
...@@ -10,6 +10,7 @@ import io.grpc.stub.StreamObserver; ...@@ -10,6 +10,7 @@ import io.grpc.stub.StreamObserver;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutionException;
public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
private final IOHandler ioHandler; private final IOHandler ioHandler;
...@@ -102,10 +103,10 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase { ...@@ -102,10 +103,10 @@ public class NetworkHandler extends NetworkServiceGrpc.NetworkServiceImplBase {
// System.out.println("starting replication"); // System.out.println("starting replication");
Stopwatch stopwatch = Stopwatch.createUnstarted(); Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start(); stopwatch.start();
replicationResponse = replicationService.replicateMetadataAsync(replicationRequest); replicationResponse = replicationService.replicateMetadata(replicationRequest);
stopwatch.stop(); stopwatch.stop();
// System.out.println("Network handler replicate" + stopwatch); // System.out.println("Network handler replicate" + stopwatch);
} catch (InterruptedException e) { } catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
// System.out.println("replication complete"); // System.out.println("replication complete");
......
...@@ -15,6 +15,7 @@ import hpdos.message.ResponseBuilder; ...@@ -15,6 +15,7 @@ import hpdos.message.ResponseBuilder;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import javax.annotation.Nonnull;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
...@@ -22,26 +23,40 @@ public class InlineReplicationService implements ReplicationService { ...@@ -22,26 +23,40 @@ public class InlineReplicationService implements ReplicationService {
private final HashMap<String, MasterFollower> followers; private final HashMap<String, MasterFollower> followers;
private final HashMap<String, ManagedChannel> channels; private final HashMap<String, ManagedChannel> channels;
private final ExecutorService executorService; private ExecutorService executorService;
public InlineReplicationService(HashMap<String, MasterFollower> followers) { private final boolean isReplicationAsync;
public InlineReplicationService(HashMap<String, MasterFollower> followers, Properties properties) {
this.followers = followers; this.followers = followers;
this.channels = new HashMap<>(); this.channels = new HashMap<>();
for (MasterFollower follower: this.followers.values()) { String replicationType = (String) properties.get("app.REPLICATION_TYPE");
ManagedChannel channel = ManagedChannelBuilder this.isReplicationAsync = replicationType.equals(ConfigConstants.replicationAsync);
.forAddress(follower.getIp(), follower.getPort()) if (!this.isReplicationAsync) {
.usePlaintext() int replicationThreadPoolSize;
.build(); if (properties.containsKey("app.REPLICATOR_THREAD_POOL_SIZE")) {
channels.put(follower.getFollowerID(), channel); replicationThreadPoolSize = Integer.parseInt((String)
properties.get("app.REPLICATOR_THREAD_POOL_SIZE"));
} else {
replicationThreadPoolSize = ConfigConstants.REPLICATOR_THREAD_POOL_SIZE;
}
this.executorService = Executors.newFixedThreadPool(replicationThreadPoolSize);
System.out.println("Creating synchronous replication pool. Pool size: " + replicationThreadPoolSize);
} else {
System.out.println("Replication to be handled using asynchronous handlers");
} }
this.executorService = Executors.newFixedThreadPool(ConfigConstants.REPLICATOR_THREAD_POOL_SIZE);
} }
@Override @Override
public void cleanup() throws InterruptedException { public void cleanup() throws InterruptedException {
for (ManagedChannel channel: channels.values()) for (ManagedChannel channel: channels.values())
channel.shutdown(); channel.shutdown();
executorService.shutdown(); if (this.executorService != null) {
executorService.awaitTermination(MessageConstants.STATUS_REPLICATION_TIMEOUT, TimeUnit.MILLISECONDS); executorService.shutdown();
boolean status = executorService.
awaitTermination(MessageConstants.STATUS_REPLICATION_TIMEOUT, TimeUnit.MILLISECONDS);
if (!status)
executorService.shutdownNow();
}
} }
private void establishChannels() { private void establishChannels() {
...@@ -58,7 +73,14 @@ public class InlineReplicationService implements ReplicationService { ...@@ -58,7 +73,14 @@ public class InlineReplicationService implements ReplicationService {
} }
@Override @Override
public ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) public ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws ExecutionException, InterruptedException {
if (this.isReplicationAsync) {
return replicateMetadataAsync(replicationRequest);
} else {
return replicateMetadataSync(replicationRequest);
}
}
public ReplicationResponse replicateMetadataSync(ReplicationRequest replicationRequest)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
Set<Callable<ReplicationResponse>> callables = new HashSet<>(); Set<Callable<ReplicationResponse>> callables = new HashSet<>();
...@@ -104,7 +126,6 @@ public class InlineReplicationService implements ReplicationService { ...@@ -104,7 +126,6 @@ public class InlineReplicationService implements ReplicationService {
* @param replicationRequest replication request sent * @param replicationRequest replication request sent
* @return replication response * @return replication response
*/ */
@Override
public ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) throws InterruptedException { public ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) throws InterruptedException {
CountDownLatch replicationWaiter = new CountDownLatch(this.followers.size()); CountDownLatch replicationWaiter = new CountDownLatch(this.followers.size());
HashMap<String, Response> responseHashMap = new HashMap<>(); HashMap<String, Response> responseHashMap = new HashMap<>();
...@@ -133,7 +154,7 @@ public class InlineReplicationService implements ReplicationService { ...@@ -133,7 +154,7 @@ public class InlineReplicationService implements ReplicationService {
} }
@Override @Override
public void onFailure(Throwable t) { public void onFailure(@Nonnull Throwable t) {
replicationWaiter.countDown(); replicationWaiter.countDown();
} }
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
......
...@@ -8,7 +8,6 @@ import java.util.concurrent.ExecutionException; ...@@ -8,7 +8,6 @@ import java.util.concurrent.ExecutionException;
public interface ReplicationService { public interface ReplicationService {
abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException; abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException;
abstract ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) throws InterruptedException;
abstract void cleanup() throws InterruptedException; abstract void cleanup() throws InterruptedException;
abstract HashMap<String, MasterFollower> getFollowers(); abstract HashMap<String, MasterFollower> getFollowers();
} }
...@@ -4,11 +4,14 @@ ...@@ -4,11 +4,14 @@
package hpdos; package hpdos;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class AppTest { public class AppTest {
@Test public void testAppHasAGreeting() { @Test public void testAppHasAGreeting() throws IOException {
MetadataServer classUnderTest = new MetadataServer(); MetadataServer classUnderTest = new MetadataServer("dummypath");
assertNotNull("app should have a greeting", classUnderTest.getGreeting()); assertNotNull("app should have a greeting", classUnderTest.getGreeting());
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment