Commit 8be2e01a authored by Paras Garg's avatar Paras Garg

Fixed Memory leak and virtual destructor problem added close for endpoints

parent 790ba763
......@@ -10,18 +10,18 @@
class Comparator
{
public:
inline bool operator()(const struct ibv_wc* c1,const struct ibv_wc* c2)
const
inline bool operator()(const struct ibv_wc *c1, const struct ibv_wc *c2)
const
{
struct MessageHeader *req1 = (struct MessageHeader *)c1->wr_id;
struct MessageHeader *req2 = (struct MessageHeader *)c2->wr_id;
if(req1->keySize != req2->keySize)
if (req1->keySize != req2->keySize)
return true;
char* key1 = (char*)req1+MessageHeaderSize;
char* key2 = (char*)req2+ MessageHeaderSize;
for(uint32_t i=0;i<req1->keySize;i++)
char *key1 = (char *)req1 + MessageHeaderSize;
char *key2 = (char *)req2 + MessageHeaderSize;
for (uint32_t i = 0; i < req1->keySize; i++)
{
if(key1[i]!=key2[i])
if (key1[i] != key2[i])
return true;
}
return false;
......@@ -38,73 +38,11 @@ private:
std::condition_variable queueCv;
public:
void push(struct ibv_wc *const &data)
{
std::unique_lock<std::mutex> lock(queueMutex);
queue1.push(data);
std::cout<<data<<std::endl;
lock.unlock();
queueCv.notify_one();
}
bool empty()
{
std::unique_lock<std::mutex> lock(queueMutex);
return queue1.empty() && queue2.empty();
}
struct ibv_wc *try_pop()
{
struct ibv_wc *value = NULL;
std::unique_lock<std::mutex> lock(queueMutex);
if (queue2.empty())
{
queueCv.wait(lock, [&]
{ return queue1.size() > 0; });
value = queue1.front();
queue1.pop();
}
else
{
value = queue2.front();
queue2.pop();
}
if (value->opcode != IBV_WC_RECV)
{
return value;
}
// std::cout<<"value "<<value<<std::endl;
if (runningRequests.empty())
{
runningRequests.insert(value);
return value
;
}
auto it = runningRequests.find(value);
if (it != runningRequests.end())
{
queue2.push(value);
//std::cout<<"found putting in 2"<<std::endl;
return NULL;
}
return value;
}
void removeFromSet(struct ibv_wc* data)
{
std::unique_lock<std::mutex> lock(queueMutex);
// std::cout<<"removing"<<data<<std::endl;
runningRequests.erase(data);
}
void
wait_and_pop(struct ibv_wc *&popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCv.wait(lock, [&]
{ return queue1.size() > 0; });
popped_value = queue1.front();
queue1.pop();
}
void push(struct ibv_wc *const &data);
bool empty();
struct ibv_wc *try_pop();
void removeFromSet(struct ibv_wc *data);
void wait_and_pop(struct ibv_wc *&popped_value);
};
#endif
......
......@@ -10,7 +10,7 @@
class Executor
{
int _size{0};
std::vector<TaskThread *> *_taskThreads{NULL};
std::vector<TaskThread *> _taskThreads{NULL};
ConcurrentQueue *_taskQueue{NULL};
RdmaEndpointGroup *_group;
......@@ -18,6 +18,7 @@ public:
Executor(int size, RdmaEndpointGroup *group);
void submit(struct ibv_wc *task);
void getTask();
void stop();
};
//long affinities[]
#endif
\ No newline at end of file
#ifndef __RDMAENDPOINT__
#define __RDMAENDPOINT__
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include <queue>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
......@@ -38,15 +38,17 @@ public:
char *_recvBuff{NULL};
struct ibv_mr *_sendMr{NULL};
struct ibv_mr *_recvMr{NULL};
boost::lockfree::queue<void *> *_sendBuffers{NULL};
std::queue<char*> _sendBuffers;
std::mutex _sendBuffersM;
RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize);
void createResources();
void processCmEvent(struct rdma_cm_event *event);
void clientClose();
void close();
virtual void processSendCompletion(struct ibv_wc *data) = 0;
virtual void processRecvCompletion(struct ibv_wc *data) = 0;
virtual ~RdmaEndpoint();
};
#endif
\ No newline at end of file
......@@ -18,10 +18,10 @@
class RdmaEndpointGroup
{
public:
std::vector<RdmaSalEndpoint *> *_salEps{NULL};
std::vector<RdmaReplicationEndpoint *> *_repEps{NULL};
std::unordered_map<uint32_t, RdmaReplicationEndpoint *> *_qpRepEndpointMap{NULL};
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap{NULL};
std::vector<RdmaSalEndpoint *> _salEps{NULL};
std::vector<RdmaReplicationEndpoint *> _repEps{NULL};
std::unordered_map<uint32_t, RdmaReplicationEndpoint *> _qpRepEndpointMap;
std::unordered_map<uint32_t, RdmaSalEndpoint *> _qpSalEndpointMap;
virtual void processCmEvent(struct rdma_cm_event *event) = 0;
};
......
......@@ -6,7 +6,6 @@
#include <stdint.h>
#include <errno.h>
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include <rocksdb/db.h>
#include "RdmaEndpoint.hpp"
......
......@@ -25,8 +25,8 @@
class RdmaServerEndpointGroup : public RdmaEndpointGroup
{
/*
* Variables to maintain Group state
*/
* Variables to maintain Group state
*/
static int CONN_STATE_INITIALIZED;
static int CONN_STATE_BINDED;
static int CONN_STATE_CONNECTED;
......@@ -40,8 +40,8 @@ class RdmaServerEndpointGroup : public RdmaEndpointGroup
Executor *_executor;
/*
* variables to maintain queue state
*/
* variables to maintain queue state
*/
int _sendQueueSize{0};
int _recvQueueSize{0};
int _compQueueSize{0};
......@@ -55,7 +55,8 @@ class RdmaServerEndpointGroup : public RdmaEndpointGroup
void clientClose();
public:
RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, int recvMsgSize);
RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize,
int sendMsgSize, int recvMsgSize, std::string dbpath);
// void setExecutor(Executor *executor);
void bind(const char *ip, const char *port, int backlog);
......
# How to run server
> To edit run time configuration like db path and server parameters edit prop.config file <br>
> Step 1 ensuring RDMA drivers are loaded <br>
> Step 2 Ensuring rocksdb library is loaded <br>
> Step 3 Compile : make server <br>
> Step 4 run make server <br>
# To do list
### High Priority:
- [] consensus
- [] failure detection and recovery
- [] explore multicast
- [x] Added logging api
- [ ] add multi server in client
- [ ] add yscb
- [ ] add dynamic hashing in client code
>### High Priority:
>- [ ] consensus
>- [ ] failure detection and recovery
>- [ ] explore multicast
>- [x] Added logging api
>- [x] add multi connection in client
>- [x] add yscb
>- [ ] add dynamic hashing in client code
>- [ ] add waiting for response form followers and wait for invalidation
### Low Priority
- [ ] Handle RDMA_CM_EVENT_TIMEWAIT_EXIT when closed connection
- [ ] Handle Work Request Flushed Error on qp
>### Low Priority
>- [ ] Handle RDMA_CM_EVENT_TIMEWAIT_EXIT when closed connection
>- [x] Handle Work Request Flushed Error on qp
>- [ ] Add condition variable in ConcurrentQueue to avoid busy waiting
References:
# References:
> https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/ <br>
> https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html <br>
> https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.html <br>
......@@ -21,15 +31,13 @@ References:
> https://www.codeproject.com/Tips/987850/Logging-in-Cplusplus <br>
> https://www.mygreatlearning.com/blog/readme-file/ <br>
git update-index --assume-unchanged FILE_NAME
and if you want to track the changes again use this command:
# Git making changes to file untrack/track from index
>git update-index --assume-unchanged FILE_NAME<br>
>git update-index --no-assume-unchanged FILE_NAME
git update-index --no-assume-unchanged FILE_NAME
Example to set cpu affinity:“`
```
# Example to set cpu affinity:“`
```c++
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
......@@ -39,7 +47,7 @@ Example to set cpu affinity:“`
std::cerr << "Error calling pthread_setaffinity_np: " << rc << "\n";
}
```
Logging API Example
# Logging API Example
```
LOG_ALWAYS("<=============================== START OF PROGRAM ===============================>");
// Log message using Direct Interface
......@@ -57,47 +65,21 @@ Logging API Example
pLogger = Logger::getInstance();
pLogger->error("Message Logged using C++ Interface, Log level: LOG_ERROR");
pLogger->alarm("Message Logged using C++ Interface, Log level: LOG_ALARM");
pLogger->always("Message Logged using C++ Interface, Log level: LOG_ALWAYS");
pLogger->buffer("Message Logged using C++ Interface, Log level: LOG_INFO");
pLogger->info("Message Logged using C++ Interface, Log level: LOG_BUFFER");
pLogger->trace("Message Logged using C++ Interface, Log level: LOG_TRACE");
pLogger->debug("Message Logged using C++ Interface, Log level: LOG_DEBUG");
// Log Variables
std::string name = "Pankaj Choudhary";
std::string address = "Delhi, India";
int age = 26;
std::ostringstream ss;
ss << endl;
ss << "\t" << "My Contact Details:" << endl;
ss << "\t" << "Name: " << name << endl;
ss << "\t" << "Address: " << address << endl;
ss << "\t" << "Age: " << age << endl << endl;
//pLogger->enableConsoleLogging();
pLogger->updateLogLevel(LOG_LEVEL_INFO);
// Log ostringstream ss to all the log levels
LOG_ALWAYS("Logging ostringstream using Direct Interface");
LOG_ERROR(ss);
LOG_ALARM(ss);
LOG_ALWAYS(ss);
LOG_INFO(ss);
LOG_BUFFER(ss);
LOG_TRACE(ss);
LOG_DEBUG(ss);
Logger::getInstance()->buffer("Logging ostringstream using C++ Interface");
Logger::getInstance()->error(ss);
Logger::getInstance()->alarm(ss);
Logger::getInstance()->always(ss);
Logger::getInstance()->buffer(ss);
Logger::getInstance()->info(ss);
Logger::getInstance()->trace(ss);
Logger::getInstance()->debug(ss);
LOG_ALWAYS("<=============================== END OF PROGRAM ===============================>");
LOG_ALWAYS("<========= END OF PROGRAM ==========>");
```
#include <TaskThread.hpp>
void ConcurrentQueue::push(struct ibv_wc *const &data)
{
std::unique_lock<std::mutex> lock(queueMutex);
queue1.push(data);
lock.unlock();
queueCv.notify_one();
}
bool ConcurrentQueue::empty()
{
std::unique_lock<std::mutex> lock(queueMutex);
return queue1.empty() && queue2.empty();
}
struct ibv_wc *ConcurrentQueue::try_pop()
{
struct ibv_wc *value = NULL;
std::unique_lock<std::mutex> lock(queueMutex);
if (queue2.empty())
{
queueCv.wait(lock, [&]
{ return queue1.size() > 0; });
value = queue1.front();
queue1.pop();
}
else
{
value = queue2.front();
queue2.pop();
}
if (value->opcode != IBV_WC_RECV)
{
return value;
}
// std::cout<<"value "<<value<<std::endl;
if (runningRequests.empty())
{
runningRequests.insert(value);
return value;
}
auto it = runningRequests.find(value);
if (it != runningRequests.end())
{
queue2.push(value);
// std::cout<<"found putting in 2"<<std::endl;
return NULL;
}
return value;
}
void ConcurrentQueue::removeFromSet(struct ibv_wc *data)
{
std::unique_lock<std::mutex> lock(queueMutex);
// std::cout<<"removing"<<data<<std::endl;
runningRequests.erase(data);
}
void ConcurrentQueue::wait_and_pop(struct ibv_wc *&popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCv.wait(lock, [&]
{ return queue1.size() > 0; });
popped_value = queue1.front();
queue1.pop();
}
\ No newline at end of file
......@@ -2,13 +2,13 @@
Executor::Executor(int size, RdmaEndpointGroup *group)
: _size(size), _group(group)
{
_taskQueue = new ConcurrentQueue();
_taskThreads = new std::vector<TaskThread *>();
_taskThreads->reserve(size);
// _taskQueue = new ConcurrentQueue();
// _taskThreads = new std::vector<TaskThread *>();
_taskThreads.reserve(size);
for (int i = 0; i < _size; i++)
{
TaskThread *thread = new TaskThread(i, _taskQueue, _group);
_taskThreads->push_back(thread);
_taskThreads.push_back(thread);
}
}
void Executor::submit(struct ibv_wc *task)
......@@ -17,4 +17,13 @@ void Executor::submit(struct ibv_wc *task)
}
void Executor::getTask()
{
}
void Executor::stop()
{
for(size_t i = 0;i < _taskThreads.size();i++)
{
_taskThreads[i]->stop();
delete _taskThreads[i];
}
delete _taskQueue;
}
\ No newline at end of file
......@@ -56,6 +56,7 @@ void RdmaCmProcessor::close()
_stop = true;
if (_cmEventThread != NULL)
_cmEventThread->join();
delete _cmEventThread;
rdma_destroy_event_channel(_eventChannel);
CPPLog::LOG_ALWAYS("Closed CM Processor");
}
#include "RdmaEndpoint.hpp"
#include "Logger.hpp"
int RdmaEndpoint::CONN_STATE_INITIALIZED = 1;
......@@ -5,32 +6,36 @@ int RdmaEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2;
int RdmaEndpoint::CONN_STATE_CONNECTED = 3;
int RdmaEndpoint::CONN_STATE_CLOSED = 4;
RdmaEndpoint::RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize)
int recvQueueSize, int sendMsgSize, int recvMsgSize)
: _cm_id(id), _completionQueue(completionQueue), _sendQueueSize(sendQueueSize),
_recvQueueSize(recvQueueSize), _sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize)
{
_state = CONN_STATE_INITIALIZED;
_sendBuffers = new boost::lockfree::queue<void*>(_sendMsgSize);
}
void RdmaEndpoint::createResources()
{
/* These states are used to avoid errors in lifetime of rdma connection
* more erros can be tracked in future using these lifecycle states
*/
if (_state != CONN_STATE_INITIALIZED)
{
CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state\n");
}
//Step 1 to create endpoint
_protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL)
{
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed \n");
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed");
return;
}
//step 2 Creating Queue pair with completion queueu setted for send and recieve
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
//This is used to set endpoint address with qp
/*
* Endpoint address is setted in QP context to get endpoint at run time with qp
* without using any map to map qp_num to endpoint
*/
qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1;
......@@ -50,59 +55,67 @@ void RdmaEndpoint::createResources()
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed\n");
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed");
}
if (_cm_id->pd == NULL)
{
CPPLog::LOG_ERROR("RdmaEndpoint : pd not set\n") ;
CPPLog::LOG_ERROR("RdmaEndpoint : pd not set");
_cm_id->pd = _protectionDomain;
}
_sendBuff = (char*)malloc(_sendMsgSize * _sendQueueSize);
/*
* Step 3 register memory for send and recv queue
*/
_sendBuff = new char[(_sendMsgSize * _sendQueueSize)];
if (_sendBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendBuff malloc failed\n");
_recvBuff = (char*)malloc(_recvMsgSize * _recvQueueSize);
CPPLog::LOG_ERROR("RdmaEndpoint : sendBuff malloc failed");
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendMr reg failed\n");
CPPLog::LOG_ERROR("RdmaEndpoint : sendMr reg failed");
_recvBuff = new char[(_recvMsgSize * _recvQueueSize)];
if (_recvBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvBuff malloc failed\n");
CPPLog::LOG_ERROR("RdmaEndpoint : recvBuff malloc failed");
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvMr reg failed\n");
CPPLog::LOG_ERROR("RdmaEndpoint : recvMr reg failed");
/*
* adding buffers for recving rdma data
*/
for (int i = 0; i < _recvQueueSize; i++)
{
char *const location = _recvBuff + i * _recvMsgSize;
char* location = _recvBuff + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
}
/*
* Adding buffers to queue for receving data
*/
for (int i = 0; i < _sendQueueSize; i++)
{
void* const location = _sendBuff + i * _sendMsgSize;
_sendBuffers->push(location);
char* location = _sendBuff + i * _sendMsgSize;
std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)location);
}
_state = CONN_STATE_RESOURCES_ALLOCATED;
}
void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event)
{
CPPLog::LOG_INFO("RdmaEndpoint : Event ");
CPPLog::LOG_INFO(rdma_event_str(event->event));
std::ostringstream ss;
ss<<"RdmaEndpoint : Event "<<rdma_event_str(event->event);
CPPLog::LOG_ALWAYS(ss);
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
CPPLog::LOG_INFO("RdmaEndpoint : Connect request\n");
CPPLog::LOG_ALWAYS("RdmaEndpoint : Connect request");
}
else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{
if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{
CPPLog::LOG_ERROR( "RdmaEndpoint : EstablishedEvent invalid state \n");
CPPLog::LOG_ERROR("RdmaEndpoint : Established_Event but resource not alloted");
}
CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected\n");
_state = CONN_STATE_CONNECTED;
......@@ -110,45 +123,43 @@ void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event)
else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected\n");
clientClose();
close();
}
}
void RdmaEndpoint::clientClose()
void RdmaEndpoint::close()
{
if (_state != CONN_STATE_CONNECTED)
if (_state < CONN_STATE_RESOURCES_ALLOCATED)
{
CPPLog::LOG_ERROR("RdmaEndpoint : clientClose invalid state\n");
return;
}
CPPLog::LOG_INFO("RdmaEndpoint : closing connection qp \n");
// CPPLog::LOG_INFO((_cm_id->qp->qp_num);
CPPLog::LOG_INFO("RdmaEndpoint : closing connection\n");
int ret;
ret = rdma_disconnect(_cm_id);
if (ret)
{
CPPLog::LOG_ERROR( "RdmaEndpoint : rdma_disconnect failed\n");
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_disconnect failed\n");
}
ret = rdma_dereg_mr(_sendMr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed\n");
}
free(_sendBuff);
delete[] _sendBuff;
ret = rdma_dereg_mr(_recvMr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed\n");
}
free(_recvBuff);
delete[] _recvBuff;
rdma_destroy_qp(_cm_id);
CPPLog::LOG_INFO("des qp\n");
//rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id);
// rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id);
CPPLog::LOG_INFO("des mr\n");
if (ret)
......@@ -158,3 +169,6 @@ void RdmaEndpoint::clientClose()
_state = CONN_STATE_CLOSED;
CPPLog::LOG_INFO("closed\n");
}
RdmaEndpoint::~RdmaEndpoint()
{}
\ No newline at end of file
......@@ -74,6 +74,7 @@ void RdmaRepCqProcessor::processCQEvents()
//_executor->dispatchRepCqEvents(wc_array, ret);
}
delete[] wc_array;
}
void RdmaRepCqProcessor::processRepEvent(struct ibv_wc *data)
{
......@@ -84,4 +85,5 @@ void RdmaRepCqProcessor::close()
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
delete _compQueueThread;
}
\ No newline at end of file
#include "RdmaReplicationEndpoint.hpp"
RdmaReplicationEndpoint::RdmaReplicationEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize)
,_db(db)
int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize), _db(db)
{
}
void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data)
{
CPPLog::LOG_INFO("send completion\n");
_sendBuffers->push((void *)data->wr_id);
//CPPLog::LOG_INFO("send completion");
std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)data->wr_id);
}
void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data)
{
CPPLog::LOG_INFO("recv completion\n");
//CPPLog::LOG_INFO("recv completion\n");
std::cout << "Replication recieve" << (char *)(data->wr_id) << "\n";
char* request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id,data->byte_len);
char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len);
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
delete[] request;
}
int RdmaReplicationEndpoint::sendMessage(const char *buffer, uint32_t size)
{
if (size > (uint32_t)_sendMsgSize)
return -1;
void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr)
void *sendBuffer = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0)
return -1;
sendBuffer = _sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
}
......@@ -26,8 +26,9 @@ struct ibv_cq *RdmaSalCqProcessor::getCq()
}
void RdmaSalCqProcessor::start()
{
CPPLog::LOG_INFO("SalCqProcessr : starting process CQ events\n");
CPPLog::LOG_ALWAYS("SalCqProcessr : starting process CQ events\n");
_compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"SalCQProcessor");
}
void RdmaSalCqProcessor::processCQEvents()
{
......@@ -91,4 +92,5 @@ void RdmaSalCqProcessor::close()
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
delete _compQueueThread;
}
\ No newline at end of file
......@@ -9,19 +9,23 @@ RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completio
void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data)