Commit 41b6326b authored by Paras Garg's avatar Paras Garg

Fixed double free while deregistering memory

parent 7e25adec
...@@ -29,6 +29,7 @@ $(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp ...@@ -29,6 +29,7 @@ $(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp
#$(BINDIR)/$(TARGET): $(OBJS) #$(BINDIR)/$(TARGET): $(OBJS)
$(TARGET) : $(OBJS) $(TARGET) : $(OBJS)
mkdir -p .build
$(CXX) -o $@ $^ $(LIBS) $(CXX) -o $@ $^ $(LIBS)
@echo "Linked "$<" successfully!" @echo "Linked "$<" successfully!"
...@@ -36,6 +37,9 @@ $(TARGET) : $(OBJS) ...@@ -36,6 +37,9 @@ $(TARGET) : $(OBJS)
clean: clean:
rm -f $(OBJ_DIR)/* rm -f $(OBJ_DIR)/*
rm -f $(TARGET) rm -f $(TARGET)
rm -f *.log
rm -f YCSB/*.log
rm -f libhpdosclient.so
.PHONY: count .PHONY: count
count: count:
...@@ -47,6 +51,7 @@ count: ...@@ -47,6 +51,7 @@ count:
Jclient: $(OBJS) Jclient: $(OBJS)
mkdir -p .build
$(CXX) -o libhpdosclient.so -L/usr/local/lib -shared $^ $(LIBS) $(CXX) -o libhpdosclient.so -L/usr/local/lib -shared $^ $(LIBS)
@echo "jclient "$<" successfully!" @echo "jclient "$<" successfully!"
sudo rm -f /usr/lib/libhpdosclient.so sudo rm -f /usr/lib/libhpdosclient.so
......
Steps to build jni client Steps to build jni client
> mkdir -f .build
> make JniHeader <br> > make JniHeader <br>
> make JClient <br> > make Jclient <br>
> java -cp jsrc JClient > java -cp jsrc JClient
Running YSCB Running YSCB
> mvn compile <br>
add .build to make
./bin/ycsb load hpdos -P workloads/workloadb -threads 1
mvn -pl site.ycsb:hpdos-binding -am clean package -Dcheckstyle.skip
https://medium.com/@pirogov.alexey/gdb-debug-native-part-of-java-application-c-c-libraries-and-jdk-6593af3b4f3f
to do
delete client endpoint on close
threading in client and hashing in client
add more code for success failure not found etc server error client error
add cache add support for invalidation
interface client API through endpointGroup
Endpointgroup to manage list of servers and caches, Invalidation
./bin/ycsb shell hpdos ./bin/ycsb shell hpdos
./bin/ycsb run hpdos -P workloads/workloada ./bin/ycsb run hpdos -P workloads/workloada -threads 1
./bin/ycsb load hpdos -P workloads/workloada ./bin/ycsb load hpdos -P workloads/workloada
Options: Options:
-P file Specify workload file -P file Specify workload file
...@@ -33,6 +17,17 @@ Options: ...@@ -33,6 +17,17 @@ Options:
-target n Target ops/sec (default: unthrottled) -target n Target ops/sec (default: unthrottled)
-threads n Number of client threads (default: 1) -threads n Number of client threads (default: 1)
mvn -pl site.ycsb:hpdos-binding -am clean package -Dcheckstyle.skip
https://medium.com/@pirogov.alexey/gdb-debug-native-part-of-java-application-c-c-libraries-and-jdk-6593af3b4f3f
to do
make requestId unique
delete client endpoint on close
threading in client and hashing in client
add more code for success failure not found etc server error client error
add cache add support for invalidation
# Steps to configure spdk target and linux initiator # Steps to configure spdk target and linux initiator
Build spdk using Build spdk using
https://spdk.io/doc/nvmf.html https://spdk.io/doc/nvmf.html
......
...@@ -50,13 +50,13 @@ public class HpdosClient extends DB { ...@@ -50,13 +50,13 @@ public class HpdosClient extends DB {
// Read a single record // Read a single record
public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) { public Status read(String table, String key, Set<String> fields, Map<String, ByteIterator> result) {
//System.out.println("Read" + key); System.out.println("Read" + key.length());
byte res[] = jclient.get(jclient.endpointGroup, key.getBytes()); byte res[] = jclient.get(jclient.endpointGroup, key.getBytes());
if (res == null) if (res == null)
return Status.NOT_FOUND; return Status.NOT_FOUND;
if(fields == null) if(fields == null)
{ {
return Status.OK; return Status.BAD_REQUEST;
} }
Iterator<String> it = fields.iterator(); Iterator<String> it = fields.iterator();
if (it.hasNext()) if (it.hasNext())
...@@ -72,7 +72,7 @@ public class HpdosClient extends DB { ...@@ -72,7 +72,7 @@ public class HpdosClient extends DB {
// Update a single record // Update a single record
public Status update(String table, String key, Map<String, ByteIterator> values) { public Status update(String table, String key, Map<String, ByteIterator> values) {
//System.out.println("update" + key); System.out.println("update" + key.getBytes().length );
try { try {
ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteArrayOutputStream stream = new ByteArrayOutputStream();
for (ByteIterator v : values.values()) { for (ByteIterator v : values.values()) {
......
...@@ -22,10 +22,10 @@ ...@@ -22,10 +22,10 @@
# Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Default data size: 1 KB records (10 fields, 100 bytes each, plus key)
# Request distribution: zipfian # Request distribution: zipfian
recordcount=200 recordcount=2000000
operationcount=200 operationcount=500000
fieldcount=1 fieldcount=1
fieldlength=200 fieldlength=2500
workload=site.ycsb.workloads.CoreWorkload workload=site.ycsb.workloads.CoreWorkload
......
...@@ -11,14 +11,13 @@ ...@@ -11,14 +11,13 @@
//#include <boost/lockfree/queue.hpp> //#include <boost/lockfree/queue.hpp>
#include <queue> #include <queue>
#include <map> #include <map>
#include <mutex> #include <mutex>
#include "Buffer.hpp" #include "Buffer.hpp"
#include "Logger.hpp" #include "Logger.hpp"
#include "RdmaEndpointGroup.hpp" #include "RdmaEndpointGroup.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
#include "RdmaFuture.hpp" #include "RdmaFuture.hpp"
class RdmaClientEndpoint class RdmaClientEndpoint
{ {
...@@ -41,7 +40,7 @@ class RdmaClientEndpoint ...@@ -41,7 +40,7 @@ class RdmaClientEndpoint
int _state; int _state;
int _maxInLine; int _maxInLine;
int _timeoutMs; int _timeoutMs;
const char *_connData; const char *_connData;
char *_sendBuff = NULL; char *_sendBuff = NULL;
...@@ -49,32 +48,34 @@ class RdmaClientEndpoint ...@@ -49,32 +48,34 @@ class RdmaClientEndpoint
struct ibv_mr *_sendMr = NULL; struct ibv_mr *_sendMr = NULL;
struct ibv_mr *_recvMr = NULL; struct ibv_mr *_recvMr = NULL;
//boost::lockfree::queue<char *> *_sendBuffers; // boost::lockfree::queue<char *> *_sendBuffers;
std::queue<char*> _sendBuffers; std::queue<char *> _sendBuffers;
std::mutex _sendBuffersM; std::mutex _sendBuffersM;
std::map<uint64_t,RdmaFuture*> futures;
void completeClose(); std::map<uint64_t, RdmaFuture *> futures;
std::mutex _futureMutex;
void connect(); void connect();
void registerMemory(); void registerMemory();
void createResources(); void createQueuePairs();
public: public:
std::atomic<uint64_t> _requestId{12}; std::atomic<uint64_t> _requestId{12};
RdmaClientEndpoint(struct rdma_cm_id *id, RdmaEndpointGroup *group, int sendQueueSize, int recvQueueSize, RdmaClientEndpoint(struct rdma_cm_id *id, RdmaEndpointGroup *group, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, int maxInLine, int timeout); int sendMsgSize, int recvMsgSize, int maxInLine, int timeout);
void processCmEvent(struct rdma_cm_event *event);
void connect(const char *ip, const char *port, const char *connData); void connect(const char *ip, const char *port, const char *connData);
bool isConnected(); bool isConnected();
void close(); void close();
void completeClose();
void processCmEvent(struct rdma_cm_event *event); void processSendComp(struct ibv_wc &);
void processSendComp(struct ibv_wc); void processRecvComp(struct ibv_wc &);
void processRecvComp(struct ibv_wc);
int sendMessage(const char *buffer, int size); int sendMessage(const char *buffer, int size);
RdmaFuture* put(const char *key, int keySize, const char *value, int valueSize); RdmaFuture *put(const char *key, int keySize, const char *value, int valueSize);
RdmaFuture* get(const char *key, int keySize); RdmaFuture *get(const char *key, int keySize);
RdmaFuture* deleteKey(const char *key, int keySize); RdmaFuture *deleteKey(const char *key, int keySize);
}; };
#endif #endif
\ No newline at end of file
...@@ -46,7 +46,7 @@ public: ...@@ -46,7 +46,7 @@ public:
RdmaFuture* put(const char *key, int keySize, const char *value, int valueSize); RdmaFuture* put(const char *key, int keySize, const char *value, int valueSize);
RdmaFuture* get(const char *key, int keySize); RdmaFuture* get(const char *key, int keySize);
RdmaFuture* deleteKey(const char *key, int keySize); RdmaFuture* deleteKey(const char *key, int keySize);
void close();
}; };
#endif #endif
\ No newline at end of file
#ifndef __RDMACMPROCESSOR__
#define __RDMACMPROCESSOR__
#include <unordered_map>
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#ifndef __RDMACMPROCESSOR__
#define __RDMACMPROCESSOR__
#include "RdmaEndpointGroup.hpp" #include "RdmaEndpointGroup.hpp"
#include <unordered_map> #include "Logger.hpp"
class RdmaCmProcessor class RdmaCmProcessor
{ {
......
...@@ -12,11 +12,12 @@ ...@@ -12,11 +12,12 @@
class RdmaCqProcessor class RdmaCqProcessor
{ {
bool _stop{false};
public: public:
struct ibv_comp_channel *_compChannel; struct ibv_comp_channel *_compChannel;
struct ibv_cq *_completionQueue; struct ibv_cq *_completionQueue;
std::thread *_compQueueThread; std::thread *_compQueueThread;
std::unordered_map<uint32_t, RdmaClientEndpoint *> *_qpEndpointMap{NULL}; std::unordered_map<uint32_t, RdmaClientEndpoint *> _qpEndpointMap;
RdmaCqProcessor(ibv_context *verbs, int compQueueSize); RdmaCqProcessor(ibv_context *verbs, int compQueueSize);
struct ibv_cq *getCq(); struct ibv_cq *getCq();
......
#For commenting used # empty line are ignored sendQS=100
#comments after parameters also supported recvQS=100
# use key=value format compQS=100
#All Parameters will be taken as string sendMS=3000
# Fixed Parameters recvMS=3000
ENABLE_LOGGING=1
ENABLE_LOGGING=0 NSERVERS=2
SERVER_IP=192.168.200.20 SERVER_IP1=192.168.200.30
SERVER_PORT=1921 SERVER_PORT1=1920
EXECUTOR_POOL_SIZE=4 SERVER_IP2=192.168.200.50
\ No newline at end of file SERVER_PORT2=1920
...@@ -32,6 +32,8 @@ JNIEXPORT jlong JNICALL Java_JClient_createEndpointGroup(JNIEnv *jenv, jobject j ...@@ -32,6 +32,8 @@ JNIEXPORT jlong JNICALL Java_JClient_createEndpointGroup(JNIEnv *jenv, jobject j
JNIEXPORT void JNICALL Java_JClient_closeEndpointGroup(JNIEnv *jenv, jobject jobj, jlong jclient) JNIEXPORT void JNICALL Java_JClient_closeEndpointGroup(JNIEnv *jenv, jobject jobj, jlong jclient)
{ {
RdmaClientEndpointGroup *group = reinterpret_cast<RdmaClientEndpointGroup *>(jclient); RdmaClientEndpointGroup *group = reinterpret_cast<RdmaClientEndpointGroup *>(jclient);
group->close();
//delete group;
// group->close(); // group->close();
} }
...@@ -61,10 +63,10 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong ...@@ -61,10 +63,10 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong
/* Create Java Byte Array and send data to java side /* Create Java Byte Array and send data to java side
* after copying data from c char array to java byte array * after copying data from c char array to java byte array
*/ */
std::cout << "get future.get()\n"; //std::cout << "get future.get()\n";
char *data = future->get(); char *data = future->get();
delete future; delete future;
std::cout << "g future.get()\n"; //std::cout << "g future.get()\n";
struct MessageHeader *response = (struct MessageHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->type == MessageType::FAILURE) if (response->type == MessageType::FAILURE)
{ {
...@@ -117,12 +119,12 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien ...@@ -117,12 +119,12 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien
{ {
return -1; return -1;
} }
std::cout << "put future.get()\n"; //std::cout << "put future.get()\n";
auto data = future->get(); auto data = future->get();
delete future; delete future;
std::cout << "p future.get()\n"; //std::cout << "p future.get()\n";
struct MessageHeader *response = (struct MessageHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
std::cout<<"server response "<<response->type<<"\n"; //std::cout<<"server response "<<response->type<<"\n";
if (response->type == MessageType::SUCCESS) if (response->type == MessageType::SUCCESS)
{ {
// Currently 0 indicate succces and remaing are failure more status code can be added in future // Currently 0 indicate succces and remaing are failure more status code can be added in future
...@@ -153,10 +155,10 @@ JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jcl ...@@ -153,10 +155,10 @@ JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jcl
{ {
return -1; return -1;
} }
std::cout << "delete future.get()\n"; //std::cout << "delete future.get()\n";
auto data = future->get(); auto data = future->get();
delete future; delete future;
std::cout << "d future.get()\n"; //std::cout << "d future.get()\n";
struct MessageHeader *response = (struct MessageHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->type == MessageType::FAILURE) if (response->type == MessageType::FAILURE)
{ {
......
...@@ -28,6 +28,7 @@ void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *c ...@@ -28,6 +28,7 @@ void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *c
CPPLog::LOG_DEBUG("RdmaClientEndpoint : step2 getaddrinfo"); CPPLog::LOG_DEBUG("RdmaClientEndpoint : step2 getaddrinfo");
struct addrinfo *addr; struct addrinfo *addr;
ret = getaddrinfo(ip, port, NULL, &addr); ret = getaddrinfo(ip, port, NULL, &addr);
std::cout << "get addr info " << ret << "\n";
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaServerEndpointGroup : get_addr_info failed"); CPPLog::LOG_ERROR("RdmaServerEndpointGroup : get_addr_info failed");
...@@ -35,13 +36,13 @@ void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *c ...@@ -35,13 +36,13 @@ void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *c
CPPLog::LOG_DEBUG("RdmaClientEndpoint : step2 resolve addr"); CPPLog::LOG_DEBUG("RdmaClientEndpoint : step2 resolve addr");
ret = rdma_resolve_addr(_cm_id, NULL, addr->ai_addr, _timeoutMs); ret = rdma_resolve_addr(_cm_id, NULL, addr->ai_addr, _timeoutMs);
std::cout << "resolve route " << ret << "\n";
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("unable to resolve addr"); CPPLog::LOG_ERROR("unable to resolve addr");
return; return;
} }
CPPLog::LOG_DEBUG("RdmaClientEndpoint : step2 resolve addr resolved"); CPPLog::LOG_DEBUG("RdmaClientEndpoint : step2 resolve addr resolved");
_state = CONN_STATE_ADDR_RESOLVED;
} }
void RdmaClientEndpoint::connect() void RdmaClientEndpoint::connect()
...@@ -56,27 +57,39 @@ void RdmaClientEndpoint::connect() ...@@ -56,27 +57,39 @@ void RdmaClientEndpoint::connect()
conn_param.rnr_retry_count = 7; conn_param.rnr_retry_count = 7;
conn_param.private_data = _connData; conn_param.private_data = _connData;
conn_param.private_data_len = strlen(_connData); conn_param.private_data_len = strlen(_connData);
rdma_connect(_cm_id, &conn_param); int ret = rdma_connect(_cm_id, &conn_param);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_connect error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
} }
else else
{ {
rdma_connect(_cm_id, NULL); int ret = rdma_connect(_cm_id, NULL);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_connect error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
} }
} }
bool RdmaClientEndpoint::isConnected() bool RdmaClientEndpoint::isConnected()
{ {
return _state == CONN_STATE_CONNECTED; return _state == CONN_STATE_CONNECTED;
} }
void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event) void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
std::cout << "ClientEndpoint " << rdma_event_str(event->event) << "\n";
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL) if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
{ {
_state = CONN_STATE_ADDR_RESOLVED;
CPPLog::LOG_DEBUG("RdmaClientEndpoint : step3 resolve_route"); CPPLog::LOG_DEBUG("RdmaClientEndpoint : step3 resolve_route");
createResources(); createQueuePairs();
rdma_resolve_route(_cm_id, _timeoutMs); rdma_resolve_route(_cm_id, _timeoutMs);
} }
else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL) else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL)
...@@ -95,6 +108,11 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event) ...@@ -95,6 +108,11 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
CPPLog::LOG_DEBUG("RdmaClientEndpoint : step7 Closed"); CPPLog::LOG_DEBUG("RdmaClientEndpoint : step7 Closed");
completeClose(); completeClose();
} }
else if (event->id != NULL && event->event == RDMA_CM_EVENT_UNREACHABLE)
{
std::cout << "Not able to connect client\n";
exit(1);
}
else else
{ {
std::ostringstream ss; std::ostringstream ss;
...@@ -124,14 +142,18 @@ void RdmaClientEndpoint::completeClose() ...@@ -124,14 +142,18 @@ void RdmaClientEndpoint::completeClose()
if (_state != CONN_STATE_PARTIAL_CLOSED) if (_state != CONN_STATE_PARTIAL_CLOSED)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : completeClose invalid state"); CPPLog::LOG_ERROR("RdmaClientEndpoint : completeClose invalid state");
//return; // return;
} }
_state = CONN_STATE_CLOSED; _state = CONN_STATE_CLOSED;
// delete _sendBuffers; // delete _sendBuffers;
rdma_dereg_mr(_sendMr); if (_sendMr != nullptr)
rdma_dereg_mr(_recvMr); rdma_dereg_mr(_sendMr);
delete[] _sendBuff; if (_recvMr != nullptr)
delete[] _recvBuff; rdma_dereg_mr(_recvMr);
if (_sendBuff != nullptr)
delete[] _sendBuff;
if (_recvBuff != nullptr)
delete[] _recvBuff;
rdma_destroy_qp(_cm_id); rdma_destroy_qp(_cm_id);
rdma_destroy_id(_cm_id); rdma_destroy_id(_cm_id);
CPPLog::LOG_INFO("RdmaClientEndpoint : close connection"); CPPLog::LOG_INFO("RdmaClientEndpoint : close connection");
...@@ -144,11 +166,11 @@ void RdmaClientEndpoint::registerMemory() ...@@ -144,11 +166,11 @@ void RdmaClientEndpoint::registerMemory()
CPPLog::LOG_ERROR("RdmaClientEndpoint : createResource address not resolved"); CPPLog::LOG_ERROR("RdmaClientEndpoint : createResource address not resolved");
return; return;
} }
_sendBuff = new char[(_sendMsgSize+MessageHeaderSize) * _sendQueueSize]; _sendBuff = new char[(_sendMsgSize + MessageHeaderSize) * _sendQueueSize];
if (_sendBuff == NULL) if (_sendBuff == NULL)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : sendBuff allocation failed"); CPPLog::LOG_ERROR("RdmaClientEndpoint : sendBuff allocation failed");
return; exit(1);
} }
_sendMr = rdma_reg_msgs(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMr = rdma_reg_msgs(_cm_id, reinterpret_cast<void *>(_sendBuff),
...@@ -156,14 +178,14 @@ void RdmaClientEndpoint::registerMemory() ...@@ -156,14 +178,14 @@ void RdmaClientEndpoint::registerMemory()
if (_sendMr == NULL) if (_sendMr == NULL)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : sendMr reg failed"); CPPLog::LOG_ERROR("RdmaClientEndpoint : sendMr reg failed");
return; exit(1);
} }
_recvBuff = new char[(MessageHeaderSize + _recvMsgSize) * _recvQueueSize]; _recvBuff = new char[(MessageHeaderSize + _recvMsgSize) * _recvQueueSize];
if (_recvBuff == NULL) if (_recvBuff == NULL)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : recvBuff allocation failed\n"); CPPLog::LOG_ERROR("RdmaClientEndpoint : recvBuff allocation failed\n");
return; exit(1);
} }
_recvMr = rdma_reg_msgs(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMr = rdma_reg_msgs(_cm_id, reinterpret_cast<void *>(_recvBuff),
...@@ -172,14 +194,18 @@ void RdmaClientEndpoint::registerMemory() ...@@ -172,14 +194,18 @@ void RdmaClientEndpoint::registerMemory()
if (_recvMr == NULL) if (_recvMr == NULL)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : recvMr reg failed\n"); CPPLog::LOG_ERROR("RdmaClientEndpoint : recvMr reg failed\n");
return; exit(1);
} }
for (int i = 0; i < _recvQueueSize; i++) for (int i = 0; i < _recvQueueSize; i++)
{ {
char *buffer = _recvBuff + i * _recvMsgSize; char *buffer = _recvBuff + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(buffer), reinterpret_cast<void *>(buffer), int ret = rdma_post_recv(_cm_id, reinterpret_cast<void *>(buffer), reinterpret_cast<void *>(buffer),
_recvMsgSize, _recvMr); _recvMsgSize, _recvMr);
if (ret == -1)
{
CPPLog::LOG_ERROR("Failed rmda_post_recv");
}
} }
for (int i = 0; i < _sendQueueSize; i++) for (int i = 0; i < _sendQueueSize; i++)
{ {
...@@ -191,18 +217,19 @@ void RdmaClientEndpoint::registerMemory() ...@@ -191,18 +217,19 @@ void RdmaClientEndpoint::registerMemory()
_state = CONN_STATE_RESOURCES_ALLOCATED; _state = CONN_STATE_RESOURCES_ALLOCATED;
} }
void RdmaClientEndpoint::createResources()
void RdmaClientEndpoint::createQueuePairs()
{ {
if (_state != CONN_STATE_ADDR_RESOLVED) if (_state != CONN_STATE_ADDR_RESOLVED)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : createResource address not resolved"); CPPLog::LOG_ERROR("RdmaClientEndpoint : createResource address not resolved");
return; exit(1);
} }
_protectionDomain = ibv_alloc_pd(_cm_id->verbs); _protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL) if (_protectionDomain == NULL)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : ibv_alloc_pd failed "); CPPLog::LOG_ERROR("RdmaClientEndpoint : ibv_alloc_pd failed ");
return; exit(1);
} }
struct ibv_cq *completionQueue = _group->createCq(_cm_id); struct ibv_cq *completionQueue = _group->createCq(_cm_id);
struct ibv_qp_init_attr qp_init_attr; struct ibv_qp_init_attr qp_init_attr;
...@@ -229,7 +256,8 @@ void RdmaClientEndpoint::createResources() ...@@ -229,7 +256,8 @@ void RdmaClientEndpoint::createResources()
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr); int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaClientEndpoint : ibv_create_cq failed"); CPPLog::LOG_ERROR("RdmaClientEndpoint : ibv_create_qp failed");
exit(1);
} }
if (_cm_id->pd == NULL) if (_cm_id->pd == NULL)
{ {
...@@ -245,12 +273,12 @@ int RdmaClientEndpoint::sendMessage(const char *buffer, int size) ...@@ -245,12 +273,12 @@ int RdmaClientEndpoint::sendMessage(const char *buffer, int size)
if (size > _sendMsgSize) if (size > _sendMsgSize)
{ {
std::ostringstream ss; std::ostringstream ss;
ss<<"Large Message size "<<size; ss << "Large Message size " << size;
ss<<" send buffer size " << _sendMsgSize; ss << " send buffer size " << _sendMsgSize;
CPPLog::LOG_ERROR(ss); CPPLog::LOG_ERROR(ss);
return -1; return -1;
} }
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
return -1; return -1;
...@@ -259,15 +287,27 @@ int RdmaClientEndpoint::sendMessage(const char *buffer, int size) ...@@ -259,15 +287,27 @@ int RdmaClientEndpoint::sendMessage(const char *buffer, int size)
lock.unlock(); lock.unlock();
memcpy(sendBuffer, buffer, size); memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0); int ret = rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
return ret;
} }
void RdmaClientEndpoint::processSendComp(struct ibv_wc wc) void RdmaClientEndpoint::processSendComp(struct ibv_wc &wc)
{ {
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if ((char *)wc.wr_id == nullptr)
{
CPPLog::LOG_ERROR("Process Send Comp got nullptr in wc_id");
return;
}
_sendBuffers.push((char *)wc.wr_id); _sendBuffers.push((char *)wc.wr_id);
} }
void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc) void RdmaClientEndpoint::processRecvComp(struct ibv_wc &wc)
{ {
char *data = new char[wc.byte_len]; char *data = new char[wc.byte_len];
memcpy(data, (void *)wc.wr_id, wc.byte_len); memcpy(data, (void *)wc.wr_id, wc.byte_len);
...@@ -280,6 +320,7 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc) ...@@ -280,6 +320,7 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
CPPLog::LOG_INFO(ss);*/ CPPLog::LOG_INFO(ss);*/
if (response->type != MessageType::INVALIDATE) if (response->type != MessageType::INVALIDATE)
{ {
std::unique_lock<std::mutex> lock(_futureMutex);
auto it = futures.find(response->id); auto it = futures.find(response->id);
if (it == futures.end()) if (it == futures.end())
{ {
...@@ -287,17 +328,30 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc) ...@@ -287,17 +328,30 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
ss << "Recv completion for invalid id" << response->id; ss << "Recv completion for invalid id" << response->id;
CPPLog::LOG_DEBUG(ss); CPPLog::LOG_DEBUG(ss);
} }
it->second->put(data); auto future = it->second;
futures.erase(it);
lock.unlock();
future->put(data);
} }
} }
RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *value, int valueSize) RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *value, int valueSize)
{ {
if (keySize + valueSize + (int)MessageHeaderSize > _sendMsgSize) if (keySize + valueSize > _sendMsgSize)
{
std::ostringstream ss;
ss << "Large Message size " << keySize + valueSize;
ss << " send buffer size " << _sendMsgSize;
CPPLog::LOG_ERROR(ss);
return nullptr; return nullptr;
}
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
{
CPPLog::LOG_ERROR("No Buffers available to send packet");
return nullptr; return nullptr;
}
char *sendBuffer = _sendBuffers.front(); char *sendBuffer = _sendBuffers.front();
_sendBuffers.pop(); _sendBuffers.pop();
lock.unlock(); lock.unlock();
...@@ -308,20 +362,39 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va ...@@ -308,20 +362,39 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va
request->valueSize = valueSize; request->valueSize = valueSize;
memcpy(sendBuffer + MessageHeaderSize, key, keySize); memcpy(sendBuffer + MessageHeaderSize, key, keySize);
memcpy(sendBuffer + MessageHeaderSize + keySize, value, valueSize); memcpy(sendBuffer + MessageHeaderSize + keySize, value, valueSize);
rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize + valueSize, int ret = rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize + valueSize,
_sendMr, 0); _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
return nullptr;
}
RdmaFuture *future = new RdmaFuture(request->id); RdmaFuture *future = new RdmaFuture(request->id);
std::unique_lock<std::mutex> futureLock(_futureMutex);
futures[request->id] = future; futures[request->id] = future;
std::cout<<"created future\n"; futureLock.unlock();
// std::cout << "created future\n";
return future; return future;
} }
RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize) RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize)
{ {
if (keySize + (int)MessageHeaderSize > _sendMsgSize) if (keySize > _sendMsgSize)
{
std::ostringstream ss;
ss << "Large Message size " << keySize;
ss << " send buffer size " << _sendMsgSize;
CPPLog::LOG_ERROR(ss);
return nullptr; return nullptr;
}
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
{
CPPLog::LOG_ERROR("No Buffers available to send packet");
return nullptr; return nullptr;
}
char *sendBuffer = _sendBuffers.front(); char *sendBuffer = _sendBuffers.front();
_sendBuffers.pop(); _sendBuffers.pop();
lock.unlock(); lock.unlock();
...@@ -330,19 +403,40 @@ RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize) ...@@ -330,19 +403,40 @@ RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize)
request->type = MessageType::GET; request->type = MessageType::GET;
request->keySize = keySize; request->keySize = keySize;
memcpy(sendBuffer + MessageHeaderSize, key, keySize); memcpy(sendBuffer + MessageHeaderSize, key, keySize);
rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize, /*context associated with the send request will be returned,
_sendMr, 0); through the work completion wr_id, work request identifier, field. */
int ret = rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize,
_sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
return nullptr;
}
RdmaFuture *future = new RdmaFuture(request->id); RdmaFuture *future = new RdmaFuture(request->id);
std::unique_lock<std::mutex> futureLock(_futureMutex);
futures[request->id] = future; futures[request->id] = future;
futureLock.unlock();
return future; return future;
} }
RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize) RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize)
{ {
if (keySize + (int)MessageHeaderSize > _sendMsgSize) if (keySize > _sendMsgSize)
{
std::ostringstream ss;
ss << "Large Message size " << keySize;
ss << " send buffer size " << _sendMsgSize;
CPPLog::LOG_ERROR(ss);
return nullptr; return nullptr;
}
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
{
CPPLog::LOG_ERROR("No Buffers available to send packet");
return nullptr; return nullptr;
}
char *sendBuffer = _sendBuffers.front(); char *sendBuffer = _sendBuffers.front();
_sendBuffers.pop(); _sendBuffers.pop();
lock.unlock(); lock.unlock();
...@@ -351,9 +445,18 @@ RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize) ...@@ -351,9 +445,18 @@ RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize)
request->type = MessageType::DELETE; request->type = MessageType::DELETE;
request->keySize = keySize; request->keySize = keySize;
memcpy(sendBuffer + MessageHeaderSize, key, keySize); memcpy(sendBuffer + MessageHeaderSize, key, keySize);
rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize, int ret = rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize,
_sendMr, 0); _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
return nullptr;
}
RdmaFuture *future = new RdmaFuture(request->id); RdmaFuture *future = new RdmaFuture(request->id);
std::unique_lock<std::mutex> futureLock(_futureMutex);
futures[request->id] = future; futures[request->id] = future;
futureLock.unlock();
return future; return future;
} }
...@@ -27,9 +27,10 @@ void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -27,9 +27,10 @@ void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event)
} }
else else
{ {
std::cout << "RdmaClientEndpointGroup : Not able to procces CM EVent"; std::ostringstream ss;
std::cout << rdma_event_str(event->event) << event->id << " "; ss<<"RdmaClientEndpointGroup : Not able to procces CM EVent" << rdma_event_str(event->event) ;
std::cout << event->listen_id << std::endl; ss << " id "<<event->id << " listen id"<< event->listen_id;
CPPLog::LOG_ERROR(ss);
} }
} }
...@@ -39,6 +40,8 @@ RdmaClientEndpoint *RdmaClientEndpointGroup::createEndpoint() ...@@ -39,6 +40,8 @@ RdmaClientEndpoint *RdmaClientEndpointGroup::createEndpoint()
RdmaClientEndpoint *endpoint = new RdmaClientEndpoint(id, this, RdmaClientEndpoint *endpoint = new RdmaClientEndpoint(id, this,
_sendQueueSize, _recvQueueSize, _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize, _maxInLine, _timeoutMs); _sendMsgSize, _recvMsgSize, _maxInLine, _timeoutMs);
/* Setting Endpoint in cm_id context so that whenever cm event come we can get endpoint
*/
id->context = (void *)endpoint; id->context = (void *)endpoint;
return endpoint; return endpoint;
} }
...@@ -47,7 +50,6 @@ struct ibv_cq *RdmaClientEndpointGroup::createCq(struct rdma_cm_id *id) ...@@ -47,7 +50,6 @@ struct ibv_cq *RdmaClientEndpointGroup::createCq(struct rdma_cm_id *id)
{ {
if (_cqProcessor == NULL) if (_cqProcessor == NULL)
{ {
std::cout << "RdmaClientEndpointGroup : Creating CQ processor" << std::endl;
_cqProcessor = new RdmaCqProcessor(id->verbs, _compQueueSize); _cqProcessor = new RdmaCqProcessor(id->verbs, _compQueueSize);
_cqProcessor->start(); _cqProcessor->start();
} }
...@@ -60,34 +62,18 @@ void RdmaClientEndpointGroup::createClientEps(Properties *prop) ...@@ -60,34 +62,18 @@ void RdmaClientEndpointGroup::createClientEps(Properties *prop)
std::cout << "clients" << _clients << "\n"; std::cout << "clients" << _clients << "\n";
for (int i = 0; i < _clients; i++) for (int i = 0; i < _clients; i++)
{ {
std::cout << "creating client for " << prop->getValue("SERVER_IP") << (i + 1); std::cout << "creating client for " << prop->getValue("SERVER_IP" + std::to_string(i + 1));
std::cout << ":" << prop->getValue("SERVER_PORT") << (i + 1) << " \n"; std::cout << ":" << prop->getValue("SERVER_PORT" + std::to_string(i + 1)) << " \n";
RdmaClientEndpoint *ep = createEndpoint(); RdmaClientEndpoint *ep = createEndpoint();
ep->connect((prop->getValue("SERVER_IP" + std::to_string(i + 1))).c_str(), ep->connect((prop->getValue("SERVER_IP" + std::to_string(i + 1))).c_str(),
(prop->getValue("SERVER_PORT" + std::to_string(i + 1))).c_str(), "sal"); (prop->getValue("SERVER_PORT" + std::to_string(i + 1))).c_str(), "sal");
_clientEps.push_back(ep); _clientEps.push_back(ep);
std::cout << "ep" << ep << std::endl;
} }
std::cout << "vec size" << _clientEps.size() << "\n";
for (int i = 0; i < _clients; i++) for (int i = 0; i < _clients; i++)
{ {
std::cout << "ep" << _clientEps[i] << std::endl; while (!_clientEps[i]->isConnected())
int timeout = 0;
do
{ {
if (_clientEps[i]->isConnected()) continue;
{
break;
}
std::cout << "timeout " << timeout << "\n"
<< _clients;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
timeout += 10;
} while (timeout < 1000);
if (!_clientEps[i]->isConnected())
{
std::cout << "Client Endpoint not connected " << prop->getValue("SERVER_IP" + std::to_string(i + 1)) << "\n";
exit(1);
} }
} }
std::cout << "connected\n"; std::cout << "connected\n";
...@@ -114,4 +100,24 @@ RdmaFuture *RdmaClientEndpointGroup::deleteKey(const char *key, int keySize) ...@@ -114,4 +100,24 @@ RdmaFuture *RdmaClientEndpointGroup::deleteKey(const char *key, int keySize)
int id = _counter; int id = _counter;
_counter = (_counter + 1) % _clients; _counter = (_counter + 1) % _clients;
return _clientEps[id]->deleteKey(key, keySize); return _clientEps[id]->deleteKey(key, keySize);
}
/*
 * Shut down the endpoint group: stop the CM and CQ processor threads,
 * then close and free every client endpoint.
 *
 * Each pointer is reset to nullptr after delete (and the endpoint vector
 * is cleared) so that a second call to close() is a safe no-op instead of
 * a double free.
 */
void RdmaClientEndpointGroup::close()
{
    if (_cmProcessor != nullptr)
    {
        _cmProcessor->close();
        delete _cmProcessor;
        _cmProcessor = nullptr; // guard against double free on repeated close()
    }
    if (_cqProcessor != nullptr)
    {
        _cqProcessor->close();
        delete _cqProcessor;
        _cqProcessor = nullptr; // guard against double free on repeated close()
    }
    for (auto ep : _clientEps)
    {
        ep->close();         // initiate disconnect
        ep->completeClose(); // release endpoint resources
        delete ep;
    }
    _clientEps.clear(); // drop the now-dangling pointers so close() is idempotent
}
\ No newline at end of file
...@@ -4,12 +4,13 @@ ...@@ -4,12 +4,13 @@
/*
 * Construct a connection-manager event processor for the given endpoint
 * group. Allocates the rdma event channel; the process exits when the
 * channel cannot be created, since nothing can work without it.
 */
RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group)
    : _endpointGroup(group)
{
    CPPLog::LOG_ALWAYS("CMProcessor : Step 1 creating event channel");
    _stop = false;
    _eventChannel = rdma_create_event_channel();
    if (_eventChannel == NULL)
    {
        CPPLog::LOG_ERROR("CMProcesor : error creating event channel");
        exit(1);
    }
}
...@@ -17,39 +18,44 @@ struct rdma_event_channel *RdmaCmProcessor::getEventChannel() ...@@ -17,39 +18,44 @@ struct rdma_event_channel *RdmaCmProcessor::getEventChannel()
{ {
return _eventChannel; return _eventChannel;
} }
struct rdma_cm_id *RdmaCmProcessor::createId() struct rdma_cm_id *RdmaCmProcessor::createId()
{ {
struct rdma_cm_id *id = NULL; struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP); int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
if (ret == -1) if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl; {
CPPLog::LOG_ERROR("CMProcesor : rdma_create_id failed");
return nullptr;
}
return id; return id;
} }
void RdmaCmProcessor::start() void RdmaCmProcessor::start()
{ {
_cmEventThread = new std::thread(&RdmaCmProcessor::processCmEvent, this); _cmEventThread = new std::thread(&RdmaCmProcessor::processCmEvent, this);
pthread_setname_np(_cmEventThread->native_handle(),"CMProcessor"); pthread_setname_np(_cmEventThread->native_handle(), "CMProcessor");
} }
void RdmaCmProcessor::processCmEvent() void RdmaCmProcessor::processCmEvent()
{ {
int ret; int ret;
struct rdma_cm_event *event; struct rdma_cm_event *event;
std::cout << "CMProcessor : starting cm processing thread" << std::endl; CPPLog::LOG_ALWAYS("CMProcessor : starting cm processing thread");
while (!_stop) while (!_stop)
{ {
ret = rdma_get_cm_event(_eventChannel, &event); ret = rdma_get_cm_event(_eventChannel, &event);
if (ret) if (ret)
{ {
std::cout << "CMProcesor : rdma_get_cm_event failed" << std::endl; CPPLog::LOG_ERROR("CMProcesor : rdma_get_cm_event failed");
continue; continue;
} }
_endpointGroup->processCmEvent(event); _endpointGroup->processCmEvent(event);
ret = rdma_ack_cm_event(event); ret = rdma_ack_cm_event(event);
if (ret) if (ret)
{ {
std::cout << "CMProcesor : rdma_ack_cm_event failed"; CPPLog::LOG_ERROR( "CMProcesor : rdma_ack_cm_event failed");
} }
} }
} }
...@@ -57,6 +63,10 @@ void RdmaCmProcessor::processCmEvent() ...@@ -57,6 +63,10 @@ void RdmaCmProcessor::processCmEvent()
void RdmaCmProcessor::close() void RdmaCmProcessor::close()
{ {
_stop = true; _stop = true;
_cmEventThread->join(); if(_cmEventThread != nullptr)
{
_cmEventThread->join();
delete _cmEventThread;
}
rdma_destroy_event_channel(_eventChannel); rdma_destroy_event_channel(_eventChannel);
} }
\ No newline at end of file
...@@ -2,46 +2,49 @@ ...@@ -2,46 +2,49 @@
/*
 * Build the completion-queue machinery for the given device context: a
 * completion channel, a CQ of compQueueSize entries, and an initial armed
 * notification. Channel/CQ creation failures are fatal and terminate the
 * process; a failed notify request is only logged.
 */
RdmaCqProcessor::RdmaCqProcessor(ibv_context *verbs, int compQueueSize)
{
    CPPLog::LOG_ALWAYS("RdmaClientEndpointGroup : Creating CQ processor");
    _compChannel = ibv_create_comp_channel(verbs);
    if (_compChannel == NULL)
    {
        CPPLog::LOG_ERROR("CqProcessr : ibv_create_comp_channel failed");
        exit(1);
    }
    _completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
    if (_completionQueue == NULL)
    {
        CPPLog::LOG_ERROR("CqProcessr : ibv_create_cq failed");
        exit(1);
    }
    // Arm the CQ so the first completion generates a channel event.
    if (ibv_req_notify_cq(_completionQueue, 0) != 0)
    {
        CPPLog::LOG_ERROR("CqProcessr : ibv_req_notify_cq failed");
    }
}
struct ibv_cq *RdmaCqProcessor::getCq() struct ibv_cq *RdmaCqProcessor::getCq()
{ {
return _completionQueue; return _completionQueue;
} }
void RdmaCqProcessor::registerEp(uint64_t qp,RdmaClientEndpoint* ep) void RdmaCqProcessor::registerEp(uint64_t qp, RdmaClientEndpoint *ep)
{ {
_qpEndpointMap->emplace(qp,ep); _qpEndpointMap.emplace(qp, ep);
} }
void RdmaCqProcessor::deRegisterEp(uint64_t qp) void RdmaCqProcessor::deRegisterEp(uint64_t qp)
{ {
_qpEndpointMap.erase(qp);
} }
void RdmaCqProcessor::start() void RdmaCqProcessor::start()
{ {
std::cout << "CqProcessr : starting process CQ events" << std::endl; CPPLog::LOG_ALWAYS("CqProcessr : starting process CQ events");
_compQueueThread = new std::thread(&RdmaCqProcessor::processCQEvents, this); _compQueueThread = new std::thread(&RdmaCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"compQueueThread"); pthread_setname_np(_compQueueThread->native_handle(), "compQueueThread");
} }
void RdmaCqProcessor::processCQEvents() void RdmaCqProcessor::processCQEvents()
{ {
int ret = 0; int ret = 0;
...@@ -49,26 +52,26 @@ void RdmaCqProcessor::processCQEvents() ...@@ -49,26 +52,26 @@ void RdmaCqProcessor::processCQEvents()
void *context; void *context;
const int nevent = 10; const int nevent = 10;
struct ibv_wc wc_array[nevent]; struct ibv_wc wc_array[nevent];
while (1) while (_stop != true)
{ {
ret = ibv_get_cq_event(_compChannel, &cq, &context); ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1) if (ret == -1)
{ {
std::cout << "CqProcessr : ibv_get_cq_event failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_get_cq_event failed");
close(); close();
} }
ibv_ack_cq_events(cq, 1); ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0); ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
std::cout << "CqProcessr : ibv_req_notify_cq failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_req_notify_cq failed");
close(); close();
} }
ret = ibv_poll_cq(cq, nevent, wc_array); ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0) if (ret < 0)
{ {
std::cout << "CqProcessr : ibv_poll_cq failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_poll_cq failed");
close(); close();
} }
if (ret == 0) if (ret == 0)
...@@ -78,44 +81,54 @@ void RdmaCqProcessor::processCQEvents() ...@@ -78,44 +81,54 @@ void RdmaCqProcessor::processCQEvents()
} }
} }
/*
 * Route a batch of polled work completions to their owning endpoints.
 *
 * @param wc   array of work completions returned by ibv_poll_cq
 * @param size number of valid entries in wc
 *
 * Fix: a failed completion or an unregistered qp previously aborted the
 * whole batch (return), silently dropping every remaining completion; they
 * now skip only the offending entry (continue).
 */
inline void RdmaCqProcessor::dispatchCqEvents(ibv_wc wc[], int size)
{
    for (int i = 0; i < size; i++)
    {
        if (wc[i].status != IBV_WC_SUCCESS)
        {
            std::cout << "RdmaCqProcessor : failed work completion : " << ibv_wc_status_str(wc[i].status) << " on qp " << wc[i].qp_num << std::endl;
            continue; // do not drop the rest of the batch
        }
        auto it = _qpEndpointMap.find(wc[i].qp_num);
        if (it == _qpEndpointMap.end())
        {
            CPPLog::LOG_ALWAYS("RdmaCqProcessor : endpoint not registered for qp num");
            continue; // unknown qp: skip this completion only
        }
        switch (wc[i].opcode)
        {
        case IBV_WC_SEND:
            it->second->processSendComp(wc[i]);
            break;
        case IBV_WC_RECV:
            it->second->processRecvComp(wc[i]);
            break;
        case IBV_WC_RDMA_WRITE:
            std::cout << "rdma write completion\n";
            break;
        case IBV_WC_RDMA_READ:
            std::cout << "rdma read completion\n";
            break;
        default:
            std::cout << "RdmaCqProcessor : invalid opcode" << std::endl;
            break;
        }
    }
}
void RdmaCqProcessor::close() void RdmaCqProcessor::close()
{ {
_stop = true;
if(_compQueueThread != nullptr)
{
_compQueueThread->join();
delete _compQueueThread;
}
if (_completionQueue != nullptr)
ibv_destroy_cq(_completionQueue);
if (_compChannel != nullptr)
ibv_destroy_comp_channel(_compChannel);
} }
\ No newline at end of file
...@@ -3,42 +3,44 @@ ...@@ -3,42 +3,44 @@
int RdmaFuture::DONE = 2; int RdmaFuture::DONE = 2;
int RdmaFuture::PENDING = 1; int RdmaFuture::PENDING = 1;
// Create a pending future identified by the request id whose response it
// will eventually carry; the data slot starts empty.
RdmaFuture::RdmaFuture(uint64_t id)
    : _requestId(id), _data(nullptr)
{
    state = PENDING;
}
char *RdmaFuture::get() char *RdmaFuture::get()
{ {
// std::cout << (unsigned)state << std::endl; // std::cout << (unsigned)state << std::endl;
std::unique_lock<std::mutex> lock(stateMutex); std::unique_lock<std::mutex> lock(stateMutex);
while(state!= DONE) while (state != DONE)
{ {
std::cout<<"getting data\n"; //std::cout << "getting data\n";
stateCv.wait(lock); stateCv.wait(lock,[&]{return state == DONE;});
} }
// [this](){return state!=DONE;}); return _data;
//lock.unlock(); // [this](){return state!=DONE;});
// lock.unlock();
// stateCv.wait(stateMutex, [](state != DONE;)); // stateCv.wait(stateMutex, [](state != DONE;));
// std::cout<<"get"<<std::endl; // std::cout<<"get"<<std::endl;
return _data;
} }
char *RdmaFuture::wait_for(int timeout) char *RdmaFuture::wait_for(int timeout)
{ {
std::unique_lock<std::mutex> lock(stateMutex); std::unique_lock<std::mutex> lock(stateMutex);
if (state == DONE) if (state == DONE)
return _data; return _data;
//lock.unlock(); // lock.unlock();
// add wait logic // add wait logic
return nullptr; return nullptr;
} }
void RdmaFuture::put(char *data) void RdmaFuture::put(char *data)
{ {
std::unique_lock<std::mutex> lock(stateMutex); std::unique_lock<std::mutex> lock(stateMutex);
std::cout<<"putting data\n"; //std::cout << "putting data\n";
_data = data; _data = data;
state = DONE; state = DONE;
//lock.unlock(); // lock.unlock();
stateCv.notify_one(); stateCv.notify_one();
// std::cout << "got data current state" <<data<< (unsigned)state; // std::cout << "got data current state" <<data<< (unsigned)state;
// std::unique_lock<std::mutex> lock(stateMutex); // std::unique_lock<std::mutex> lock(stateMutex);
// std::cout << "updated" << (unsigned)state; // std::cout << "updated" << (unsigned)state;
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment