Commit 239c1d3e authored by Paras Garg's avatar Paras Garg

Removed cout and fixed few bags

parent fce8cfe9
...@@ -14,7 +14,7 @@ CXXFLAGS += -g -O3 -Wall -std=c++17 -I header ...@@ -14,7 +14,7 @@ CXXFLAGS += -g -O3 -Wall -std=c++17 -I header
#libraries #libraries
LIBS += -libverbs LIBS += -libverbs
LIBS += -lrdmacm LIBS += -lrdmacm
LIBS += -pthread LIBS += -lpthread
LIBS += -lrocksdb LIBS += -lrocksdb
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
class RdmaEndpointGroup class RdmaEndpointGroup
{ {
public: public:
std::vector<RdmaSalEndpoint *> _salEps{NULL}; std::vector<RdmaSalEndpoint *> _salEps;
std::vector<RdmaReplicationEndpoint *> _repEps{NULL}; std::vector<RdmaReplicationEndpoint *> _repEps;
std::unordered_map<uint32_t, RdmaReplicationEndpoint *> _qpRepEndpointMap; std::unordered_map<uint32_t, RdmaReplicationEndpoint *> _qpRepEndpointMap;
std::unordered_map<uint32_t, RdmaSalEndpoint *> _qpSalEndpointMap; std::unordered_map<uint32_t, RdmaSalEndpoint *> _qpSalEndpointMap;
......
...@@ -23,20 +23,22 @@ ...@@ -23,20 +23,22 @@
>- [ ] Add condition variable in ConcurrentQueue to avoid busy waiting >- [ ] Add condition variable in ConcurrentQueue to avoid busy waiting
# References: # References:
> https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/ <br> > [Get CQ Event Rdma Mojo](https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/) <br>
> https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html <br> > [Poll CQ](https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html) <br>
> https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.html <br> > [Get CQ Event](https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.html) <br>
> https://docs.microsoft.com/en-us/cpp/cpp/delegating-constructors?view=msvc-170 <br> > [Delegating Constructor](https://docs.microsoft.com/en-us/cpp/cpp/delegating-constructors?view=msvc-170) <br>
> https://www.toptal.com/c-plus-plus/c-plus-plus-understanding-compilation <br> > [Understanding C++ Compilation](https://www.toptal.com/c-plus-plus/c-plus-plus-understanding-compilation) <br>
> https://www.codeproject.com/Tips/987850/Logging-in-Cplusplus <br> > [C++ Logging](https://www.codeproject.com/Tips/987850/Logging-in-Cplusplus) <br>
> https://www.mygreatlearning.com/blog/readme-file/ <br> > [Creating Readme.md](https://www.mygreatlearning.com/blog/readme-file/) <br>
# Git making changes to file untrack/track from index # Git making changes to file untrack/track from index
>To get list of unchanged files
>git ls-files -v|grep '^h'
>git update-index --assume-unchanged FILE_NAME<br> >git update-index --assume-unchanged FILE_NAME<br>
>git update-index --no-assume-unchanged FILE_NAME >git update-index --no-assume-unchanged readme.md
# Example to set cpu affinity:“` # Example to set cpu affinity:
```c++ ```c++
cpu_set_t cpuset; cpu_set_t cpuset;
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
void ConcurrentQueue::push(struct ibv_wc *const &data) void ConcurrentQueue::push(struct ibv_wc *const &data)
{ {
std::unique_lock<std::mutex> lock(queueMutex); std::unique_lock<std::mutex> lock(queueMutex);
std::cout<<"putting data\n";
queue1.push(data); queue1.push(data);
lock.unlock(); lock.unlock();
queueCv.notify_one(); queueCv.notify_one();
...@@ -29,6 +30,7 @@ struct ibv_wc *ConcurrentQueue::try_pop() ...@@ -29,6 +30,7 @@ struct ibv_wc *ConcurrentQueue::try_pop()
value = queue2.front(); value = queue2.front();
queue2.pop(); queue2.pop();
} }
//We only want to handle
if (value->opcode != IBV_WC_RECV) if (value->opcode != IBV_WC_RECV)
{ {
return value; return value;
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
Executor::Executor(int size, RdmaEndpointGroup *group) Executor::Executor(int size, RdmaEndpointGroup *group)
: _size(size), _group(group) : _size(size), _group(group)
{ {
// _taskQueue = new ConcurrentQueue(); _taskQueue = new ConcurrentQueue();
// _taskThreads = new std::vector<TaskThread *>(); // _taskThreads = new std::vector<TaskThread *>();
_taskThreads.reserve(size); _taskThreads.reserve(size);
for (int i = 0; i < _size; i++) for (int i = 0; i < _size; i++)
......
...@@ -4,11 +4,11 @@ ...@@ -4,11 +4,11 @@
RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group) RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group)
: _endpointGroup(group) : _endpointGroup(group)
{ {
CPPLog::LOG_INFO("CMProcessor : Step 1 creating event channel\n"); CPPLog::LOG_INFO("CMProcessor : Step 1 creating event channel");
_eventChannel = rdma_create_event_channel(); _eventChannel = rdma_create_event_channel();
if (_eventChannel == NULL) if (_eventChannel == NULL)
{ {
CPPLog::LOG_ERROR( "CMProcesor : error creating event channel\n"); CPPLog::LOG_ERROR( "CMProcesor : error creating event channel");
} }
} }
...@@ -17,7 +17,7 @@ struct rdma_cm_id *RdmaCmProcessor::createId() ...@@ -17,7 +17,7 @@ struct rdma_cm_id *RdmaCmProcessor::createId()
struct rdma_cm_id *id = NULL; struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP); int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
if (ret == -1) if (ret == -1)
CPPLog::LOG_ERROR("CMProcesor : rdma_create_id failed\n"); CPPLog::LOG_ERROR("CMProcesor : rdma_create_id failed");
return id; return id;
} }
...@@ -25,27 +25,27 @@ void RdmaCmProcessor::processCmEvent() ...@@ -25,27 +25,27 @@ void RdmaCmProcessor::processCmEvent()
{ {
int ret; int ret;
struct rdma_cm_event *event; struct rdma_cm_event *event;
CPPLog::LOG_INFO("CMProcessor : starting cm processing thread\n"); CPPLog::LOG_INFO("CMProcessor : starting cm processing thread");
while (!_stop) while (!_stop)
{ {
ret = rdma_get_cm_event(_eventChannel, &event); ret = rdma_get_cm_event(_eventChannel, &event);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("CMProcesor : rdma_get_cm_event failed\n"); CPPLog::LOG_ERROR("CMProcesor : rdma_get_cm_event failed");
continue; continue;
} }
_endpointGroup->processCmEvent(event); _endpointGroup->processCmEvent(event);
ret = rdma_ack_cm_event(event); ret = rdma_ack_cm_event(event);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("CMProcesor : rdma_ack_cm_event failed\n"); CPPLog::LOG_ERROR("CMProcesor : rdma_ack_cm_event failed");
} }
} }
} }
void RdmaCmProcessor::start(bool newThread) void RdmaCmProcessor::start(bool newThread)
{ {
if (newThread) if (newThread == true)
_cmEventThread = new std::thread(&RdmaCmProcessor::processCmEvent, this); _cmEventThread = new std::thread(&RdmaCmProcessor::processCmEvent, this);
else else
processCmEvent(); processCmEvent();
...@@ -53,10 +53,11 @@ void RdmaCmProcessor::start(bool newThread) ...@@ -53,10 +53,11 @@ void RdmaCmProcessor::start(bool newThread)
void RdmaCmProcessor::close() void RdmaCmProcessor::close()
{ {
CPPLog::LOG_ALWAYS("Closing CM Processor");
_stop = true; _stop = true;
if (_cmEventThread != NULL) if (_cmEventThread != NULL)
_cmEventThread->join(); _cmEventThread->join();
delete _cmEventThread; delete _cmEventThread;
rdma_destroy_event_channel(_eventChannel); rdma_destroy_event_channel(_eventChannel);
CPPLog::LOG_ALWAYS("Closed CM Processor");
} }
...@@ -20,7 +20,7 @@ void RdmaEndpoint::createResources() ...@@ -20,7 +20,7 @@ void RdmaEndpoint::createResources()
*/ */
if (_state != CONN_STATE_INITIALIZED) if (_state != CONN_STATE_INITIALIZED)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state\n"); CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state");
} }
//Step 1 to create endpoint //Step 1 to create endpoint
_protectionDomain = ibv_alloc_pd(_cm_id->verbs); _protectionDomain = ibv_alloc_pd(_cm_id->verbs);
...@@ -117,12 +117,12 @@ void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event) ...@@ -117,12 +117,12 @@ void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : Established_Event but resource not alloted"); CPPLog::LOG_ERROR("RdmaEndpoint : Established_Event but resource not alloted");
} }
CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected\n"); CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected");
_state = CONN_STATE_CONNECTED; _state = CONN_STATE_CONNECTED;
} }
else if (event->event == RDMA_CM_EVENT_DISCONNECTED) else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{ {
CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected\n"); CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected");
close(); close();
} }
} }
...@@ -133,41 +133,41 @@ void RdmaEndpoint::close() ...@@ -133,41 +133,41 @@ void RdmaEndpoint::close()
{ {
return; return;
} }
CPPLog::LOG_INFO("RdmaEndpoint : closing connection\n"); CPPLog::LOG_INFO("RdmaEndpoint : closing connection");
int ret; int ret;
ret = rdma_disconnect(_cm_id); ret = rdma_disconnect(_cm_id);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_disconnect failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : rdma_disconnect failed");
} }
ret = rdma_dereg_mr(_sendMr); ret = rdma_dereg_mr(_sendMr);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed");
} }
delete[] _sendBuff; delete[] _sendBuff;
ret = rdma_dereg_mr(_recvMr); ret = rdma_dereg_mr(_recvMr);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed");
} }
delete[] _recvBuff; delete[] _recvBuff;
rdma_destroy_qp(_cm_id); rdma_destroy_qp(_cm_id);
CPPLog::LOG_INFO("des qp\n"); CPPLog::LOG_INFO("des qp");
// rdma_destroy_id(_cm_id); // rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id); // ret = rdma_destroy_id(_cm_id);
CPPLog::LOG_INFO("des mr\n"); CPPLog::LOG_INFO("des mr");
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed");
} }
_state = CONN_STATE_CLOSED; _state = CONN_STATE_CLOSED;
CPPLog::LOG_INFO("closed\n"); CPPLog::LOG_INFO("closed");
} }
RdmaEndpoint::~RdmaEndpoint() RdmaEndpoint::~RdmaEndpoint()
......
...@@ -6,19 +6,19 @@ RdmaRepCqProcessor::RdmaRepCqProcessor(Executor *ex, ibv_context *verbs, int com ...@@ -6,19 +6,19 @@ RdmaRepCqProcessor::RdmaRepCqProcessor(Executor *ex, ibv_context *verbs, int com
_compChannel = ibv_create_comp_channel(verbs); _compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL) if (_compChannel == NULL)
{ {
std::cout << "CqProcessr : ibv_create_comp_channel failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_create_comp_channel failed");
return; return;
} }
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0); _completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL) if (_completionQueue == NULL)
{ {
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl; CPPLog::LOG_ERROR("CqProcessr : ibv_create_cq failed");
return; return;
} }
int ret = ibv_req_notify_cq(_completionQueue, 0); int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
std::cout << "CqProcessr : ibv_req_notify_cq failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_req_notify_cq failed");
} }
} }
struct ibv_cq *RdmaRepCqProcessor::getCq() struct ibv_cq *RdmaRepCqProcessor::getCq()
...@@ -27,7 +27,7 @@ struct ibv_cq *RdmaRepCqProcessor::getCq() ...@@ -27,7 +27,7 @@ struct ibv_cq *RdmaRepCqProcessor::getCq()
} }
void RdmaRepCqProcessor::start() void RdmaRepCqProcessor::start()
{ {
std::cout << "CqProcessr : starting process CQ events" << std::endl; CPPLog::LOG_ALWAYS( "CqProcessr : starting process CQ events");
_compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this); _compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(), "RepCQ"); pthread_setname_np(_compQueueThread->native_handle(), "RepCQ");
} }
...@@ -43,20 +43,20 @@ void RdmaRepCqProcessor::processCQEvents() ...@@ -43,20 +43,20 @@ void RdmaRepCqProcessor::processCQEvents()
ret = ibv_get_cq_event(_compChannel, &cq, &context); ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1) if (ret == -1)
{ {
std::cout << "CqProcessr : ibv_get_cq_event failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_get_cq_event failed");
close(); close();
} }
ibv_ack_cq_events(cq, 1); ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0); ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
std::cout << "CqProcessr : ibv_req_notify_cq failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_req_notify_cq failed");
close(); close();
} }
ret = ibv_poll_cq(cq, nevent, wc_array); ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0) if (ret < 0)
{ {
std::cout << "CqProcessr : ibv_poll_cq failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_poll_cq failed");
close(); close();
} }
if (ret == 0) if (ret == 0)
......
...@@ -14,7 +14,7 @@ void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data) ...@@ -14,7 +14,7 @@ void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data)
} }
void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data) void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data)
{ {
//CPPLog::LOG_INFO("recv completion\n"); //CPPLog::LOG_INFO("recv completion");
std::cout << "Replication recieve" << (char *)(data->wr_id) << "\n"; std::cout << "Replication recieve" << (char *)(data->wr_id) << "\n";
char *request = new char[data->byte_len]; char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len); memcpy(request, (void *)data->wr_id, data->byte_len);
......
...@@ -5,7 +5,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com ...@@ -5,7 +5,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com
_compChannel = ibv_create_comp_channel(verbs); _compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL) if (_compChannel == NULL)
{ {
CPPLog::LOG_ERROR("SalCqProcessr : ibv_create_comp_channel failed\n"); CPPLog::LOG_ERROR("SalCqProcessr : ibv_create_comp_channel failed");
return; return;
} }
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0); _completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
...@@ -17,7 +17,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com ...@@ -17,7 +17,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com
int ret = ibv_req_notify_cq(_completionQueue, 0); int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
CPPLog::LOG_INFO("SalCqProcessr : ibv_req_notify_cq failed\n"); CPPLog::LOG_INFO("SalCqProcessr : ibv_req_notify_cq failed");
} }
} }
struct ibv_cq *RdmaSalCqProcessor::getCq() struct ibv_cq *RdmaSalCqProcessor::getCq()
...@@ -26,7 +26,7 @@ struct ibv_cq *RdmaSalCqProcessor::getCq() ...@@ -26,7 +26,7 @@ struct ibv_cq *RdmaSalCqProcessor::getCq()
} }
void RdmaSalCqProcessor::start() void RdmaSalCqProcessor::start()
{ {
CPPLog::LOG_ALWAYS("SalCqProcessr : starting process CQ events\n"); CPPLog::LOG_ALWAYS("SalCqProcessr : starting process CQ events");
_compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this); _compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"SalCQProcessor"); pthread_setname_np(_compQueueThread->native_handle(),"SalCQProcessor");
} }
...@@ -46,7 +46,7 @@ void RdmaSalCqProcessor::processCQEvents() ...@@ -46,7 +46,7 @@ void RdmaSalCqProcessor::processCQEvents()
ret = ibv_get_cq_event(_compChannel, &cq, &context); ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1) if (ret == -1)
{ {
CPPLog::LOG_ERROR("SalCqProcessr : ibv_get_cq_event failed\n"); CPPLog::LOG_ERROR("SalCqProcessr : ibv_get_cq_event failed");
close(); close();
} }
ibv_ack_cq_events(cq, 1); ibv_ack_cq_events(cq, 1);
...@@ -56,13 +56,13 @@ void RdmaSalCqProcessor::processCQEvents() ...@@ -56,13 +56,13 @@ void RdmaSalCqProcessor::processCQEvents()
ret = ibv_req_notify_cq(_completionQueue, 0); ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("SalCqProcessr : ibv_req_notify_cq failed\n"); CPPLog::LOG_ERROR("SalCqProcessr : ibv_req_notify_cq failed");
close(); close();
} }
ret = ibv_poll_cq(cq, nevent, wc_array); ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0) if (ret < 0)
{ {
CPPLog::LOG_ERROR("SalCqProcessr : ibv_poll_cq failed\n"); CPPLog::LOG_ERROR("SalCqProcessr : ibv_poll_cq failed");
close(); close();
} }
if (ret == 0) if (ret == 0)
...@@ -73,7 +73,7 @@ void RdmaSalCqProcessor::processCQEvents() ...@@ -73,7 +73,7 @@ void RdmaSalCqProcessor::processCQEvents()
{ {
std::ostringstream ss; std::ostringstream ss;
ss<< "RdmaSalCqProcessor : failed work completion : "; ss<< "RdmaSalCqProcessor : failed work completion : ";
ss<<ibv_wc_status_str(wc_array[i].status)<<"on qp"<<wc_array[i].qp_num<<"\n"; ss<<ibv_wc_status_str(wc_array[i].status)<<"on qp"<<wc_array[i].qp_num;
CPPLog::LOG_ERROR(ss); CPPLog::LOG_ERROR(ss);
continue; continue;
} }
......
...@@ -98,7 +98,7 @@ void RdmaSalEndpoint::processPut(struct MessageHeader *req) ...@@ -98,7 +98,7 @@ void RdmaSalEndpoint::processPut(struct MessageHeader *req)
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
{ {
CPPLog::LOG_ERROR("No send Buffer\n"); CPPLog::LOG_ERROR("No send Buffer");
return; return;
} }
sendBuf = _sendBuffers.front(); sendBuf = _sendBuffers.front();
......
...@@ -67,7 +67,7 @@ struct ibv_cq *RdmaServerEndpointGroup::createSalCq(struct rdma_cm_id *id) ...@@ -67,7 +67,7 @@ struct ibv_cq *RdmaServerEndpointGroup::createSalCq(struct rdma_cm_id *id)
{ {
if (_salCqProcessor == NULL) if (_salCqProcessor == NULL)
{ {
CPPLog::LOG_ERROR("RdmaServerEndpointGroup : step 5 create salcq processor"); CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : step 5 create salcq processor");
_salCqProcessor = new RdmaSalCqProcessor(_executor, _cm_id->verbs, _compQueueSize); _salCqProcessor = new RdmaSalCqProcessor(_executor, _cm_id->verbs, _compQueueSize);
_salCqProcessor->start(); _salCqProcessor->start();
} }
...@@ -123,7 +123,7 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -123,7 +123,7 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{ {
std::ostringstream ss; std::ostringstream ss;
ss << "RdmaServerEndpointGroup : event " << rdma_event_str(event->event); ss << "RdmaServerEndpointGroup : event " << rdma_event_str(event->event);
ss << " id " << event->id << " " << std::endl; ss << " id " << event->id;
CPPLog::LOG_ALWAYS(ss); CPPLog::LOG_ALWAYS(ss);
/* /*
* Connect request came on listener ie endpointgroup * Connect request came on listener ie endpointgroup
...@@ -162,6 +162,7 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -162,6 +162,7 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
return; return;
} }
RdmaEndpoint *ep = ((RdmaEndpoint *)event->id->context); RdmaEndpoint *ep = ((RdmaEndpoint *)event->id->context);
uint32_t qp = event->id->qp->qp_num;
ep->processCmEvent(event); ep->processCmEvent(event);
if (event->event == RDMA_CM_EVENT_DISCONNECTED) if (event->event == RDMA_CM_EVENT_DISCONNECTED)
...@@ -172,9 +173,9 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -172,9 +173,9 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
if (it != _repEps.end()) if (it != _repEps.end())
{ {
_repEps.erase(it); _repEps.erase(it);
_qpRepEndpointMap.erase(qp);
delete ((RdmaReplicationEndpoint *)event->id->context); delete ((RdmaReplicationEndpoint *)event->id->context);
} }
_qpRepEndpointMap.erase(event->id->qp->qp_num);
} }
{ {
...@@ -183,9 +184,9 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -183,9 +184,9 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
if (it != _salEps.end()) if (it != _salEps.end())
{ {
_salEps.erase(it); _salEps.erase(it);
_qpSalEndpointMap.erase(qp);
delete ((RdmaSalEndpoint *)event->id->context); delete ((RdmaSalEndpoint *)event->id->context);
} }
_qpSalEndpointMap.erase(event->id->qp->qp_num);
} }
} }
} }
...@@ -210,12 +211,12 @@ void RdmaServerEndpointGroup::close() ...@@ -210,12 +211,12 @@ void RdmaServerEndpointGroup::close()
delete _cmProcessor; delete _cmProcessor;
for (size_t i = 0; i < _salEps.size(); i++) for (size_t i = 0; i < _salEps.size(); i++)
{ {
((RdmaEndpoint*)_salEps[i])->close(); ((RdmaEndpoint *)_salEps[i])->close();
delete _salEps[i]; delete _salEps[i];
} }
for (size_t i = 0; i < _repEps.size(); i++) for (size_t i = 0; i < _repEps.size(); i++)
{ {
((RdmaEndpoint*)_repEps[i])->close(); ((RdmaEndpoint *)_repEps[i])->close();
delete _repEps[i]; delete _repEps[i];
} }
......
...@@ -30,6 +30,7 @@ int main() ...@@ -30,6 +30,7 @@ int main()
Executor *ex = new Executor(executorPoolSize, group); Executor *ex = new Executor(executorPoolSize, group);
group->setExecutor(ex); group->setExecutor(ex);
group->bind(serverIP.c_str(), serverPort.c_str(), 2); group->bind(serverIP.c_str(), serverPort.c_str(), 2);
//std::cout<<"calling start cm\n";
group->startCmProcessor(false); group->startCmProcessor(false);
std::cout<<"Started Server\n"; std::cout<<"Started Server\n";
// Just to make main thread wait else program will exit // Just to make main thread wait else program will exit
......
...@@ -17,7 +17,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint ...@@ -17,7 +17,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint
CPPLog::LOG_ALWAYS(ss); CPPLog::LOG_ALWAYS(ss);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0) if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0)
{ {
CPPLog::LOG_ERROR("Error calling pthread_setaffinity_np\n "); CPPLog::LOG_ERROR("Error calling pthread_setaffinity_np ");
} }
} }
...@@ -27,7 +27,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr ...@@ -27,7 +27,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr
_taskQueue = taskqueue; _taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this)) if (pthread_create(&thread, NULL, &TaskThread::run, this))
{ {
CPPLog::LOG_ERROR( "pthread create has been failed while creating taskthread\n"); CPPLog::LOG_ERROR( "pthread create has been failed while creating taskthread");
exit(0); exit(0);
} }
pthread_setname_np(thread,"TaskThread"); pthread_setname_np(thread,"TaskThread");
...@@ -35,7 +35,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr ...@@ -35,7 +35,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr
TaskThread::~TaskThread() TaskThread::~TaskThread()
{ {
CPPLog::LOG_INFO( "TaskThread Destructed\n"); CPPLog::LOG_INFO( "TaskThread Destructed");
stop(); stop();
} }
...@@ -44,7 +44,7 @@ void TaskThread::stop() ...@@ -44,7 +44,7 @@ void TaskThread::stop()
_stop = true; _stop = true;
if (pthread_join(thread, NULL) == 0) if (pthread_join(thread, NULL) == 0)
{ {
CPPLog::LOG_ERROR("pthread join failed\n"); CPPLog::LOG_ERROR("pthread join failed");
} }
} }
...@@ -59,10 +59,11 @@ inline void *TaskThread::run(void *object) ...@@ -59,10 +59,11 @@ inline void *TaskThread::run(void *object)
while (!thread->_stop) while (!thread->_stop)
{ {
struct ibv_wc *data = NULL; struct ibv_wc *data = NULL;
std::cout<<"Get start\n";
data = thread->_taskQueue->try_pop(); data = thread->_taskQueue->try_pop();
if (data != NULL) if (data != NULL)
{ {
std::cout<<"TaskThread:: got data\n"; std::cout<<"TaskThread:: got data";
thread->processEvent(data); thread->processEvent(data);
thread->_taskQueue->removeFromSet(data); thread->_taskQueue->removeFromSet(data);
delete data; delete data;
...@@ -84,7 +85,7 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -84,7 +85,7 @@ void TaskThread::processEvent(struct ibv_wc *data)
auto it = _group->_qpSalEndpointMap.find(data->qp_num); auto it = _group->_qpSalEndpointMap.find(data->qp_num);
if (it == _group->_qpSalEndpointMap.end()) if (it == _group->_qpSalEndpointMap.end())
{ {
CPPLog::LOG_INFO("RdmaSal : endpoint not registered for qp\n"); CPPLog::LOG_INFO("RdmaSal : endpoint not registered for qp");