Commit d42a38e0 authored by Paras Garg's avatar Paras Garg

rocksdb

parent 5eda140a
...@@ -6,26 +6,6 @@ ...@@ -6,26 +6,6 @@
<attribute name="gradle_used_by_scope" value="main,test"/> <attribute name="gradle_used_by_scope" value="main,test"/>
</attributes> </attributes>
</classpathentry> </classpathentry>
<classpathentry kind="src" output="bin/main" path="src/main/resources">
<attributes>
<attribute name="gradle_scope" value="main"/>
<attribute name="gradle_used_by_scope" value="main,test"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="bin/test" path="src/test/java">
<attributes>
<attribute name="gradle_scope" value="test"/>
<attribute name="gradle_used_by_scope" value="test"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="bin/test" path="src/test/resources">
<attributes>
<attribute name="gradle_scope" value="test"/>
<attribute name="gradle_used_by_scope" value="test"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11/"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11/"/>
<classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/> <classpathentry kind="con" path="org.eclipse.buildship.core.gradleclasspathcontainer"/>
<classpathentry kind="output" path="bin/default"/> <classpathentry kind="output" path="bin/default"/>
......
...@@ -8,6 +8,7 @@ app.rdma_max_inline=0 ...@@ -8,6 +8,7 @@ app.rdma_max_inline=0
app.rdma_server_port=1920 app.rdma_server_port=1920
app.NETWORK_HANDLER_THREADS=10 app.NETWORK_HANDLER_THREADS=10
app.REPLICATION_HANDLER_THREADS=10 app.REPLICATION_HANDLER_THREADS=10
app.db_path=/home/ub-05/paras/rocks
#The below properties are for master node only. #The below properties are for master node only.
app.follower_registration_port=9876 app.follower_registration_port=9876
......
...@@ -56,6 +56,9 @@ dependencies { ...@@ -56,6 +56,9 @@ dependencies {
implementation 'com.google.guava:guava:28.0-jre' implementation 'com.google.guava:guava:28.0-jre'
implementation 'com.ibm.darpc:darpc:1.9' implementation 'com.ibm.darpc:darpc:1.9'
implementation 'com.github.ben-manes.caffeine:caffeine:3.0.4' implementation 'com.github.ben-manes.caffeine:caffeine:3.0.4'
// https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni
implementation group: 'org.rocksdb', name: 'rocksdbjni', version: '5.8.0'
// Use JUnit test framework // Use JUnit test framework
// testImplementation 'junit:junit:4.12' // testImplementation 'junit:junit:4.12'
} }
\ No newline at end of file
...@@ -10,6 +10,8 @@ import com.ibm.darpc.DaRPCServerEndpoint; ...@@ -10,6 +10,8 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerGroup; import com.ibm.darpc.DaRPCServerGroup;
import com.ibm.disni.RdmaServerEndpoint; import com.ibm.disni.RdmaServerEndpoint;
import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.NetworkHandler; import hpdos_rdma_offloaded.handler.NetworkHandler;
import hpdos_rdma_offloaded.handler.RegistrationHandler; import hpdos_rdma_offloaded.handler.RegistrationHandler;
import hpdos_rdma_offloaded.handler.ReplicationHandler; import hpdos_rdma_offloaded.handler.ReplicationHandler;
...@@ -35,9 +37,9 @@ public class MetadataServer implements Runnable{ ...@@ -35,9 +37,9 @@ public class MetadataServer implements Runnable{
RegistrationHandler registrationHandler = null; RegistrationHandler registrationHandler = null;
public final static Properties properties= new Properties();; public final static Properties properties= new Properties();;
public MetadataServer(Properties properties) throws IOException { public MetadataServer(Properties properties) throws IOException,RocksDBException {
this.isMaster = Boolean.valueOf((String)properties.get("app.MASTER")); this.isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
this.storageHandler = new StorageHandler(properties); this.storageHandler = new StorageHandler(properties.getProperty("app.db_path"));
this.networkHandler = new NetworkHandler(this.storageHandler, properties); this.networkHandler = new NetworkHandler(this.storageHandler, properties);
this.metadataServerService = new MetadataServerService(this.networkHandler); this.metadataServerService = new MetadataServerService(this.networkHandler);
// if (this.isMaster) { // if (this.isMaster) {
...@@ -91,19 +93,29 @@ public class MetadataServer implements Runnable{ ...@@ -91,19 +93,29 @@ public class MetadataServer implements Runnable{
} }
} }
public static void main(String[] args) throws IOException, InterruptedException { public static void main(String[] args) {
InputStream inputStream = new FileInputStream(args[0]); try
properties.load(inputStream); {
boolean isMaster = Boolean.valueOf((String)properties.get("app.MASTER")); InputStream inputStream = new FileInputStream(args[0]);
if (isMaster) { properties.load(inputStream);
System.out.println("STARTING MASTER SERVICES."); boolean isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
} else { if (isMaster) {
System.out.println("STARTING FOLLOWER SERVICES."); System.out.println("STARTING MASTER SERVICES.");
} else {
System.out.println("STARTING FOLLOWER SERVICES.");
}
new MetadataServer(properties).run();
}
catch(RocksDBException ex)
{
System.out.println("Rocksdb Exception occur");
ex.printStackTrace();
}
catch(Exception ex)
{
System.out.println("Exception Occur");
ex.printStackTrace();
} }
Runnable t1 = new MetadataServer(properties);
Thread t = new Thread(t1);
t.start();
} }
} }
...@@ -17,6 +17,8 @@ import com.ibm.darpc.DaRPCClientEndpoint; ...@@ -17,6 +17,8 @@ import com.ibm.darpc.DaRPCClientEndpoint;
import com.ibm.darpc.DaRPCClientGroup; import com.ibm.darpc.DaRPCClientGroup;
import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.DaRPCStream;
import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.lib.Follower; import hpdos_rdma_offloaded.lib.Follower;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
import hpdos_rdma_offloaded.rdma_protocol.Request; import hpdos_rdma_offloaded.rdma_protocol.Request;
...@@ -34,7 +36,6 @@ public class NetworkHandler { ...@@ -34,7 +36,6 @@ public class NetworkHandler {
public NetworkHandler(StorageHandler storageHandler, Properties properties){ public NetworkHandler(StorageHandler storageHandler, Properties properties){
this.storageHandler = storageHandler; this.storageHandler = storageHandler;
this.executorService = Executors.newFixedThreadPool(10); this.executorService = Executors.newFixedThreadPool(10);
this.isMaster = Boolean.valueOf((String)properties.get("app.MASTER")); this.isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
} }
...@@ -71,14 +72,9 @@ public class NetworkHandler { ...@@ -71,14 +72,9 @@ public class NetworkHandler {
// Change the Futre to CompletableFuture to achieve correct asynchronousness // Change the Futre to CompletableFuture to achieve correct asynchronousness
public void create(Packet packet) throws InterruptedException, ExecutionException{ public void create(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{
System.out.println("Received create request for key/value: " + packet.getKey() + "/" + packet.getValue()); System.out.println("Received create request for key/value: " + packet.getKey() + "/" + packet.getValue());
Response response = new Response(); storageHandler.create(packet.getKey().getBytes(), packet.getValue().getBytes());
Future<String> futureRocksDB = this.executorService.submit(()->{
// Write code to write to rocksdb memtable here.
// Change return type as required.
return "";
});
if (this.isMaster) { if (this.isMaster) {
System.out.println("Starting replication"); System.out.println("Starting replication");
...@@ -88,55 +84,69 @@ public class NetworkHandler { ...@@ -88,55 +84,69 @@ public class NetworkHandler {
ReplicationHandler.replicateMetadata(packet); ReplicationHandler.replicateMetadata(packet);
return replicationResponse; return replicationResponse;
}); });
response = futureReplication.get(); Response response = futureReplication.get();
System.out.println("Replicating complete"); System.out.println("Replicating complete Ack" + response.getAck());
} }
// Write the return type for local write
futureRocksDB.get();
// Write code to parse the responses here.
} }
public void read(Packet packet){ public byte[] read(Packet packet) throws RocksDBException,InterruptedException,ExecutionException{
// Read the key, and check if it is avaible in memtable System.out.println("Received read request for key/value: " + packet.getKey() + "/" + packet.getValue());
// if (//check for key) { byte[] value = storageHandler.read(packet.getKey().getBytes());
System.out.println("got value "+ new String(value));
if (this.isMaster) {
// } else { System.out.println("Starting replication");
// Read from SSD Future<Response> futureReplication = this.executorService.submit(()->{
// } Response replicationResponse = new Response();
// Write code to replicate the data to other nics
ReplicationHandler.replicateMetadata(packet);
return replicationResponse;
});
Response response = futureReplication.get();
System.out.println("Replicating complete Ack "+ response.getAck());
}
return value;
} }
public void update(Packet packet) throws InterruptedException, ExecutionException{ public void update(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{
Future<String> futureRocksDB = this.executorService.submit(()->{ this.storageHandler.update(packet.getKey().getBytes(), packet.getValue().getBytes());
// Write code to write to rocksdb memtable here.
// Change return type as required.
return "";
});
if (isMaster) { if (isMaster) {
Response response = new Response();
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
Response response = new Response();
// Write code to replicate the data to other nics // Write code to replicate the data to other nics
return response; Response replicationResponse = new Response();
// Write code to replicate the data to other nics
ReplicationHandler.replicateMetadata(packet);
return replicationResponse;
}); });
Future<Boolean> futureInvalidation = this.executorService.submit(()->{ Future<Boolean> futureInvalidation = this.executorService.submit(()->{
sendInvalidationRequest(packet); sendInvalidationRequest(packet);
return false; return false;
}); });
Response response = futureReplication.get(); response = futureReplication.get();
futureInvalidation.get(); futureInvalidation.get();
System.out.println("Replicating complete Ack "+ response.getAck());
} }
// Write the return type for local write
futureRocksDB.get();
// Write code to parse the responses here. // Write code to parse the responses here.
} }
// To implement delete // To implement delete
public void delete(Packet packet){ public void delete(Packet packet) throws RocksDBException,InterruptedException,ExecutionException
{
this.storageHandler.delete(packet.getKey().getBytes());
if (this.isMaster) {
System.out.println("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{
Response replicationResponse = new Response();
// Write code to replicate the data to other nics
ReplicationHandler.replicateMetadata(packet);
return replicationResponse;
});
Response response = futureReplication.get();
System.out.println("Replicating complete "+ response.getAck());
}
} }
...@@ -163,5 +173,4 @@ public class NetworkHandler { ...@@ -163,5 +173,4 @@ public class NetworkHandler {
} }
} }
} }
package hpdos_rdma_offloaded.handler; package hpdos_rdma_offloaded.handler;
import java.util.Properties; import org.rocksdb.RocksDB;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
public class StorageHandler { public class StorageHandler implements AutoCloseable{
public StorageHandler(Properties properties){
// Inititalize configurations here RocksDB rocksDB;
Options rockDbOptions;
public StorageHandler(String dbpath) throws RocksDBException{
// Inititalize configurations here\
System.out.println("Creating RocksDB");
this.rockDbOptions = new Options();
rockDbOptions.setCreateIfMissing(true);
this.rocksDB = RocksDB.open(rockDbOptions,dbpath);
System.out.println("Created RocksDB");
}
public void close()
{
rocksDB.close();
rockDbOptions.close();
System.out.println("Closing RocksDB instance");
} }
public void create(){} public void create(byte[] key,byte[] value) throws RocksDBException{
public void read(){} rocksDB.put(key,value);
public void update(){} }
public void delete(){} public byte[] read(byte[] key)throws RocksDBException
{
return rocksDB.get(key);
}
public void update(byte[] key,byte[] value) throws RocksDBException
{
rocksDB.put(key,value);
}
public void delete(byte[] key) throws RocksDBException
{
rocksDB.delete(key);
}
} }
...@@ -13,6 +13,7 @@ public class Request implements DaRPCMessage{ ...@@ -13,6 +13,7 @@ public class Request implements DaRPCMessage{
private static char[] dst = new char[100]; private static char[] dst = new char[100];
@Override @Override
public int write(ByteBuffer buffer) throws IOException { public int write(ByteBuffer buffer) throws IOException {
System.out.println("Write request");
buffer.putInt(operationType); buffer.putInt(operationType);
buffer.asCharBuffer().put(key + ";" + value + ";"); buffer.asCharBuffer().put(key + ";" + value + ";");
return SERIALIZED_SIZE; return SERIALIZED_SIZE;
...@@ -20,13 +21,16 @@ public class Request implements DaRPCMessage{ ...@@ -20,13 +21,16 @@ public class Request implements DaRPCMessage{
@Override @Override
public void update(ByteBuffer buffer) throws IOException { public void update(ByteBuffer buffer) throws IOException {
System.out.println("update request");
operationType = buffer.getInt(); operationType = buffer.getInt();
buffer.asCharBuffer().get(dst, 0, 99); buffer.asCharBuffer().get(dst, 0, 99);
String s = new String(dst); String s = new String(dst);
s = s.trim(); s = s.trim();
String split[] = s.split(";"); String split[] = s.split(";");
key = split[0]; if(split.length >=1)
value = split[1]; key = split[0];
if(split.length >=2)
value = split[1];
} }
@Override @Override
......
...@@ -13,6 +13,7 @@ public class Response implements DaRPCMessage{ ...@@ -13,6 +13,7 @@ public class Response implements DaRPCMessage{
private static char[] dst = new char[100]; private static char[] dst = new char[100];
@Override @Override
public int write(ByteBuffer buffer) throws IOException { public int write(ByteBuffer buffer) throws IOException {
System.out.println("write response");
ack = buffer.getInt(); ack = buffer.getInt();
buffer.asCharBuffer().put(key + ";" + value + ";"); buffer.asCharBuffer().put(key + ";" + value + ";");
return SERIALIZED_SIZE; return SERIALIZED_SIZE;
...@@ -20,6 +21,7 @@ public class Response implements DaRPCMessage{ ...@@ -20,6 +21,7 @@ public class Response implements DaRPCMessage{
@Override @Override
public void update(ByteBuffer buffer) throws IOException { public void update(ByteBuffer buffer) throws IOException {
System.out.println("update response");
ack = buffer.getInt(); ack = buffer.getInt();
buffer.asCharBuffer().get(dst, 0, 99); buffer.asCharBuffer().get(dst, 0, 99);
String s = new String(dst); String s = new String(dst);
......
...@@ -7,6 +7,8 @@ import com.ibm.darpc.DaRPCServerEndpoint; ...@@ -7,6 +7,8 @@ import com.ibm.darpc.DaRPCServerEndpoint;
import com.ibm.darpc.DaRPCServerEvent; import com.ibm.darpc.DaRPCServerEvent;
import com.ibm.darpc.DaRPCService; import com.ibm.darpc.DaRPCService;
import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.NetworkHandler; import hpdos_rdma_offloaded.handler.NetworkHandler;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
import hpdos_rdma_offloaded.rdma_protocol.Request; import hpdos_rdma_offloaded.rdma_protocol.Request;
...@@ -22,28 +24,56 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R ...@@ -22,28 +24,56 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R
@Override @Override
public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException { public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException {
// TODO Auto-generated method stub
Request request = event.getReceiveMessage(); Request request = event.getReceiveMessage();
Response response = event.getSendMessage(); Response response = event.getSendMessage();
if (request.getOperationType() == 100) { if (request.getOperationType() == 100 ||request.getOperationType() == 102 ) {
// Write code for create // Write code for create
Packet packet = new Packet(request.getOperationType(), request.getKey(), request.getValue()); Packet packet = new Packet(request.getOperationType(), request.getKey(), request.getValue());
try { try {
this.networkHandler.create(packet); this.networkHandler.create(packet);
response.setAct(0); response.setAct(0);
response.setKey(null); response.setKey(null);
response.setValue(null); response.setValue(null);
} catch (InterruptedException | ExecutionException e) { }catch(RocksDBException ex)
{
ex.printStackTrace();
response.setAct(1);
}
catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
else if(request.getOperationType() == 101) { else if(request.getOperationType() == 101) {
Packet packet = new Packet(request.getOperationType(), request.getKey(), "");
try
{
byte[] value = this.networkHandler.read(packet);
response.setAct(1);
response.setKey(packet.getKey());
response.setValue(new String(value));
}
catch(RocksDBException ex)
{
ex.printStackTrace();
response.setAct(1);
}catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// Write code for read // Write code for read
} }
else if(request.getOperationType() == 102){
// Write code for update
}
else if(request.getOperationType() == 103){ else if(request.getOperationType() == 103){
try{
Packet packet = new Packet(request.getOperationType(), request.getKey(), "");
this.networkHandler.delete(packet);
response.setAct(0);
}
catch(RocksDBException ex)
{
ex.printStackTrace();
response.setAct(1);
}catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// Write code for delete // Write code for delete
} }
event.triggerResponse(); event.triggerResponse();
...@@ -51,13 +81,11 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R ...@@ -51,13 +81,11 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R
@Override @Override
public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void open(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
// TODO Auto-generated method stub
System.out.println("Received new connection..."); System.out.println("Received new connection...");
} }
@Override @Override
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
// TODO Auto-generated method stub
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment