Commit d16dbc01 authored by Paras Garg's avatar Paras Garg

Fixed Issue with rocksdb and converted to byte[]

parent 1180756d
How to setup RocksDB jar
1. Compiling RocksDb java from source code
make -j8 rocksjava DEBUG_LEVEL=0
2. cd java/target
3. install jar to mavenLocal Repo
Ex: used on nic
mvn install:install-file -Dfile=./rocksdbjni-6.15.5-linux64.jar -DgroupId=com.hpdos.rocksdb -DartifactId=rocksdb -Dversion=1.0 -Dpackaging=jar -DgeneratePom=true
Ex: Used on host
mvn install:install-file -Dfile=./rocksdbjni-6.23.0-linux64.jar -DgroupId=com.hpdos.rocksdb -DartifactId=rocksdb -Dversion=1.0 -Dpackaging=jar -DgeneratePom=true
4. import this jar in as maven dependency
implementation group: 'com.hpdos.rocksdb', name: 'rocksdb', version: '1.0'
...@@ -8,7 +8,7 @@ app.rdma_max_inline=0 ...@@ -8,7 +8,7 @@ app.rdma_max_inline=0
app.rdma_server_port=1920 app.rdma_server_port=1920
app.NETWORK_HANDLER_THREADS=10 app.NETWORK_HANDLER_THREADS=10
app.REPLICATION_HANDLER_THREADS=10 app.REPLICATION_HANDLER_THREADS=10
app.db_path=/home/ub-05/paras/rocks app.db_path=/tmp/rocks
#The below properties are for master node only. #The below properties are for master node only.
app.follower_registration_port=9876 app.follower_registration_port=9876
......
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java Library project to get you started.
* For more details take a look at the Java Libraries chapter in the Gradle
* User Manual available at https://docs.gradle.org/6.0/userguide/java_library_plugin.html
*/
// plugins {
// // Apply the java-library plugin to add support for Java Library
// id 'java-library'
// id 'application'
// }
// repositories {
// // Use jcenter for resolving dependencies.
// // You can declare any Maven/Ivy/file repository here.
// jcenter()
// mavenCentral()
// }
// application {
// // Define the main class for the application.
// mainClass = 'MetadataClient'
// }
// dependencies {
// // This dependency is exported to consumers, that is to say found on their compile classpath.
// api 'org.apache.commons:commons-math3:3.6.1'
// // This dependency is used internally, and not exposed to consumers on their own compile classpath.
// implementation 'com.google.guava:guava:28.0-jre'
// implementation 'com.ibm.darpc:darpc:1.9'
// // Use JUnit test framework
// testImplementation 'junit:junit:4.12'
// }
apply plugin: "application" apply plugin: "application"
apply plugin: "java-library" apply plugin: "java-library"
mainClassName = "hpdos_rdma_offloaded.MetadataServer" application
{
mainClassName = "hpdos_rdma_offloaded.MetadataServer"
}
version '1.0-SNAPSHOT' version '1.0-SNAPSHOT'
repositories { repositories
{
mavenLocal()
mavenCentral() mavenCentral()
jcenter() jcenter()
} }
dependencies { dependencies
{
testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'junit', name: 'junit', version: '4.12'
api 'org.apache.commons:commons-math3:3.6.1' api 'org.apache.commons:commons-math3:3.6.1'
// This dependency is used internally, and not exposed to consumers on their own compile classpath.
implementation 'com.google.guava:guava:28.0-jre' implementation 'com.google.guava:guava:28.0-jre'
implementation 'com.ibm.darpc:darpc:1.9' implementation 'com.ibm.darpc:darpc:1.9'
implementation 'com.github.ben-manes.caffeine:caffeine:3.0.4' implementation 'com.github.ben-manes.caffeine:caffeine:3.0.4'
// https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni implementation group: 'com.hpdos.rocksdb', name: 'rocksdb', version: '1.0'
implementation group: 'org.rocksdb', name: 'rocksdbjni', version: '5.8.0'
// Use JUnit test framework
// testImplementation 'junit:junit:4.12'
} }
\ No newline at end of file
...@@ -15,10 +15,9 @@ import org.rocksdb.RocksDBException; ...@@ -15,10 +15,9 @@ import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.NetworkHandler; import hpdos_rdma_offloaded.handler.NetworkHandler;
import hpdos_rdma_offloaded.handler.RegistrationHandler; import hpdos_rdma_offloaded.handler.RegistrationHandler;
import hpdos_rdma_offloaded.handler.ReplicationHandler;
import hpdos_rdma_offloaded.handler.StorageHandler; import hpdos_rdma_offloaded.handler.StorageHandler;
import hpdos_rdma_offloaded.rdma_protocol.Request; import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.rdma_protocol.Response; import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.service.MetadataServerService; import hpdos_rdma_offloaded.service.MetadataServerService;
public class MetadataServer implements Runnable{ public class MetadataServer implements Runnable{
...@@ -30,6 +29,7 @@ public class MetadataServer implements Runnable{ ...@@ -30,6 +29,7 @@ public class MetadataServer implements Runnable{
private boolean polling; private boolean polling;
private int maxinline = 0; private int maxinline = 0;
private int rdma_port; private int rdma_port;
private long[] clusterAffinities;
boolean isMaster; boolean isMaster;
StorageHandler storageHandler; StorageHandler storageHandler;
...@@ -38,19 +38,24 @@ public class MetadataServer implements Runnable{ ...@@ -38,19 +38,24 @@ public class MetadataServer implements Runnable{
RegistrationHandler registrationHandler = null; RegistrationHandler registrationHandler = null;
public final static Properties properties= new Properties();; public final static Properties properties= new Properties();;
public MetadataServer(Properties properties) throws IOException,RocksDBException { public MetadataServer(String propertyPath) throws IOException,RocksDBException {
this.isMaster = Boolean.valueOf((String)properties.get("app.MASTER")); InputStream inputStream = new FileInputStream(propertyPath);
this.storageHandler = new StorageHandler(properties.getProperty("app.db_path")); properties.load(inputStream);
this.networkHandler = new NetworkHandler(this.storageHandler, properties); boolean isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
this.metadataServerService = new MetadataServerService(this.networkHandler); if(isMaster)
// if (this.isMaster) { {
this.registrationHandler = new RegistrationHandler(properties); System.out.println("STARTING MASTER SERVICES.");
// }
} }
else
public void run()
{ {
System.out.println("STARTING FOLLOWER SERVICES.");
}
isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
storageHandler = new StorageHandler(properties.getProperty("app.db_path"));
networkHandler = new NetworkHandler(this.storageHandler, properties);
metadataServerService = new MetadataServerService(this.networkHandler);
registrationHandler = new RegistrationHandler(properties);
host = (String) properties.get("app.HOST"); host = (String) properties.get("app.HOST");
poolsize = Integer.valueOf( (String)properties.get("app.cpu_affinity")); poolsize = Integer.valueOf( (String)properties.get("app.cpu_affinity"));
recvQueue = Integer.valueOf( (String)properties.get("app.rdma_receive_queue")); recvQueue = Integer.valueOf( (String)properties.get("app.rdma_receive_queue"));
...@@ -59,65 +64,45 @@ public class MetadataServer implements Runnable{ ...@@ -59,65 +64,45 @@ public class MetadataServer implements Runnable{
maxinline = Integer.valueOf((String) properties.get("app.rdma_max_inline")); maxinline = Integer.valueOf((String) properties.get("app.rdma_max_inline"));
rdma_port = Integer.valueOf((String) properties.get("app.rdma_server_port")); rdma_port = Integer.valueOf((String) properties.get("app.rdma_server_port"));
polling = Boolean.valueOf((String) properties.get("app.rdma_polling")); polling = Boolean.valueOf((String) properties.get("app.rdma_polling"));
long[] clusterAffinities = new long[poolsize]; clusterAffinities = new long[poolsize];
for (int i = 0; i < poolsize; i++){ for (int i = 0; i < poolsize; i++)
{
long cpu = 1L << i; long cpu = 1L << i;
clusterAffinities[i] = cpu; clusterAffinities[i] = cpu;
} }
}
public void run(){
// MetadataServerService rpcService = new MetadataServerService(); // MetadataServerService rpcService = new MetadataServerService();
DaRPCServerGroup<Request, Response> group = null; DaRPCServerGroup<Request, Response> group = null;
try { try
group = DaRPCServerGroup.createServerGroup(this.metadataServerService, clusterAffinities, -1, maxinline, polling, recvQueue, sendQueue, wqSize, 32); {
} catch (Exception e) { group = DaRPCServerGroup.createServerGroup(this.metadataServerService, this.clusterAffinities, -1, maxinline, polling, recvQueue, sendQueue, wqSize, 32);
e.printStackTrace();
};
RdmaServerEndpoint<DaRPCServerEndpoint<Request, Response>> serverEp = null; RdmaServerEndpoint<DaRPCServerEndpoint<Request, Response>> serverEp = null;
try {
serverEp = group.createServerEndpoint(); serverEp = group.createServerEndpoint();
} catch (IOException e) {
e.printStackTrace();
}
InetSocketAddress address = new InetSocketAddress(host, rdma_port); InetSocketAddress address = new InetSocketAddress(host, rdma_port);
try {
serverEp.bind(address, 100); serverEp.bind(address, 100);
} catch (Exception e) {
e.printStackTrace();
}
while(true){ while(true){
try { try
{
// System.out.println("Listening to RDMA requests a, IP: " + host + " , PORT: " + rdma_port); // System.out.println("Listening to RDMA requests a, IP: " + host + " , PORT: " + rdma_port);
serverEp.accept(); serverEp.accept();
// System.out.println("Accepted connection."); // System.out.println("Accepted connection.");
} catch (IOException e) {
e.printStackTrace();
}
} }
} catch (IOException e)
public static void main(String[] args) {
try
{ {
RocksDB.loadLibrary(); e.printStackTrace();
InputStream inputStream = new FileInputStream(args[0]);
properties.load(inputStream);
boolean isMaster = Boolean.valueOf((String)properties.get("app.MASTER"));
if (isMaster) {
System.out.println("STARTING MASTER SERVICES.");
} else {
System.out.println("STARTING FOLLOWER SERVICES.");
} }
new MetadataServer(properties).run();
} }
catch(RocksDBException ex) }catch(Exception e)
{ {
System.out.println("Rocksdb Exception occur"); e.printStackTrace();
ex.printStackTrace();
} }
catch(Exception ex)
{
System.out.println("Exception Occur");
ex.printStackTrace();
} }
public static void main(String[] args) throws Exception
{
RocksDB.loadLibrary();
new MetadataServer(args[0]).run();
System.out.println("Rocksdb Exception occur");
} }
} }
package hpdos_rdma_offloaded.handler; package hpdos_rdma_offloaded.handler;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
...@@ -21,9 +20,10 @@ import org.rocksdb.RocksDBException; ...@@ -21,9 +20,10 @@ import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.lib.Follower; import hpdos_rdma_offloaded.lib.Follower;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
import hpdos_rdma_offloaded.rdma_protocol.Request; import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.rdma_protocol.Response; import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.rdma_protocol.RpcProtocol; import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol;
public class NetworkHandler { public class NetworkHandler {
StorageHandler storageHandler; StorageHandler storageHandler;
...@@ -73,10 +73,10 @@ public class NetworkHandler { ...@@ -73,10 +73,10 @@ public class NetworkHandler {
// Change the Futre to CompletableFuture to achieve correct asynchronousness // Change the Futre to CompletableFuture to achieve correct asynchronousness
public void create(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{ public void create(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{
System.out.println("Received create request for key/value: " + packet.getKey() + "/" + packet.getValue()); System.out.println("Received create request for key/value: " + new String(packet.getKey()) + "/" + new String(packet.getValue()));
storageHandler.create(packet.getKey().getBytes(), packet.getValue().getBytes()); storageHandler.create(packet.getKey(), packet.getValue());
if (this.isMaster)
if (this.isMaster) { {
System.out.println("Starting replication"); System.out.println("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
Response replicationResponse = new Response(); Response replicationResponse = new Response();
...@@ -90,11 +90,12 @@ public class NetworkHandler { ...@@ -90,11 +90,12 @@ public class NetworkHandler {
} }
public byte[] read(Packet packet) throws RocksDBException,InterruptedException,ExecutionException{ public byte[] read(Packet packet) throws RocksDBException,InterruptedException,ExecutionException{
System.out.println("Received read request for key/value: " + packet.getKey() + "/" + packet.getValue()); System.out.println("Received read request for key/value: " + new String(packet.getKey()));
byte[] value = storageHandler.read(packet.getKey().getBytes()); byte[] value = storageHandler.read(packet.getKey());
System.out.println("got value "+ new String(value)); if(value != null)
if (this.isMaster) { System.out.println("Got value "+ new String(value));
if (this.isMaster)
{
System.out.println("Starting replication"); System.out.println("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
Response replicationResponse = new Response(); Response replicationResponse = new Response();
...@@ -109,7 +110,7 @@ public class NetworkHandler { ...@@ -109,7 +110,7 @@ public class NetworkHandler {
} }
public void update(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{ public void update(Packet packet) throws InterruptedException, ExecutionException,RocksDBException{
this.storageHandler.update(packet.getKey().getBytes(), packet.getValue().getBytes()); this.storageHandler.update(packet.getKey(), packet.getValue());
if (isMaster) { if (isMaster) {
Response response = new Response(); Response response = new Response();
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
...@@ -135,7 +136,7 @@ public class NetworkHandler { ...@@ -135,7 +136,7 @@ public class NetworkHandler {
// To implement delete // To implement delete
public void delete(Packet packet) throws RocksDBException,InterruptedException,ExecutionException public void delete(Packet packet) throws RocksDBException,InterruptedException,ExecutionException
{ {
this.storageHandler.delete(packet.getKey().getBytes()); this.storageHandler.delete(packet.getKey());
if (this.isMaster) { if (this.isMaster) {
System.out.println("Starting replication"); System.out.println("Starting replication");
Future<Response> futureReplication = this.executorService.submit(()->{ Future<Response> futureReplication = this.executorService.submit(()->{
...@@ -149,17 +150,15 @@ public class NetworkHandler { ...@@ -149,17 +150,15 @@ public class NetworkHandler {
} }
} }
public void sendInvalidationRequest(Packet packet) throws InterruptedException, ExecutionException{ public void sendInvalidationRequest(Packet packet) throws InterruptedException, ExecutionException{
Set<Callable<Response>> callables = new HashSet<>(); Set<Callable<Response>> callables = new HashSet<>();
for (DaRPCStream<Request, Response> stream: streams.values()) { for (DaRPCStream<Request, Response> stream: streams.values()) {
callables.add(() -> { callables.add(() -> {
Request request = new Request(); Request request = new Request();
Response response = new Response(); Response response = new Response();
request.setOperationType(104); request.setRequestType(RequestType.INVALIDATE);
request.setKey(""); request.setKey(null);
request.setValue(""); request.setValue(null);
response = stream.request(request, response, false).get(); response = stream.request(request, response, false).get();
return response; return response;
}); });
......
...@@ -7,13 +7,8 @@ import java.lang.ClassNotFoundException; ...@@ -7,13 +7,8 @@ import java.lang.ClassNotFoundException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Properties; import java.util.Properties;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import hpdos_rdma_offloaded.lib.Follower;
public class RegistrationHandler { public class RegistrationHandler {
public ServerSocket server; public ServerSocket server;
......
...@@ -12,10 +12,9 @@ import java.util.concurrent.Future; ...@@ -12,10 +12,9 @@ import java.util.concurrent.Future;
import com.ibm.darpc.DaRPCFuture; import com.ibm.darpc.DaRPCFuture;
import hpdos_rdma_offloaded.rdma_protocol.*;
import hpdos_rdma_offloaded.lib.Follower; import hpdos_rdma_offloaded.lib.Follower;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
import hpdos_rdma_offloaded.protocol.*;
public class ReplicationHandler { public class ReplicationHandler {
...@@ -35,7 +34,7 @@ public class ReplicationHandler { ...@@ -35,7 +34,7 @@ public class ReplicationHandler {
Response response = new Response(); Response response = new Response();
request.setKey(packet.getKey()); request.setKey(packet.getKey());
request.setValue(packet.getValue()); request.setValue(packet.getValue());
request.setOperationType(packet.getOperationType()); request.setRequestType(packet.getOperationType());
DaRPCFuture<Request, Response> future = follower.getStream().request(request, response, false); DaRPCFuture<Request, Response> future = follower.getStream().request(request, response, false);
return future; return future;
}); });
......
...@@ -9,8 +9,8 @@ public class StorageHandler implements AutoCloseable{ ...@@ -9,8 +9,8 @@ public class StorageHandler implements AutoCloseable{
RocksDB rocksDB; RocksDB rocksDB;
Options rockDbOptions; Options rockDbOptions;
public StorageHandler(String dbpath) throws RocksDBException{ public StorageHandler(String dbpath) throws RocksDBException
// Inititalize configurations here\ {
System.out.println("Creating RocksDB"); System.out.println("Creating RocksDB");
this.rockDbOptions = new Options(); this.rockDbOptions = new Options();
rockDbOptions.setCreateIfMissing(true); rockDbOptions.setCreateIfMissing(true);
......
package hpdos_rdma_offloaded.lib; package hpdos_rdma_offloaded.lib;
import com.ibm.darpc.DaRPCStream; import com.ibm.darpc.DaRPCStream;
import hpdos_rdma_offloaded.rdma_protocol.*;
import hpdos_rdma_offloaded.protocol.*;
public class Follower { public class Follower {
String ipAddress; String ipAddress;
......
package hpdos_rdma_offloaded.lib; package hpdos_rdma_offloaded.lib;
public class Packet { public class Packet {
String key; private byte[] key;
String value; private byte[] value;
Integer operationType; private int operationType;
public Packet(Integer operationType, String key, String value){ public Packet(int operationType, byte[] key, byte[] value)
{
this.key = key; this.key = key;
this.value = value; this.value = value;
this.operationType = operationType; this.operationType = operationType;
} }
public String getKey(){ public byte[] getKey()
{
return this.key; return this.key;
} }
public String getValue(){ public byte[] getValue()
{
return this.value; return this.value;
} }
public Integer getOperationType(){ public Integer getOperationType()
{
return this.operationType; return this.operationType;
} }
} }
package hpdos_rdma_offloaded.protocol;
public interface AckType
{
public static int SUCCESS = 0;
public static int NOTFOUND = 1;
public static int NOTALLOWED = 2;
public static int DBFAILED = 3;
public static int SUCCESS_WITH_VALUE = 4;
}
\ No newline at end of file
package hpdos_rdma_offloaded.protocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import com.ibm.darpc.DaRPCMessage;
public class Request implements DaRPCMessage
{
private static int SERIALIZED_SIZE = 260;
public int requestType;
public byte[] key;
public byte[] value;
@Override
public int write(ByteBuffer buffer) throws IOException
{
System.out.println("Request Write Method");
buffer.putInt(requestType);
buffer.put(key);
//if operation type is get and delete then value is not required
if(requestType == RequestType.PUT)
{
buffer.put(value);
//size of key+value+operationType
return 4+key.length+value.length;
}
//else return size of Key+operationType
return 4+key.length;
}
@Override
public void update(ByteBuffer buffer) throws IOException
{
System.out.println("Request update method");
requestType = buffer.getInt();
if(key == null || key.length != 128)
this.key = new byte[128];
buffer.get(key);
if(requestType == RequestType.PUT)
{
if(value == null || value.length != 128)
this.value = new byte[128];
buffer.get(value);
}
}
@Override
public int size() {
//This method gives max size of request
return SERIALIZED_SIZE;
}
public byte[] getKey() {
return key;
}
public void setKey(byte[] key) {
this.key = key;
}
public byte[] getValue() {
return value;
}
public void setValue(byte[] value) {
this.value = value;
}
public int getRequestType() {
return requestType;
}
public void setRequestType(int requestType) {
this.requestType = requestType;
}
}
package hpdos_rdma_offloaded.protocol;
public interface RequestType
{
public static int GET =101;
public static int PUT =100;
public static int DELETE =102;
public static int INVALIDATE = 103;
};
package hpdos_rdma_offloaded.rdma_protocol; package hpdos_rdma_offloaded.protocol;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
...@@ -6,29 +6,35 @@ import java.nio.ByteBuffer; ...@@ -6,29 +6,35 @@ import java.nio.ByteBuffer;
import com.ibm.darpc.DaRPCMessage; import com.ibm.darpc.DaRPCMessage;
public class Response implements DaRPCMessage{ public class Response implements DaRPCMessage{
public static int SERIALIZED_SIZE = 256; public static int SERIALIZED_SIZE = 260;
private int ack; private int ack;
public String key; public byte[] key;
public String value; public byte[] value;
private static char[] dst = new char[100];
@Override @Override
public int write(ByteBuffer buffer) throws IOException { public int write(ByteBuffer buffer) throws IOException
System.out.println("write response"); {
ack = buffer.getInt(); System.out.println("Response write Method");
buffer.asCharBuffer().put(key + ";" + value + ";"); buffer.putInt(ack);
return SERIALIZED_SIZE; if(ack == AckType.SUCCESS_WITH_VALUE)
{
buffer.put(value);
return 4+value.length;
}
return 4;
} }
@Override @Override
public void update(ByteBuffer buffer) throws IOException { public void update(ByteBuffer buffer) throws IOException
System.out.println("update response"); {
System.out.println("esponse update method");
ack = buffer.getInt(); ack = buffer.getInt();
buffer.asCharBuffer().get(dst, 0, 99); if(ack == AckType.SUCCESS_WITH_VALUE)
String s = new String(dst); {
s = s.trim(); if(value == null || value.length <128)
String split[] = s.split(";"); value = new byte[128];
key = split[0]; buffer.get(value);
value = split[1]; }
} }
@Override @Override
...@@ -41,23 +47,23 @@ public class Response implements DaRPCMessage{ ...@@ -41,23 +47,23 @@ public class Response implements DaRPCMessage{
return ack; return ack;
} }
public void setAct(int ack){ public void setAck(int ack){
this.ack = ack; this.ack = ack;
} }
public String getKey() { public byte[] getKey() {
return key; return key;
} }
public void setKey(String key) { public void setKey(byte[] key) {
this.key = key; this.key = key;
} }
public String getValue() { public byte[] getValue() {
return value; return value;
} }
public void setValue(String value) { public void setValue(byte[] value) {
this.value = value; this.value = value;
} }
} }
package hpdos_rdma_offloaded.rdma_protocol; package hpdos_rdma_offloaded.protocol;
import com.ibm.darpc.DaRPCProtocol; import com.ibm.darpc.DaRPCProtocol;
......
package hpdos_rdma_offloaded.rdma_protocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import com.ibm.darpc.DaRPCMessage;
public class Request implements DaRPCMessage{
public int operationType;
public String key;
public String value;
private int SERIALIZED_SIZE = 256;
private static char[] dst = new char[100];
@Override
public int write(ByteBuffer buffer) throws IOException {
System.out.println("Write request");
buffer.putInt(operationType);
buffer.asCharBuffer().put(key + ";" + value + ";");
return SERIALIZED_SIZE;
}
@Override
public void update(ByteBuffer buffer) throws IOException {
System.out.println("update request");
operationType = buffer.getInt();
buffer.asCharBuffer().get(dst, 0, 99);
String s = new String(dst);
s = s.trim();
String split[] = s.split(";");
if(split.length >=1)
key = split[0];
if(split.length >=2)
value = split[1];
}
@Override
public int size() {
return SERIALIZED_SIZE;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public int getOperationType() {
return operationType;
}
public void setOperationType(int operationType) {
this.operationType = operationType;
}
}
...@@ -11,9 +11,11 @@ import org.rocksdb.RocksDBException; ...@@ -11,9 +11,11 @@ import org.rocksdb.RocksDBException;
import hpdos_rdma_offloaded.handler.NetworkHandler; import hpdos_rdma_offloaded.handler.NetworkHandler;
import hpdos_rdma_offloaded.lib.Packet; import hpdos_rdma_offloaded.lib.Packet;
import hpdos_rdma_offloaded.rdma_protocol.Request; import hpdos_rdma_offloaded.protocol.AckType;
import hpdos_rdma_offloaded.rdma_protocol.Response; import hpdos_rdma_offloaded.protocol.Request;
import hpdos_rdma_offloaded.rdma_protocol.RpcProtocol; import hpdos_rdma_offloaded.protocol.RequestType;
import hpdos_rdma_offloaded.protocol.Response;
import hpdos_rdma_offloaded.protocol.RpcProtocol;
public class MetadataServerService extends RpcProtocol implements DaRPCService<Request, Response>{ public class MetadataServerService extends RpcProtocol implements DaRPCService<Request, Response>{
NetworkHandler networkHandler; NetworkHandler networkHandler;
...@@ -26,51 +28,60 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R ...@@ -26,51 +28,60 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R
public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException { public void processServerEvent(DaRPCServerEvent<Request, Response> event) throws IOException {
Request request = event.getReceiveMessage(); Request request = event.getReceiveMessage();
Response response = event.getSendMessage(); Response response = event.getSendMessage();
if (request.getOperationType() == 100 ||request.getOperationType() == 102 ) { if (request.getRequestType() == RequestType.PUT) {
// Write code for create // Write code for create
Packet packet = new Packet(request.getOperationType(), request.getKey(), request.getValue()); Packet packet = new Packet(request.getRequestType(), request.getKey(), request.getValue());
try { try
{
this.networkHandler.create(packet); this.networkHandler.create(packet);
response.setAct(0); response.setAck(AckType.SUCCESS);
response.setKey(null); response.setKey(null);
response.setValue(null); response.setValue(null);
}catch(RocksDBException ex) }
catch(RocksDBException ex)
{ {
ex.printStackTrace(); ex.printStackTrace();
response.setAct(1); response.setAck(AckType.DBFAILED);
} }
catch (InterruptedException | ExecutionException e) { catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
else if(request.getOperationType() == 101) { else if(request.getRequestType() == RequestType.GET) {
Packet packet = new Packet(request.getOperationType(), request.getKey(), ""); Packet packet = new Packet(request.getRequestType(), request.getKey(), null);
try try
{ {
byte[] value = this.networkHandler.read(packet); byte[] value = this.networkHandler.read(packet);
response.setAct(1); if(value == null)
response.setKey(packet.getKey()); {
response.setValue(new String(value)); response.setAck(AckType.NOTFOUND);
}
else
{
response.setAck(AckType.SUCCESS_WITH_VALUE);
response.setValue(value);
}
} }
catch(RocksDBException ex) catch(RocksDBException ex)
{ {
ex.printStackTrace(); ex.printStackTrace();
response.setAct(1); response.setAck(AckType.DBFAILED);
}catch (InterruptedException | ExecutionException e) { }
catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
// Write code for read // Write code for read
} }
else if(request.getOperationType() == 103){ else if(request.getRequestType() == RequestType.DELETE){
try{ try{
Packet packet = new Packet(request.getOperationType(), request.getKey(), ""); Packet packet = new Packet(request.getRequestType(), request.getKey(), null);
this.networkHandler.delete(packet); this.networkHandler.delete(packet);
response.setAct(0); response.setAck(AckType.SUCCESS);
} }
catch(RocksDBException ex) catch(RocksDBException ex)
{ {
ex.printStackTrace(); ex.printStackTrace();
response.setAct(1); response.setAck(AckType.DBFAILED);
}catch (InterruptedException | ExecutionException e) { }catch (InterruptedException | ExecutionException e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -86,7 +97,7 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R ...@@ -86,7 +97,7 @@ public class MetadataServerService extends RpcProtocol implements DaRPCService<R
@Override @Override
public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) { public void close(DaRPCServerEndpoint<Request, Response> rpcClientEndpoint) {
System.out.println("Closing Connection");
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment