Commit 7e25adec authored by Paras Garg's avatar Paras Garg

fixed bugs in new messageformat

parent 94b24420
...@@ -41,12 +41,15 @@ clean: ...@@ -41,12 +41,15 @@ clean:
count: count:
find . -type f -name "*.hpp"|xargs wc -l find . -type f -name "*.hpp"|xargs wc -l
find . -type f -name "*.cpp"|xargs wc -l find . -type f -name "*.cpp"|xargs wc -l
find YCSB/hpdos -type f -name "*.java"| xargs wc -l
find jsrc/ -type f -name "*.java" | xargs wc -l
@echo "Lines of code counted!" @echo "Lines of code counted!"
Jclient: $(OBJS) Jclient: $(OBJS)
$(CXX) -o libhpdosclient.so -L/usr/local/lib -shared $^ $(LIBS) $(CXX) -o libhpdosclient.so -L/usr/local/lib -shared $^ $(LIBS)
@echo "jclient "$<" successfully!" @echo "jclient "$<" successfully!"
sudo rm -f /usr/lib/libhpdosclient.so
sudo cp libhpdosclient.so /usr/lib sudo cp libhpdosclient.so /usr/lib
@echo "Copied libhpdosclient.so to /usr/lib" @echo "Copied libhpdosclient.so to /usr/lib"
#jcompile: javac $(JSRCS) -d JBUILD #jcompile: javac $(JSRCS) -d JBUILD
......
...@@ -5,15 +5,17 @@ Steps to build jni client ...@@ -5,15 +5,17 @@ Steps to build jni client
Running YSCB Running YSCB
> mvn compile <br> > mvn compile <br>
add .build to make
./bin/ycsb load hpdos -P workloads/workloadb -threads 1 ./bin/ycsb load hpdos -P workloads/workloadb -threads 1
mvn -pl site.ycsb:hpdos-binding -am clean package -Dcheckstyle.skip mvn -pl site.ycsb:hpdos-binding -am clean package -Dcheckstyle.skip
https://medium.com/@pirogov.alexey/gdb-debug-native-part-of-java-application-c-c-libraries-and-jdk-6593af3b4f3f
to do to do
delete client endpoint on close delete client endpoint on close
threading in client and hashing in client threading in client and hashing in client
add more code for success failure not found etc server error client error
add cache add support for invalidation add cache add support for invalidation
interface client API through endpointGroup interface client API through endpointGroup
...@@ -29,4 +31,29 @@ Options: ...@@ -29,4 +31,29 @@ Options:
-p key=value Override workload property -p key=value Override workload property
-s Print status to stderr -s Print status to stderr
-target n Target ops/sec (default: unthrottled) -target n Target ops/sec (default: unthrottled)
-threads n Number of client threads (default: 1) -threads n Number of client threads (default: 1)
\ No newline at end of file
# Steps to configure spdk target and linux initiator
Build spdk using
https://spdk.io/doc/nvmf.html
sudo systemctl restart spdknvmeof.service
if error then
1. cd spdk
2. ./scripts/setup.sh
./rpccommands.sh
nvme discover -t rdma -a 192.168.100.8 -s 4420
sudo nvme connect -t rdma -n "nqn.2021-08.hpdos:ub08" -a 192.168.200.40 -s 4420
sudo nvme disconnect -n "nqn.2021-08.hpdos:ub08"
TO check whether file system is mounted
sudo file -s /dev/nvme0n1
else mount a file sytem
sudo mkfs -t xfs /dev/nvme0n1
sudo mkdir /data
sudo mkdir /data
sudo mount /dev/nvme1n1 /data
https://www.hyper-v.io/nvme-part-1-linux-nvme-initiator-linux-spdk-nvmf-target/
\ No newline at end of file
package site.ycsb.db;
import site.ycsb.DB; import site.ycsb.DB;
import site.ycsb.DBException; import site.ycsb.DBException;
import site.ycsb.ByteArrayByteIterator;
import site.ycsb.ByteIterator; import site.ycsb.ByteIterator;
import site.ycsb.Status; import site.ycsb.Status;
import site.ycsb.StringByteIterator; import site.ycsb.StringByteIterator;
...@@ -9,40 +10,57 @@ import site.ycsb.StringByteIterator; ...@@ -9,40 +10,57 @@ import site.ycsb.StringByteIterator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.sql.*; import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
public class HpdosClient extends DB { public class HpdosClient extends DB {
// private static final Logger LOGGER = private static final Logger LOGGER = LoggerFactory.getLogger(HpdosClient.class);
// LoggerFactory.getLogger(RocksDBClient.class);
public HpdosClient() {
public HpdosClient() {
} }
long endpointGroup; JClient jclient;
public void init() throws DBException { public void init() throws DBException {
System.out.println(System.getProperty("user.dir"));
Properties prop = getProperties(); Properties prop = getProperties();
prop.forEach((k, v) -> System.out.println(k + "" + v)); prop.forEach((k, v) -> System.out.println(k + "" + v));
System.out.println("init hpdos"); System.out.println("init hpdos");
synchronized (HpdosClient.class) { jclient = JClient.getInstance();
if (endpointGroup == 0) System.out.println("jclient"+jclient);
endpointGroup++; //synchronized (HpdosClient.class) {
} // if (jclient == null) {
System.out.println("endpoit" + endpointGroup); // System.loadLibrary("hpdosclient");
// jclient = new JClient();
// jclient.endpointGroup = jclient.createEndpointGroup("../prop.config");
// }
//}
LOGGER.info("endpoint" + jclient.endpointGroup);
} }
public void cleanup() throws DBException { public void cleanup() throws DBException {
jclient.closeEndpointGroup(jclient.endpointGroup);
LOGGER.info("closing");
} }
// Read a single record // Read a single record
public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) { public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
System.out.println("Read" + key); //System.out.println("Read" + key);
byte res[] = jclient.get(jclient.endpointGroup, key.getBytes());
if (res == null)
return Status.NOT_FOUND;
if(fields == null)
{
return Status.OK;
}
Iterator<String> it = fields.iterator();
if (it.hasNext())
result.put(fields.iterator().next(), new ByteArrayByteIterator(res, 0, res.length));
return Status.OK; return Status.OK;
} }
...@@ -54,7 +72,28 @@ public class HpdosClient extends DB { ...@@ -54,7 +72,28 @@ public class HpdosClient extends DB {
// Update a single record // Update a single record
public Status update(String table, String key, Map<String, ByteIterator> values) { public Status update(String table, String key, Map<String, ByteIterator> values) {
System.out.println("update" + key); //System.out.println("update" + key);
try {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
for (ByteIterator v : values.values()) {
try {
stream.write(v.toArray());
} catch (IOException ex) {
LOGGER.error(ex.getMessage());
return Status.ERROR;
}
}
int val = jclient.put(jclient.endpointGroup, key.getBytes(), stream.toByteArray());
if (val == -1)
{
System.out.println("got -1\n");
return Status.BAD_REQUEST;
}
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
return Status.ERROR;
}
return Status.OK; return Status.OK;
} }
...@@ -65,6 +104,9 @@ public class HpdosClient extends DB { ...@@ -65,6 +104,9 @@ public class HpdosClient extends DB {
// Delete a single record // Delete a single record
public Status delete(String table, String key) { public Status delete(String table, String key) {
int status = jclient.delete(jclient.endpointGroup, key.getBytes());
if (status == -1 )
return Status.ERROR;
return Status.OK; return Status.OK;
} }
......
public class JClient {
static {
System.loadLibrary("hpdosclient");
}
public static JClient jclient = null;
public long endpointGroup;
private JClient() {
endpointGroup = createEndpointGroup("../prop.config");
}
public static JClient getInstance() {
synchronized(JClient.class)
{
if (jclient == null) {
System.out.println("creating jclient");
jclient = new JClient();
}
}
return jclient;
}
public native long createEndpointGroup(String file);
public native void closeEndpointGroup(long endpointGroup);
public native int put(long group, byte[] key, byte[] value);
public native byte[] get(long group, byte[] key);
public native int delete(long group, byte[] key);
}
package site.ycsb.db;
public class JNIClient {
public static long endpointGroup;
public long client;
public native int put(long client, byte[] key, byte[] value);
public native byte[] get(long client, byte[] key);
public native int delete(long client, byte[] key);
public native long createEndpointGroup(int sendQSize, int recvQSize, int compQSize,
int sendMsqSize, int recvMsgSize, int maxInLine, int timeout);
public native long createClient(long endpointGroup);
public native void closeClient(long client);
public native void closeEndpointGroup(long endpointGroup);
}
# Copyright (c) 2010 Yahoo! Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License. See accompanying
# LICENSE file.
# Yahoo! Cloud System Benchmark
# Workload A: Update heavy workload
# Application example: Session store recording recent actions
#
# Read/update ratio: 50/50
# Default data size: 1 KB records (10 fields, 100 bytes each, plus key)
# Request distribution: zipfian
recordcount=200
operationcount=200
fieldcount=1
fieldlength=200
workload=site.ycsb.workloads.CoreWorkload
readallfields=true
readproportion=0.5
updateproportion=0.5
# requestdistribution
# readmodifywriteproportion
scanproportion=0
insertproportion=0
requestdistribution=zipfian
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
# Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Default data size: 1 KB records (10 fields, 100 bytes each, plus key)
# Request distribution: zipfian # Request distribution: zipfian
recordcount=1000 recordcount=200
operationcount=1000 operationcount=200
workload=site.ycsb.workloads.CoreWorkload workload=site.ycsb.workloads.CoreWorkload
readallfields=true readallfields=true
......
...@@ -7,6 +7,22 @@ ...@@ -7,6 +7,22 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
/*
* Class: JClient
* Method: createEndpointGroup
* Signature: (Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_JClient_createEndpointGroup
(JNIEnv *, jobject, jstring);
/*
* Class: JClient
* Method: closeEndpointGroup
* Signature: (J)V
*/
JNIEXPORT void JNICALL Java_JClient_closeEndpointGroup
(JNIEnv *, jobject, jlong);
/* /*
* Class: JClient * Class: JClient
* Method: put * Method: put
...@@ -31,38 +47,6 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get ...@@ -31,38 +47,6 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get
JNIEXPORT jint JNICALL Java_JClient_delete JNIEXPORT jint JNICALL Java_JClient_delete
(JNIEnv *, jobject, jlong, jbyteArray); (JNIEnv *, jobject, jlong, jbyteArray);
/*
* Class: JClient
* Method: createEndpointGroup
* Signature: (IIIIIII)J
*/
JNIEXPORT jlong JNICALL Java_JClient_createEndpointGroup
(JNIEnv *, jobject, jint, jint, jint, jint, jint, jint, jint);
/*
* Class: JClient
* Method: createClient
* Signature: (J)J
*/
JNIEXPORT jlong JNICALL Java_JClient_createClient
(JNIEnv *, jobject, jlong);
/*
* Class: JClient
* Method: closeClient
* Signature: (J)V
*/
JNIEXPORT void JNICALL Java_JClient_closeClient
(JNIEnv *, jobject, jlong);
/*
* Class: JClient
* Method: closeEndpointGroup
* Signature: (J)V
*/
JNIEXPORT void JNICALL Java_JClient_closeEndpointGroup
(JNIEnv *, jobject, jlong);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -7,7 +7,7 @@ enum MessageType ...@@ -7,7 +7,7 @@ enum MessageType
DELETE = (1u << 2), DELETE = (1u << 2),
INVALIDATE = (1u << 3), INVALIDATE = (1u << 3),
SUCCESS = (1u << 4), SUCCESS = (1u << 4),
FAILURE = (1u <<5) FAILURE = (1u << 5)
}; };
struct __attribute__ ((__packed__)) MessageHeader struct __attribute__ ((__packed__)) MessageHeader
......
#ifndef __Properties__
#define __Properties__
#include <string>
#include <iostream>
#include <fstream>
#include <map>
class Properties
{
private:
std::map<std::string, std::string> _props;
const std::string _WHITESPACE = " \n\r\t\f\v";
std::string ltrim(const std::string &s);
std::string rtrim(const std::string &s);
std::string trim(const std::string &);
public:
Properties(std::string filename);
std::string getValue(std::string key);
};
#endif
\ No newline at end of file
#ifndef __RDMACLIENTENDPOINTGROUP__
#define __RDMACLIENTENDPOINTGROUP__
#include <vector>
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
#include <errno.h> #include <errno.h>
#include <unordered_map> #include <unordered_map>
#include <chrono>
#ifndef __RDMACLIENTENDPOINTGROUP__ #include <thread>
#define __RDMACLIENTENDPOINTGROUP__ #include <mutex>
#include "RdmaEndpointGroup.hpp" #include "RdmaEndpointGroup.hpp"
#include "RdmaCqProcessor.hpp" #include "RdmaCqProcessor.hpp"
#include "RdmaCmProcessor.hpp" #include "RdmaCmProcessor.hpp"
#include "RdmaClientEndpoint.hpp" #include "RdmaClientEndpoint.hpp"
#include "Properties.hpp"
class RdmaClientEndpointGroup : public RdmaEndpointGroup class RdmaClientEndpointGroup : public RdmaEndpointGroup
{ {
RdmaCmProcessor *_cmProcessor = NULL; RdmaCmProcessor *_cmProcessor = NULL;
RdmaCqProcessor *_cqProcessor = NULL; RdmaCqProcessor *_cqProcessor = NULL;
//struct rdma_cm_id *_cm_id; // struct rdma_cm_id *_cm_id;
int _sendQueueSize; int _sendQueueSize;
int _recvQueueSize; int _recvQueueSize;
...@@ -25,14 +28,25 @@ class RdmaClientEndpointGroup : public RdmaEndpointGroup ...@@ -25,14 +28,25 @@ class RdmaClientEndpointGroup : public RdmaEndpointGroup
int _recvMsgSize; int _recvMsgSize;
int _maxInLine; int _maxInLine;
int _timeoutMs; int _timeoutMs;
std::vector<RdmaClientEndpoint*> _clientEps;
int _clients;
int _counter;
std::mutex _counterMutex;
public: public:
RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize,int maxInLine, int timeout); int recvMsgSize, int maxInLine, int timeout);
void processCmEvent(struct rdma_cm_event *event); void processCmEvent(struct rdma_cm_event *event);
struct ibv_cq *createCq(struct rdma_cm_id *id); struct ibv_cq *createCq(struct rdma_cm_id *id);
RdmaClientEndpoint *createEndpoint(); RdmaClientEndpoint *createEndpoint();
void createClientEps(Properties *);
/* functions return RDMAFuture whose get will wait until data is put
* to avoid busy waiting cv is used*/
RdmaFuture* put(const char *key, int keySize, const char *value, int valueSize);
RdmaFuture* get(const char *key, int keySize);
RdmaFuture* deleteKey(const char *key, int keySize);
}; };
#endif #endif
\ No newline at end of file
...@@ -22,9 +22,10 @@ public: ...@@ -22,9 +22,10 @@ public:
struct ibv_cq *getCq(); struct ibv_cq *getCq();
void start(); void start();
void processCQEvents(); void processCQEvents();
void dispatchCqEvents(ibv_wc *wc_array, int size); inline void dispatchCqEvents(ibv_wc *wc_array, int size);
void close(); void close();
void registerEp(uint64_t qpum, RdmaClientEndpoint* ep); void registerEp(uint64_t qpum, RdmaClientEndpoint* ep);
void deRegisterEp(uint64_t qpum);
}; };
#endif #endif
......
...@@ -11,12 +11,17 @@ class RdmaFuture ...@@ -11,12 +11,17 @@ class RdmaFuture
{ {
static int DONE; static int DONE;
static int PENDING; static int PENDING;
uint8_t _status;
/* Used to identify future for a particular request.
* It is same as request id for packet generated
*/
uint64_t _requestId; uint64_t _requestId;
char *_data;
std::mutex stateMutex; std::mutex stateMutex;
std::condition_variable stateCv; std::condition_variable stateCv;
uint8_t state{0}; uint8_t state{0};
char *_data;
uint8_t _status;
public: public:
RdmaFuture(uint64_t requestId); RdmaFuture(uint64_t requestId);
......
import javax.security.auth.login.CredentialException;
public class JClient { public class JClient {
public native int put(long client, byte[] key, byte[] value);
public long endpointGroup;
public native byte[] get(long client, byte[] key); public native long createEndpointGroup(String path);
public native int delete(long client, byte[] key); public native void closeEndpointGroup(long endpointGroup);
public native long createEndpointGroup(int sendQSize, int recvQSize, int compQSize, int sendMsqSize, int recvMsgSize, public native int put(long group, byte[] key, byte[] value);
int maxInLine, int timeout);
public native long createClient(long endpointGroup); public native byte[] get(long group, byte[] key);
public native void closeClient(long client); public native int delete(long group, byte[] key);
public native void closeEndpointGroup(long endpointGroup);
public static long endpointGroup;
public long client;
static { static {
System.loadLibrary("hpdosclient"); System.loadLibrary("hpdosclient");
...@@ -27,14 +21,14 @@ public class JClient { ...@@ -27,14 +21,14 @@ public class JClient {
System.out.println(System.getProperty("java.library.path")); System.out.println(System.getProperty("java.library.path"));
try { try {
//Thread.sleep(1000*40);
JClient jclient = new JClient(); JClient jclient = new JClient();
endpointGroup = jclient.createEndpointGroup(5, 5, 5, 500, 500, 0, 1000); jclient.endpointGroup = jclient.createEndpointGroup("prop.config");
jclient.client = jclient.createClient(endpointGroup);
System.out.println("jc" + jclient.put(jclient.endpointGroup, "paras".getBytes(), "garg".getBytes()));
System.out.println("jc" + jclient.put(jclient.client, "paras".getBytes(), "garg".getBytes())); var res = jclient.get(jclient.endpointGroup, "paras".getBytes());
var res = jclient.get(jclient.client, "paras".getBytes());
System.out.println("val size"+res.length+" "+new String(res)); System.out.println("val size"+res.length+" "+new String(res));
System.out.println("jc" + jclient.delete(jclient.client, "paras".getBytes())); System.out.println("jc" + jclient.delete(jclient.endpointGroup, "paras".getBytes()));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
......
#include <iostream> #include <iostream>
#include "RdmaClientEndpointGroup.hpp" #include "RdmaClientEndpointGroup.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
#include "Properties.cpp" #include "Properties.hpp"
int main() int main()
{ {
Properties prop("prop.config"); Properties* prop = new Properties("prop.config");
int sendQS = stoi(prop.getValue("sendQS")); int sendQS = stoi(prop->getValue("sendQS"));
int recvQS = stoi(prop.getValue("recvQS")); int recvQS = stoi(prop->getValue("recvQS"));
int compQS = stoi(prop.getValue("compQS")); int compQS = stoi(prop->getValue("compQS"));
int sendMS = stoi(prop.getValue("sendMS")); int sendMS = stoi(prop->getValue("sendMS"));
int recvMS = stoi(prop.getValue("recvMS")); int recvMS = stoi(prop->getValue("recvMS"));
RdmaClientEndpointGroup *group = new RdmaClientEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, 0, 1000); RdmaClientEndpointGroup *group = new RdmaClientEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, 0, 1000);
RdmaClientEndpoint *clientEp = group->createEndpoint(); //RdmaClientEndpoint *clientEp = group->createEndpoint();
group->createClientEps(prop);
delete prop;
/*
clientEp->connect("192.168.200.20", "1921", "sal"); clientEp->connect("192.168.200.20", "1921", "sal");
while (!clientEp->isConnected()) while (!clientEp->isConnected())
; ;
std::cout << "client : connected" << std::endl; */
std::cout << "clients : connected" << std::endl;
auto r1 = clientEp->put("paras", 5, "garg", 4); auto r1 = group->put("paras", 5, "garg", 4);
auto r2 = clientEp->get("paras", 5); auto r2 = group->get("paras", 5);
auto r3 = clientEp->deleteKey("paras", 5); auto r3 = group->deleteKey("paras", 5);
std::cout << "waiting for output" << std::endl; std::cout << "waiting for output" << std::endl;
if (r3 != nullptr) if (r3 != nullptr)
...@@ -47,7 +51,6 @@ int main() ...@@ -47,7 +51,6 @@ int main()
std::cout << " size " << response->valueSize << std::endl; std::cout << " size " << response->valueSize << std::endl;
delete r2; delete r2;
} }
/*char *message = new char[100]; /*char *message = new char[100];
struct SalRequestHeader *request = (struct SalRequestHeader *)message; struct SalRequestHeader *request = (struct SalRequestHeader *)message;
request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed); request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed);
......
...@@ -3,53 +3,42 @@ ...@@ -3,53 +3,42 @@
#include "Logger.hpp" #include "Logger.hpp"
#include "RdmaClientEndpoint.hpp" #include "RdmaClientEndpoint.hpp"
#include "RdmaClientEndpointGroup.hpp" #include "RdmaClientEndpointGroup.hpp"
#include <unistd.h>
/* JNIEXPORT jlong JNICALL Java_JClient_createEndpointGroup(JNIEnv *jenv, jobject jobj, jstring jpath)
bool rocksdb_put_helper(JNIEnv* env, ROCKSDB_NAMESPACE::DB* db, {
const ROCKSDB_NAMESPACE::WriteOptions& write_options, // exec_gdb();
ROCKSDB_NAMESPACE::ColumnFamilyHandle* cf_handle, jboolean iscopy;
jbyteArray jkey, jint jkey_off, jint jkey_len, const char *path = jenv->GetStringUTFChars(jpath, &iscopy);
jbyteArray jval, jint jval_off, jint jval_len) { // std::cout<<path<<"\n";
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
if (env->ExceptionCheck()) {
// exception thrown: ArrayIndexOutOfBoundsException
delete[] key;
return false;
}
jbyte* value = new jbyte[jval_len]; Properties *prop = new Properties(path);
env->GetByteArrayRegion(jval, jval_off, jval_len, value); int sendQS = stoi(prop->getValue("sendQS"));
if (env->ExceptionCheck()) { int recvQS = stoi(prop->getValue("recvQS"));
// exception thrown: ArrayIndexOutOfBoundsException int compQS = stoi(prop->getValue("compQS"));
delete[] value; int sendMS = stoi(prop->getValue("sendMS"));
delete[] key; int recvMS = stoi(prop->getValue("recvMS"));
return false;
} std::cout << "Creating EndpointGroup SendQS " << sendQS << " recvQS " << recvQS << " compQS ";
std::cout << compQS << " sendMS " << sendMS << " recvMS " << recvMS << "\n";
RdmaClientEndpointGroup *group = new RdmaClientEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, 0, 1000);
std::cout << "Connecting Endpoints\n";
group->createClientEps(prop);
delete prop;
*/
JNIEXPORT jlong JNICALL Java_JClient_createEndpointGroup(JNIEnv *jenv, jobject jobj,
jint sendq, jint recvq, jint compq,
jint sendm, jint recvm,
jint maxinline, jint timeout)
{
RdmaClientEndpointGroup *group = new RdmaClientEndpointGroup(sendq, recvq, compq, sendm,
recvm, maxinline, timeout);
return reinterpret_cast<jlong>(group); return reinterpret_cast<jlong>(group);
} }
JNIEXPORT jlong JNICALL Java_JClient_createClient(JNIEnv *jenv, jobject jobj, jlong group) JNIEXPORT void JNICALL Java_JClient_closeEndpointGroup(JNIEnv *jenv, jobject jobj, jlong jclient)
{ {
RdmaClientEndpoint *client = (reinterpret_cast<RdmaClientEndpointGroup *>(group)) RdmaClientEndpointGroup *group = reinterpret_cast<RdmaClientEndpointGroup *>(jclient);
->createEndpoint(); // group->close();
client->connect("192.168.200.20", "1921", "sal");
while (!client->isConnected())
;
return reinterpret_cast<jlong>(client);
} }
JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong jclient, JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong jclient,
jbyteArray jkey) jbyteArray jkey)
{ {
/* Get Key form Java Side*/
int keyLen = jenv->GetArrayLength(jkey); int keyLen = jenv->GetArrayLength(jkey);
char *key = new char[keyLen]; char *key = new char[keyLen];
jenv->GetByteArrayRegion(jkey, 0, keyLen, reinterpret_cast<jbyte *>(key)); jenv->GetByteArrayRegion(jkey, 0, keyLen, reinterpret_cast<jbyte *>(key));
...@@ -58,32 +47,43 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong ...@@ -58,32 +47,43 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong
CPPLog::LOG_ERROR("exception occurs in jni get"); CPPLog::LOG_ERROR("exception occurs in jni get");
delete[] key; delete[] key;
} }
RdmaClientEndpoint *client = reinterpret_cast<RdmaClientEndpoint *>(jclient);
auto future = client->get(key, keyLen); /* Get data from server*/
RdmaClientEndpointGroup *group = reinterpret_cast<RdmaClientEndpointGroup *>(jclient);
auto future = group->get(key, keyLen);
delete[] key; delete[] key;
/* Handle failed get request*/
if (future == nullptr) if (future == nullptr)
{ {
jbyteArray jvalue = jenv->NewByteArray(0); jbyteArray jvalue = jenv->NewByteArray(0);
return jvalue; return jvalue;
} }
auto data = future->get(); /* Create Java Byte Array and send data to java side
* after copying data from c char array to java byte array
*/
std::cout << "get future.get()\n";
char *data = future->get();
delete future; delete future;
std::cout << "g future.get()\n";
struct MessageHeader *response = (struct MessageHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->type == MessageType::FAILURE) if (response->type == MessageType::FAILURE)
{ {
jbyteArray jvalue = jenv->NewByteArray(0); jbyteArray jvalue = jenv->NewByteArray(0);
delete[] data;
return jvalue; return jvalue;
} }
jbyteArray jvalue = jenv->NewByteArray(response->valueSize); jbyteArray jvalue = jenv->NewByteArray(response->valueSize);
std::cout << "value size " << response->valueSize << " " << (data + MessageHeaderSize); // std::cout << "value size " << response->valueSize << " " << (data + MessageHeaderSize);
jenv->SetByteArrayRegion(jvalue, 0, response->valueSize - 1, jenv->SetByteArrayRegion(jvalue, 0, response->valueSize - 1,
reinterpret_cast<const jbyte *>(data + MessageHeaderSize)); reinterpret_cast<const jbyte *>(data + MessageHeaderSize));
delete[] data;
return jvalue; return jvalue;
} }
JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclient, JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclient,
jbyteArray jkey, jbyteArray jval) jbyteArray jkey, jbyteArray jval)
{ {
/*getting key from java side */
int keyLen = jenv->GetArrayLength(jkey); int keyLen = jenv->GetArrayLength(jkey);
char *key = new char[keyLen]; char *key = new char[keyLen];
jenv->GetByteArrayRegion(jkey, 0, keyLen, reinterpret_cast<jbyte *>(key)); jenv->GetByteArrayRegion(jkey, 0, keyLen, reinterpret_cast<jbyte *>(key));
...@@ -91,39 +91,50 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien ...@@ -91,39 +91,50 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien
{ {
CPPLog::LOG_ERROR("exception occurs in jni put"); CPPLog::LOG_ERROR("exception occurs in jni put");
delete[] key; delete[] key;
return -1;
} }
/*getting value from java side */
int valLen = jenv->GetArrayLength(jval); int valLen = jenv->GetArrayLength(jval);
char *val = new char[valLen]; char *val = new char[valLen];
jenv->GetByteArrayRegion(jval, 0, valLen, reinterpret_cast<jbyte *>(val)); jenv->GetByteArrayRegion(jval, 0, valLen, reinterpret_cast<jbyte *>(val));
std::cout << "put got " << val << " len " << valLen; // std::cout << "put got " << "key "<<key<<" val " <<val << " len " << valLen;
if (jenv->ExceptionCheck()) if (jenv->ExceptionCheck())
{ {
CPPLog::LOG_ERROR("exception occurs in jni put"); CPPLog::LOG_ERROR("exception occurs in jni put");
delete[] key; delete[] key;
delete[] val; delete[] val;
return -1;
} }
RdmaClientEndpoint *client = reinterpret_cast<RdmaClientEndpoint *>(jclient);
auto future = client->put(key, keyLen, val, valLen); /* Putting data to server*/
RdmaClientEndpointGroup *group = reinterpret_cast<RdmaClientEndpointGroup *>(jclient);
auto future = group->put(key, keyLen, val, valLen);
delete[] key; delete[] key;
delete[] val; delete[] val;
if (future == nullptr) if (future == nullptr)
{ {
return 0; return -1;
} }
std::cout << "put future.get()\n";
auto data = future->get(); auto data = future->get();
delete future; delete future;
std::cout << "p future.get()\n";
struct MessageHeader *response = (struct MessageHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->type == MessageType::FAILURE) std::cout<<"server response "<<response->type<<"\n";
if (response->type == MessageType::SUCCESS)
{ {
// Currently 0 indicate succces and remaing are failure more status code can be added in future
return 0; return 0;
} }
return 1; return -1;
} }
JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jclient, JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jclient,
jbyteArray jkey) jbyteArray jkey)
{ {
/*getting key from java side */
int keyLen = jenv->GetArrayLength(jkey); int keyLen = jenv->GetArrayLength(jkey);
char *key = new char[keyLen]; char *key = new char[keyLen];
jenv->GetByteArrayRegion(jkey, 0, keyLen, reinterpret_cast<jbyte *>(key)); jenv->GetByteArrayRegion(jkey, 0, keyLen, reinterpret_cast<jbyte *>(key));
...@@ -132,26 +143,86 @@ JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jcl ...@@ -132,26 +143,86 @@ JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jcl
CPPLog::LOG_ERROR("exception occurs in jni delete"); CPPLog::LOG_ERROR("exception occurs in jni delete");
delete[] key; delete[] key;
} }
RdmaClientEndpoint *client = reinterpret_cast<RdmaClientEndpoint *>(jclient);
auto future = client->deleteKey(key, keyLen); /* deleting data from server*/
RdmaClientEndpointGroup *group = reinterpret_cast<RdmaClientEndpointGroup *>(jclient);
auto future = group->deleteKey(key, keyLen);
delete[] key; delete[] key;
if (future == nullptr) if (future == nullptr)
{ {
return 0; return -1;
} }
std::cout << "delete future.get()\n";
auto data = future->get(); auto data = future->get();
delete future; delete future;
std::cout << "d future.get()\n";
struct MessageHeader *response = (struct MessageHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->type == MessageType::FAILURE) if (response->type == MessageType::FAILURE)
{ {
return 0; return -1;
} }
return 1; return 0;
} }
JNIEXPORT void JNICALL Java_JClient_closeClient(JNIEnv *, jobject, jlong)
int gdb_process_pid = 0;
void exec_gdb()
{ {
// Create child process for running GDB debugger
int pid = fork();
if (pid < 0) /* error */
{
abort();
}
else if (pid) /* parent */
{
// Application process
gdb_process_pid = pid; // save debugger pid
sleep(10); /* Give GDB time to attach */
// Continue the application execution controlled by GDB
}
else /* child */
{
// GDB process. We run DDD GUI wrapper around GDB debugger
std::stringstream args;
// Pass parent process id to the debugger
args << "--pid=" << getppid();
// Invoke DDD debugger
execl("ddd", "ddd", "--debugger", "gdb", args.str().c_str(), (char *)0);
// Get here only in case of DDD invocation failure
std::cerr << "\nFailed to exec GDB (DDD)\n"
<< std::endl;
}
} }
JNIEXPORT void JNICALL Java_JClient_closeEndpointGroup(JNIEnv *, jobject, jlong)
{ /*
} bool rocksdb_put_helper(JNIEnv* env, ROCKSDB_NAMESPACE::DB* db,
\ No newline at end of file const ROCKSDB_NAMESPACE::WriteOptions& write_options,
ROCKSDB_NAMESPACE::ColumnFamilyHandle* cf_handle,
jbyteArray jkey, jint jkey_off, jint jkey_len,
jbyteArray jval, jint jval_off, jint jval_len) {
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
if (env->ExceptionCheck()) {
// exception thrown: ArrayIndexOutOfBoundsException
delete[] key;
return false;
}
jbyte* value = new jbyte[jval_len];
env->GetByteArrayRegion(jval, jval_off, jval_len, value);
if (env->ExceptionCheck()) {
// exception thrown: ArrayIndexOutOfBoundsException
delete[] value;
delete[] key;
return false;
}
*/
#include <string> #include "Properties.hpp"
#include<iostream>
#include<fstream>
#include<map>
std::string Properties::ltrim(const std::string &s)
{
size_t start = s.find_first_not_of(_WHITESPACE);
return (start == std::string::npos) ? "" : s.substr(start);
}
class Properties{ std::string Properties::rtrim(const std::string &s)
private: {
std::map<std::string,std::string> _props; size_t end = s.find_last_not_of(_WHITESPACE);
const std::string _WHITESPACE = " \n\r\t\f\v"; return (end == std::string::npos) ? "" : s.substr(0, end + 1);
std::string ltrim(const std::string& s) }
std::string Properties::trim(const std::string &s)
{
return rtrim(ltrim(s));
}
Properties::Properties(std::string filename)
{
// std::cout<<"Reading Properties From file named prop.config ...........\n";
std::ifstream file(filename);
if (!file.is_open())
{ {
size_t start = s.find_first_not_of(_WHITESPACE); std::cout << "Confiq file opening failed\n";
return (start == std::string::npos) ? "" : s.substr(start); exit(0);
} }
std::string line;
std::string rtrim(const std::string& s) std::string key, value;
int delimPos;
while (getline(file, line))
{ {
size_t end = s.find_last_not_of(_WHITESPACE); delimPos = line.find('#');
return (end == std::string::npos) ? "" : s.substr(0, end + 1); line = trim(line);
if (!line.empty())
{
line = line.substr(0, delimPos);
delimPos = line.find('=');
_props.insert(make_pair(trim(line.substr(0, delimPos)), trim(line.substr(delimPos + 1))));
}
} }
}
std::string trim(const std::string& s) std::string Properties::getValue(std::string key)
{
auto it = _props.find(key);
if (it == _props.end())
{ {
return rtrim(ltrim(s)); return "";
} }
return it->second;
}
public: \ No newline at end of file
Properties(std::string filename){
//std::cout<<"Reading Properties From file named prop.config ...........\n";
std::ifstream file (filename);
if(!file.is_open()){
std::cout<<"Confiq file opening failed\n";
exit(0);
}
std::string line;
std::string key,value;
int delimPos;
while(getline(file,line)){
delimPos=line.find('#');
line=trim(line);
if(!line.empty()){
line=line.substr(0,delimPos);
delimPos=line.find('=');
_props.insert(make_pair(trim(line.substr(0,delimPos)),trim(line.substr(delimPos+1))));
}
}
}
std::string getValue(std::string key){
auto it=_props.find(key);
if(it==_props.end()){
return "";
}
return it->second;
}
};
\ No newline at end of file
...@@ -14,7 +14,6 @@ RdmaClientEndpoint::RdmaClientEndpoint(struct rdma_cm_id *id, RdmaEndpointGroup ...@@ -14,7 +14,6 @@ RdmaClientEndpoint::RdmaClientEndpoint(struct rdma_cm_id *id, RdmaEndpointGroup
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout) _sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout)
{ {
_state = CONN_STATE_INITIALIZED; _state = CONN_STATE_INITIALIZED;
//_sendBuffers = new boost::lockfree::queue<char *>(_sendQueueSize);
} }
void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *connData) void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *connData)
...@@ -45,11 +44,6 @@ void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *c ...@@ -45,11 +44,6 @@ void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *c
_state = CONN_STATE_ADDR_RESOLVED; _state = CONN_STATE_ADDR_RESOLVED;
} }
bool RdmaClientEndpoint::isConnected()
{
return _state == CONN_STATE_CONNECTED;
}
void RdmaClientEndpoint::connect() void RdmaClientEndpoint::connect()
{ {
if (_connData != NULL) if (_connData != NULL)
...@@ -70,6 +64,13 @@ void RdmaClientEndpoint::connect() ...@@ -70,6 +64,13 @@ void RdmaClientEndpoint::connect()
} }
} }
bool RdmaClientEndpoint::isConnected()
{
return _state == CONN_STATE_CONNECTED;
}
void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event) void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL) if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
...@@ -81,7 +82,7 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event) ...@@ -81,7 +82,7 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL) else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL)
{ {
registerMemory(); registerMemory();
CPPLog::LOG_DEBUG("RdmaClientEndpoint : step5 connect"); CPPLog::LOG_DEBUG("RdmaClientEndpoint : step5 connect Request");
connect(); connect();
} }
else if (event->id != NULL && event->event == RDMA_CM_EVENT_ESTABLISHED) else if (event->id != NULL && event->event == RDMA_CM_EVENT_ESTABLISHED)
...@@ -97,8 +98,8 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event) ...@@ -97,8 +98,8 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
else else
{ {
std::ostringstream ss; std::ostringstream ss;
ss << "RdmaClientEndpoint : Not able to procces CM EVent "; ss << "RdmaClientEndpoint : Not able to procces CM Event ";
ss << rdma_event_str(event->event) <<" "<< event->id << " " << event->listen_id; ss << rdma_event_str(event->event) << " " << event->id << " " << event->listen_id;
CPPLog::LOG_ERROR(ss); CPPLog::LOG_ERROR(ss);
} }
} }
...@@ -123,16 +124,17 @@ void RdmaClientEndpoint::completeClose() ...@@ -123,16 +124,17 @@ void RdmaClientEndpoint::completeClose()
if (_state != CONN_STATE_PARTIAL_CLOSED) if (_state != CONN_STATE_PARTIAL_CLOSED)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : completeClose invalid state"); CPPLog::LOG_ERROR("RdmaClientEndpoint : completeClose invalid state");
return; //return;
} }
_state = CONN_STATE_CLOSED; _state = CONN_STATE_CLOSED;
// delete _sendBuffers; // delete _sendBuffers;
free(_sendBuff);
free(_recvBuff);
rdma_dereg_mr(_sendMr); rdma_dereg_mr(_sendMr);
rdma_dereg_mr(_recvMr); rdma_dereg_mr(_recvMr);
delete[] _sendBuff;
delete[] _recvBuff;
rdma_destroy_qp(_cm_id); rdma_destroy_qp(_cm_id);
rdma_destroy_id(_cm_id); rdma_destroy_id(_cm_id);
CPPLog::LOG_INFO("RdmaClientEndpoint : close connection");
} }
void RdmaClientEndpoint::registerMemory() void RdmaClientEndpoint::registerMemory()
...@@ -142,10 +144,10 @@ void RdmaClientEndpoint::registerMemory() ...@@ -142,10 +144,10 @@ void RdmaClientEndpoint::registerMemory()
CPPLog::LOG_ERROR("RdmaClientEndpoint : createResource address not resolved"); CPPLog::LOG_ERROR("RdmaClientEndpoint : createResource address not resolved");
return; return;
} }
_sendBuff = (char *)malloc(_sendMsgSize * _sendQueueSize); _sendBuff = new char[(_sendMsgSize+MessageHeaderSize) * _sendQueueSize];
if (_sendBuff == NULL) if (_sendBuff == NULL)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : sendBuff malloc failed"); CPPLog::LOG_ERROR("RdmaClientEndpoint : sendBuff allocation failed");
return; return;
} }
...@@ -157,10 +159,10 @@ void RdmaClientEndpoint::registerMemory() ...@@ -157,10 +159,10 @@ void RdmaClientEndpoint::registerMemory()
return; return;
} }
_recvBuff = (char *)malloc(_recvMsgSize * _recvQueueSize); _recvBuff = new char[(MessageHeaderSize + _recvMsgSize) * _recvQueueSize];
if (_recvBuff == NULL) if (_recvBuff == NULL)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : recvBuff malloc failed\n"); CPPLog::LOG_ERROR("RdmaClientEndpoint : recvBuff allocation failed\n");
return; return;
} }
...@@ -182,7 +184,9 @@ void RdmaClientEndpoint::registerMemory() ...@@ -182,7 +184,9 @@ void RdmaClientEndpoint::registerMemory()
for (int i = 0; i < _sendQueueSize; i++) for (int i = 0; i < _sendQueueSize; i++)
{ {
char *buffer = _sendBuff + i * _recvMsgSize; char *buffer = _sendBuff + i * _recvMsgSize;
std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push(buffer); _sendBuffers.push(buffer);
lock.unlock();
} }
_state = CONN_STATE_RESOURCES_ALLOCATED; _state = CONN_STATE_RESOURCES_ALLOCATED;
...@@ -239,7 +243,14 @@ void RdmaClientEndpoint::createResources() ...@@ -239,7 +243,14 @@ void RdmaClientEndpoint::createResources()
int RdmaClientEndpoint::sendMessage(const char *buffer, int size) int RdmaClientEndpoint::sendMessage(const char *buffer, int size)
{ {
if (size > _sendMsgSize) if (size > _sendMsgSize)
{
std::ostringstream ss;
ss<<"Large Message size "<<size;
ss<<" send buffer size " << _sendMsgSize;
CPPLog::LOG_ERROR(ss);
return -1; return -1;
}
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
return -1; return -1;
...@@ -281,7 +292,7 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc) ...@@ -281,7 +292,7 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
} }
RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *value, int valueSize) RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *value, int valueSize)
{ {
if (keySize + valueSize + (int)MessageHeaderSize > _sendMsgSize) if (keySize + valueSize + (int)MessageHeaderSize > _sendMsgSize)
return nullptr; return nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
...@@ -301,6 +312,7 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va ...@@ -301,6 +312,7 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va
_sendMr, 0); _sendMr, 0);
RdmaFuture *future = new RdmaFuture(request->id); RdmaFuture *future = new RdmaFuture(request->id);
futures[request->id] = future; futures[request->id] = future;
std::cout<<"created future\n";
return future; return future;
} }
RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize) RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize)
......
#include "RdmaClientEndpointGroup.hpp" #include "RdmaClientEndpointGroup.hpp"
RdmaClientEndpointGroup::RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, RdmaClientEndpointGroup::RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize,int maxInLine, int timeout) int recvMsgSize, int maxInLine, int timeout)
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize), : _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine),_timeoutMs(timeout) _sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout)
{ {
_cmProcessor = new RdmaCmProcessor(this); _cmProcessor = new RdmaCmProcessor(this);
_cmProcessor->start(); _cmProcessor->start();
_counter = 0;
} }
void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event) void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event)
...@@ -15,9 +16,13 @@ void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -15,9 +16,13 @@ void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event)
if (event->id != NULL && event->id->context != NULL) if (event->id != NULL && event->id->context != NULL)
{ {
((RdmaClientEndpoint *)event->id->context)->processCmEvent(event); ((RdmaClientEndpoint *)event->id->context)->processCmEvent(event);
if(event->event == RDMA_CM_EVENT_ADDR_RESOLVED) if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED)
{ {
_cqProcessor->registerEp(event->id->qp->qp_num,((RdmaClientEndpoint *)event->id->context)); _cqProcessor->registerEp(event->id->qp->qp_num, ((RdmaClientEndpoint *)event->id->context));
}
if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
_cqProcessor->deRegisterEp(event->id->qp->qp_num);
} }
} }
else else
...@@ -33,7 +38,7 @@ RdmaClientEndpoint *RdmaClientEndpointGroup::createEndpoint() ...@@ -33,7 +38,7 @@ RdmaClientEndpoint *RdmaClientEndpointGroup::createEndpoint()
struct rdma_cm_id *id = _cmProcessor->createId(); struct rdma_cm_id *id = _cmProcessor->createId();
RdmaClientEndpoint *endpoint = new RdmaClientEndpoint(id, this, RdmaClientEndpoint *endpoint = new RdmaClientEndpoint(id, this,
_sendQueueSize, _recvQueueSize, _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize,_maxInLine, _timeoutMs); _sendMsgSize, _recvMsgSize, _maxInLine, _timeoutMs);
id->context = (void *)endpoint; id->context = (void *)endpoint;
return endpoint; return endpoint;
} }
...@@ -47,4 +52,66 @@ struct ibv_cq *RdmaClientEndpointGroup::createCq(struct rdma_cm_id *id) ...@@ -47,4 +52,66 @@ struct ibv_cq *RdmaClientEndpointGroup::createCq(struct rdma_cm_id *id)
_cqProcessor->start(); _cqProcessor->start();
} }
return _cqProcessor->getCq(); return _cqProcessor->getCq();
}
void RdmaClientEndpointGroup::createClientEps(Properties *prop)
{
_clients = stoi(prop->getValue("NSERVERS"));
std::cout << "clients" << _clients << "\n";
for (int i = 0; i < _clients; i++)
{
std::cout << "creating client for " << prop->getValue("SERVER_IP") << (i + 1);
std::cout << ":" << prop->getValue("SERVER_PORT") << (i + 1) << " \n";
RdmaClientEndpoint *ep = createEndpoint();
ep->connect((prop->getValue("SERVER_IP" + std::to_string(i + 1))).c_str(),
(prop->getValue("SERVER_PORT" + std::to_string(i + 1))).c_str(), "sal");
_clientEps.push_back(ep);
std::cout << "ep" << ep << std::endl;
}
std::cout << "vec size" << _clientEps.size() << "\n";
for (int i = 0; i < _clients; i++)
{
std::cout << "ep" << _clientEps[i] << std::endl;
int timeout = 0;
do
{
if (_clientEps[i]->isConnected())
{
break;
}
std::cout << "timeout " << timeout << "\n"
<< _clients;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
timeout += 10;
} while (timeout < 1000);
if (!_clientEps[i]->isConnected())
{
std::cout << "Client Endpoint not connected " << prop->getValue("SERVER_IP" + std::to_string(i + 1)) << "\n";
exit(1);
}
}
std::cout << "connected\n";
}
RdmaFuture *RdmaClientEndpointGroup::put(const char *key, int keySize, const char *value, int valueSize)
{
// std::cout<<"Client 1\n";
int id = _counter;
_counter = (_counter + 1) % _clients;
// std::cout<<"Client 2"<<_counter<<" "<<_clientEps.size()<<"\n";
auto future = _clientEps[id]->put(key, keySize, value, valueSize);
// std::cout<<"endpoint get\n";
return future;
}
RdmaFuture *RdmaClientEndpointGroup::get(const char *key, int keySize)
{
int id = _counter;
_counter = (_counter + 1) % _clients;
return _clientEps[id]->get(key, keySize);
}
RdmaFuture *RdmaClientEndpointGroup::deleteKey(const char *key, int keySize)
{
int id = _counter;
_counter = (_counter + 1) % _clients;
return _clientEps[id]->deleteKey(key, keySize);
} }
\ No newline at end of file
...@@ -31,6 +31,10 @@ struct ibv_cq *RdmaCqProcessor::getCq() ...@@ -31,6 +31,10 @@ struct ibv_cq *RdmaCqProcessor::getCq()
void RdmaCqProcessor::registerEp(uint64_t qp,RdmaClientEndpoint* ep) void RdmaCqProcessor::registerEp(uint64_t qp,RdmaClientEndpoint* ep)
{ {
_qpEndpointMap->emplace(qp,ep); _qpEndpointMap->emplace(qp,ep);
}
void RdmaCqProcessor::deRegisterEp(uint64_t qp)
{
} }
void RdmaCqProcessor::start() void RdmaCqProcessor::start()
{ {
......
...@@ -11,6 +11,7 @@ char *RdmaFuture::get() ...@@ -11,6 +11,7 @@ char *RdmaFuture::get()
std::unique_lock<std::mutex> lock(stateMutex); std::unique_lock<std::mutex> lock(stateMutex);
while(state!= DONE) while(state!= DONE)
{ {
std::cout<<"getting data\n";
stateCv.wait(lock); stateCv.wait(lock);
} }
// [this](){return state!=DONE;}); // [this](){return state!=DONE;});
...@@ -24,16 +25,17 @@ char *RdmaFuture::wait_for(int timeout) ...@@ -24,16 +25,17 @@ char *RdmaFuture::wait_for(int timeout)
std::unique_lock<std::mutex> lock(stateMutex); std::unique_lock<std::mutex> lock(stateMutex);
if (state == DONE) if (state == DONE)
return _data; return _data;
lock.unlock(); //lock.unlock();
// add wait logic // add wait logic
return nullptr; return nullptr;
} }
void RdmaFuture::put(char *data) void RdmaFuture::put(char *data)
{ {
std::unique_lock<std::mutex> lock(stateMutex); std::unique_lock<std::mutex> lock(stateMutex);
std::cout<<"putting data\n";
_data = data; _data = data;
state = DONE; state = DONE;
lock.unlock(); //lock.unlock();
stateCv.notify_one(); stateCv.notify_one();
// std::cout << "got data current state" <<data<< (unsigned)state; // std::cout << "got data current state" <<data<< (unsigned)state;
// std::unique_lock<std::mutex> lock(stateMutex); // std::unique_lock<std::mutex> lock(stateMutex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment