Commit 5cf929e3 authored by Paras Garg's avatar Paras Garg

changed cout to log

parent fc9ee38b
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <condition_variable> #include <condition_variable>
#include <set> #include <set>
#include <string> #include <string>
#include "Logger.hpp"
class Comparator class Comparator
{ {
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "RdmaEndpointGroup.hpp" #include "RdmaEndpointGroup.hpp"
#include "ConcurrentQueue.hpp" #include "ConcurrentQueue.hpp"
#include "TaskThread.hpp" #include "TaskThread.hpp"
#include "Logger.hpp"
class Executor class Executor
{ {
int _size{0}; int _size{0};
......
#ifndef __RDMACMPROCESSOR__
#define __RDMACMPROCESSOR__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#ifndef __RDMACMPROCESSOR__ #include "Logger.hpp"
#define __RDMACMPROCESSOR__
#include "RdmaEndpointGroup.hpp" #include "RdmaEndpointGroup.hpp"
class RdmaCmProcessor class RdmaCmProcessor
......
...@@ -12,7 +12,9 @@ ...@@ -12,7 +12,9 @@
#include <vector> #include <vector>
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include "CqEventData.hpp" #include "CqEventData.hpp"
#include "Logger.hpp"
class RdmaEndpoint class RdmaEndpoint
{ {
......
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
#include <shared_mutex> #include <shared_mutex>
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
#include "RdmaReplicationEndpoint.hpp" #include "RdmaReplicationEndpoint.hpp"
#include "Logger.hpp"
class RdmaEndpointGroup class RdmaEndpointGroup
{ {
......
#ifndef __RDMAREPCQPROCESSOR__
#define __RDMAREPCQPROCESSOR__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
...@@ -5,10 +8,9 @@ ...@@ -5,10 +8,9 @@
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#ifndef __RDMAREPCQPROCESSOR__
#define __RDMAREPCQPROCESSOR__
#include "Executor.hpp" #include "Executor.hpp"
#include "Logger.hpp"
class RdmaRepCqProcessor class RdmaRepCqProcessor
{ {
......
...@@ -7,9 +7,12 @@ ...@@ -7,9 +7,12 @@
#include <errno.h> #include <errno.h>
#include <iostream> #include <iostream>
#include <boost/lockfree/queue.hpp> #include <boost/lockfree/queue.hpp>
#include <rocksdb/db.h>
#include "RdmaEndpoint.hpp" #include "RdmaEndpoint.hpp"
#include "CqEventData.hpp" #include "CqEventData.hpp"
#include <rocksdb/db.h> #include "Logger.hpp"
class RdmaReplicationEndpoint : public RdmaEndpoint class RdmaReplicationEndpoint : public RdmaEndpoint
{ {
......
#ifndef __RDMASALCQPROCESSOR__
#define __RDMASALCQPROCESSOR__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
...@@ -5,10 +8,8 @@ ...@@ -5,10 +8,8 @@
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#ifndef __RDMASALCQPROCESSOR__
#define __RDMASALCQPROCESSOR__
#include "Executor.hpp" #include "Executor.hpp"
#include "Logger.hpp"
class RdmaSalCqProcessor class RdmaSalCqProcessor
{ {
......
...@@ -7,9 +7,10 @@ ...@@ -7,9 +7,10 @@
#include <errno.h> #include <errno.h>
#include <iostream> #include <iostream>
#include <boost/lockfree/queue.hpp> #include <boost/lockfree/queue.hpp>
#include <rocksdb/db.h>
#include "RdmaEndpoint.hpp" #include "RdmaEndpoint.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
#include <rocksdb/db.h> #include "Logger.hpp"
class RdmaSalEndpoint : public RdmaEndpoint class RdmaSalEndpoint : public RdmaEndpoint
{ {
......
#ifndef __RDMASERVERENDPOINTGROUP__
#define __RDMASERVERENDPOINTGROUP__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
...@@ -9,9 +12,6 @@ ...@@ -9,9 +12,6 @@
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#ifndef __RDMASERVERENDPOINTGROUP__
#define __RDMASERVERENDPOINTGROUP__
#include "RdmaReplicationEndpoint.hpp" #include "RdmaReplicationEndpoint.hpp"
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
#include "RdmaEndpointGroup.hpp" #include "RdmaEndpointGroup.hpp"
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "RdmaCmProcessor.hpp" #include "RdmaCmProcessor.hpp"
#include "RdmaSalCqProcessor.hpp" #include "RdmaSalCqProcessor.hpp"
#include "RdmaRepCqProcessor.hpp" #include "RdmaRepCqProcessor.hpp"
#include "Logger.hpp"
class RdmaServerEndpointGroup : public RdmaEndpointGroup class RdmaServerEndpointGroup : public RdmaEndpointGroup
{ {
......
#ifndef __TaskThread__ #ifndef __TaskThread__
#define __TaskThread__ #define __TaskThread__
#include <iostream>
#include <queue>
#include "Runnable.hpp" #include "Runnable.hpp"
#include "CqEventData.hpp" #include "CqEventData.hpp"
#include "MessageFormats.hpp"
#include "Logger.hpp"
#include "RdmaEndpointGroup.hpp" #include "RdmaEndpointGroup.hpp"
#include "ConcurrentQueue.hpp" #include "ConcurrentQueue.hpp"
#include <pthread.h> #include <pthread.h>
#include <iostream> #include "Logger.hpp"
#include <queue>
class TaskThread class TaskThread
{ {
......
...@@ -4,11 +4,11 @@ ...@@ -4,11 +4,11 @@
RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group) RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group)
: _endpointGroup(group) : _endpointGroup(group)
{ {
std::cout << "CMProcessor : Step 1 creating event channel" << std::endl; CPPLOG::LOG_INFO("CMProcessor : Step 1 creating event channel\n");
_eventChannel = rdma_create_event_channel(); _eventChannel = rdma_create_event_channel();
if (_eventChannel == NULL) if (_eventChannel == NULL)
{ {
std::cout << "CMProcesor : error creating event channel"; CPPLOG::LOG_ERROR( "CMProcesor : error creating event channel\n");
} }
} }
...@@ -17,7 +17,7 @@ struct rdma_cm_id *RdmaCmProcessor::createId() ...@@ -17,7 +17,7 @@ struct rdma_cm_id *RdmaCmProcessor::createId()
struct rdma_cm_id *id = NULL; struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP); int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
if (ret == -1) if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl; CPPLOG::LOG_ERROR("CMProcesor : rdma_create_id failed\n");
return id; return id;
} }
...@@ -25,20 +25,20 @@ void RdmaCmProcessor::processCmEvent() ...@@ -25,20 +25,20 @@ void RdmaCmProcessor::processCmEvent()
{ {
int ret; int ret;
struct rdma_cm_event *event; struct rdma_cm_event *event;
std::cout << "CMProcessor : starting cm processing thread" << std::endl; CPPLOG::LOG_INFO("CMProcessor : starting cm processing thread\n");
while (!_stop) while (!_stop)
{ {
ret = rdma_get_cm_event(_eventChannel, &event); ret = rdma_get_cm_event(_eventChannel, &event);
if (ret) if (ret)
{ {
std::cout << "CMProcesor : rdma_get_cm_event failed" << std::endl; CPPLOG::LOG_ERROR("CMProcesor : rdma_get_cm_event failed\n");
continue; continue;
} }
_endpointGroup->processCmEvent(event); _endpointGroup->processCmEvent(event);
ret = rdma_ack_cm_event(event); ret = rdma_ack_cm_event(event);
if (ret) if (ret)
{ {
std::cout << "CMProcesor : rdma_ack_cm_event failed"; CPPLOG::LOG_ERROR("CMProcesor : rdma_ack_cm_event failed\n");
} }
} }
} }
......
...@@ -17,13 +17,13 @@ void RdmaEndpoint::createResources() ...@@ -17,13 +17,13 @@ void RdmaEndpoint::createResources()
{ {
if (_state != CONN_STATE_INITIALIZED) if (_state != CONN_STATE_INITIALIZED)
{ {
std::cout << "RdmaEndpoint : createResource invalid satte" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : createResource invalid state\n");
} }
_protectionDomain = ibv_alloc_pd(_cm_id->verbs); _protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL) if (_protectionDomain == NULL)
{ {
std::cout << "RdmaEndpoint : ibv_alloc_pd failed " << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed \n");
return; return;
} }
...@@ -49,27 +49,28 @@ void RdmaEndpoint::createResources() ...@@ -49,27 +49,28 @@ void RdmaEndpoint::createResources()
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr); int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : ibv_create_cq failed\n"; CPPLOG::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed\n");
} }
if (_cm_id->pd == NULL) if (_cm_id->pd == NULL)
{ {
std::cout << "RdmaEndpoint : pd not set" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : pd not set\n") ;
_cm_id->pd = _protectionDomain; _cm_id->pd = _protectionDomain;
} }
_sendBuff = (char*)malloc(_sendMsgSize * _sendQueueSize); _sendBuff = (char*)malloc(_sendMsgSize * _sendQueueSize);
if (_sendBuff == NULL) if (_sendBuff == NULL)
std::cout << "RdmaEndpoint : sendBuff malloc failed" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : sendBuff malloc failed\n");
_recvBuff = (char*)malloc(_recvMsgSize * _recvQueueSize); _recvBuff = (char*)malloc(_recvMsgSize * _recvQueueSize);
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize); _sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL) if (_sendMr == NULL)
std::cout << "RdmaEndpoint : sendMr reg failed" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : sendMr reg failed" << std::endl;
if (_recvBuff == NULL) if (_recvBuff == NULL)
std::cout << "RdmaEndpoint : recvBuff malloc failed" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : recvBuff malloc failed\n");
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize); _recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL) if (_recvMr == NULL)
std::cout << "RdmaEndpoint : recvMr reg failed" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : recvMr reg failed\n");
for (int i = 0; i < _recvQueueSize; i++) for (int i = 0; i < _recvQueueSize; i++)
{ {
...@@ -90,23 +91,24 @@ void RdmaEndpoint::createResources() ...@@ -90,23 +91,24 @@ void RdmaEndpoint::createResources()
void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event) void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
std::cout << "RdmaEndpoint : Event " << rdma_event_str(event->event) << std::endl; CPPLOG::LOG_INFO("RdmaEndpoint : Event ");
CPPLOG::LOG_INFO(rdma_event_str(event->event));
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{ {
std::cout << "RdmaEndpoint : Connect request"; CPPLOG::LOG_INFO("RdmaEndpoint : Connect request\n");
} }
else if (event->event == RDMA_CM_EVENT_ESTABLISHED) else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{ {
if (_state != CONN_STATE_RESOURCES_ALLOCATED) if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{ {
std::cout << "RdmaEndpoint : EstablishedEvent invalid state " << std::endl; CPPLOG::LOG_ERROR( "RdmaEndpoint : EstablishedEvent invalid state \n");
} }
std::cout << "RdmaEndpoint : step 6 Connected" << std::endl; CPPLOG::LOG_INFO("RdmaEndpoint : step 6 Connected\n");
_state = CONN_STATE_CONNECTED; _state = CONN_STATE_CONNECTED;
} }
else if (event->event == RDMA_CM_EVENT_DISCONNECTED) else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{ {
std::cout << "RdmaEndpoint : step 7 disconnected" << std::endl; CPPLOG::LOG_INFO("RdmaEndpoint : step 7 disconnected\n");
clientClose(); clientClose();
} }
} }
...@@ -115,41 +117,43 @@ void RdmaEndpoint::clientClose() ...@@ -115,41 +117,43 @@ void RdmaEndpoint::clientClose()
{ {
if (_state != CONN_STATE_CONNECTED) if (_state != CONN_STATE_CONNECTED)
{ {
std::cout << "RdmaEndpoint : clientClose invalid state" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : clientClose invalid state\n");
return; return;
} }
std::cout<<"RdmaEndpoint : closing connection qp "<<_cm_id->qp->qp_num<< std::endl; CPPLOG::LOG_INFO("RdmaEndpoint : closing connection qp ");
CPPLOG::LOG_INFO(_cm_id->qp->qp_num);
int ret; int ret;
ret = rdma_disconnect(_cm_id); ret = rdma_disconnect(_cm_id);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_disconnect failed" << std::endl; CPPLOG::LOG_ERROR( "RdmaEndpoint : rdma_disconnect failed\n");
} }
ret = rdma_dereg_mr(_sendMr); ret = rdma_dereg_mr(_sendMr);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_dereg_mr send failed" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed\n");
} }
free(_sendBuff); free(_sendBuff);
ret = rdma_dereg_mr(_recvMr); ret = rdma_dereg_mr(_recvMr);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_dereg_mr recv failed" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed\n");
} }
free(_recvBuff); free(_recvBuff);
rdma_destroy_qp(_cm_id); rdma_destroy_qp(_cm_id);
std::cout<<"des qp"<<std::endl; CPPLOG::LOG_INFO("des qp\n");
rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id); // ret = rdma_destroy_id(_cm_id);
std::cout<<"des mr"<<std::endl; CPPLOG::LOG_INFO("des mr\n";
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_destroy_id failed" << std::endl; CPPLOG::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed\n");
} }
_state = CONN_STATE_CLOSED; _state = CONN_STATE_CLOSED;
std::cout<<"closed"<<std::endl; CPPLOG::LOG_INFO("closed\n");
} }
...@@ -9,13 +9,13 @@ RdmaReplicationEndpoint::RdmaReplicationEndpoint(struct rdma_cm_id *id, struct i ...@@ -9,13 +9,13 @@ RdmaReplicationEndpoint::RdmaReplicationEndpoint(struct rdma_cm_id *id, struct i
void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data) void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data)
{ {
std::cout << "send completion\n"; CPPLOG::LOG_INFO("send completion\n");
_sendBuffers->push((void *)data->wr_id); _sendBuffers->push((void *)data->wr_id);
} }
void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data) void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data)
{ {
std::cout << "recv completion\n"; CPPLOG::LOG_INFO("recv completion\n");
std::cout << "recieve" << (char *)(data->wr_id) << "\n"; std::cout << "Replication recieve" << (char *)(data->wr_id) << "\n";
char* request = new char[data->byte_len]; char* request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id,data->byte_len); memcpy(request, (void *)data->wr_id,data->byte_len);
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr); rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
......
...@@ -5,19 +5,19 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com ...@@ -5,19 +5,19 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com
_compChannel = ibv_create_comp_channel(verbs); _compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL) if (_compChannel == NULL)
{ {
std::cout << "SalCqProcessr : ibv_create_comp_channel failed\n"; CPPLOG::LOG_ERROR( "SalCqProcessr : ibv_create_comp_channel failed\n");
return; return;
} }
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0); _completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL) if (_completionQueue == NULL)
{ {
std::cout << "SalCqProcessr : ibv_create_cq failed" << std::endl; CPPLOG::LOG_INFO( "SalCqProcessr : ibv_create_cq failed");
return; return;
} }
int ret = ibv_req_notify_cq(_completionQueue, 0); int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
std::cout << "SalCqProcessr : ibv_req_notify_cq failed\n"; CPPLOG::LOG_INFO("SalCqProcessr : ibv_req_notify_cq failed\n");
} }
} }
struct ibv_cq *RdmaSalCqProcessor::getCq() struct ibv_cq *RdmaSalCqProcessor::getCq()
...@@ -26,7 +26,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com ...@@ -26,7 +26,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com
} }
void RdmaSalCqProcessor::start() void RdmaSalCqProcessor::start()
{ {
std::cout << "SalCqProcessr : starting process CQ events" << std::endl; CPPLOG::LOG_INFO("SalCqProcessr : starting process CQ events\n" );
_compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this); _compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this);
} }
void RdmaSalCqProcessor::processCQEvents() void RdmaSalCqProcessor::processCQEvents()
...@@ -45,7 +45,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com ...@@ -45,7 +45,7 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com
ret = ibv_get_cq_event(_compChannel, &cq, &context); ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1) if (ret == -1)
{ {
std::cout << "SalCqProcessr : ibv_get_cq_event failed\n"; CPPLOG::LOG_ERROR("SalCqProcessr : ibv_get_cq_event failed\n");
close(); close();
} }
ibv_ack_cq_events(cq, 1); ibv_ack_cq_events(cq, 1);
...@@ -55,13 +55,13 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com ...@@ -55,13 +55,13 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com
ret = ibv_req_notify_cq(_completionQueue, 0); ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
std::cout << "SalCqProcessr : ibv_req_notify_cq failed\n"; CPPLOG::LOG_ERROR("SalCqProcessr : ibv_req_notify_cq failed\n");
close(); close();
} }
ret = ibv_poll_cq(cq, nevent, wc_array); ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0) if (ret < 0)
{ {
std::cout << "SalCqProcessr : ibv_poll_cq failed\n"; CPPLOG::LOG_ERROR("SalCqProcessr : ibv_poll_cq failed\n");
close(); close();
} }
if (ret == 0) if (ret == 0)
......
...@@ -187,6 +187,7 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -187,6 +187,7 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
void RdmaServerEndpointGroup::clientClose() void RdmaServerEndpointGroup::clientClose()
{ {
_cmProcessor->close(); _cmProcessor->close();
delete _cmProcessor; delete _cmProcessor;
_salCqProcessor->close(); _salCqProcessor->close();
......
#include "TaskThread.hpp" #include "TaskThread.hpp"
#include "MessageFormats.hpp"
TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpointGroup *group) TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpointGroup *group)
: _id(id), _group(group) : _id(id), _group(group)
{ {
_taskQueue = taskqueue; _taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this)) if (pthread_create(&thread, NULL, &TaskThread::run, this))
{ {
std::cout << "pthread create has been failed while creating taskthread " << std::endl; CPPLOG::LOG_ERROR("pthread create has been failed while creating taskthread\n");
exit(0); exit(0);
} }
cpu_set_t cpuset; cpu_set_t cpuset;
...@@ -14,8 +14,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint ...@@ -14,8 +14,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint
CPU_SET(cpu, &cpuset); CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0) if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0)
{ {
std::cerr << "Error calling pthread_setaffinity_np: " CPPLOG::LOG_ERROR("Error calling pthread_setaffinity_np\n ");
<< "\n";
} }
} }
...@@ -25,7 +24,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr ...@@ -25,7 +24,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr
_taskQueue = taskqueue; _taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this)) if (pthread_create(&thread, NULL, &TaskThread::run, this))
{ {
std::cout << "pthread create has been failed while creating taskthread " << std::endl; CPPLOG::LOG_ERROR( "pthread create has been failed while creating taskthread\n");
exit(0); exit(0);
} }
pthread_setname_np(thread,"TaskThread"); pthread_setname_np(thread,"TaskThread");
...@@ -33,7 +32,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr ...@@ -33,7 +32,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr
TaskThread::~TaskThread() TaskThread::~TaskThread()
{ {
std::cout << "Task Destructed" << std::endl; CPPLOG::LOG_INFO( "Task Destructed\n");
stop(); stop();
} }
...@@ -42,7 +41,7 @@ void TaskThread::stop() ...@@ -42,7 +41,7 @@ void TaskThread::stop()
_stop = true; _stop = true;
if (pthread_join(thread, NULL) == 0) if (pthread_join(thread, NULL) == 0)
{ {
std::cout << "pthread join failed" << std::endl; CPPLOG::LOG_ERROR("pthread join failed\n");
} }
} }
inline void *TaskThread::run(void *object) inline void *TaskThread::run(void *object)
...@@ -55,7 +54,7 @@ inline void *TaskThread::run(void *object) ...@@ -55,7 +54,7 @@ inline void *TaskThread::run(void *object)
data = thread->_taskQueue->try_pop(); data = thread->_taskQueue->try_pop();
if (data != NULL) if (data != NULL)
{ {
std::cout<<"TaskThread:: got data"<<std::endl; std::cout<<"TaskThread:: got data]n");
thread->processEvent(data); thread->processEvent(data);
thread->_taskQueue->removeFromSet(data); thread->_taskQueue->removeFromSet(data);
delete data; delete data;
...@@ -68,14 +67,18 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -68,14 +67,18 @@ void TaskThread::processEvent(struct ibv_wc *data)
{ {
if (data == NULL || data->status != IBV_WC_SUCCESS) if (data == NULL || data->status != IBV_WC_SUCCESS)
{ {
std::cout << "TaskThread : failed work completion : "; CPPLOG::LOG_INFO("TaskThread : failed work completion : ");
std::cout << ibv_wc_status_str(data->status) << " on qp " << data->qp_num << std::endl; CPPLOG::LOG_INFO(ibv_wc_status_str(data->status) );
CPPLOG::LOG_INFO(" on qp ");
CPPLOG::LOG_INFO( data->qp_num);
CPPLOG::LOG_INFO("\n");
return; return;
} }
auto it = _group->_qpSalEndpointMap->find(data->qp_num); auto it = _group->_qpSalEndpointMap->find(data->qp_num);
if (it == _group->_qpSalEndpointMap->end()) if (it == _group->_qpSalEndpointMap->end())
{ {
std::cout << data->qp_num << "RdmaSal : endpoint not registered for qp num\n"; CPPLOG::LOG_INFO(data->qp_num);
CPPLOG::LOG_INFO("RdmaSal : endpoint not registered for qp num\n");
return; return;
} }
// processSalCQEvent(data, it->second); // processSalCQEvent(data, it->second);
...@@ -108,20 +111,22 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -108,20 +111,22 @@ void TaskThread::processEvent(struct ibv_wc *data)
it->second->processPut(req); it->second->processPut(req);
break; break;
default: default:
std::cout << "SalRequest invalid req type"; CPPLOG::LOG_ERROR("SalRequest invalid req type");
break; break;
} }
delete[] buffer; delete[] buffer;
} }
break; break;
case IBV_WC_RDMA_WRITE: case IBV_WC_RDMA_WRITE:
std::cout << "rdma write completion\n"; CPPLOG::LOG_INFO("rdma write completion\n");
break; break;
case IBV_WC_RDMA_READ: case IBV_WC_RDMA_READ:
std::cout << "rdma read completion\n"; CPPLOG::LOG_INFO( "rdma read completion\n");
break; break;
default: default:
std::cout << "TaskThread default opcode : " << data->opcode << std::endl; CPPLOG::LOG_INFO( "TaskThread default opcode : ");
CPPLOG::LOG_INFO(data->opcode);
CPPLOG::LOG_INFO("\n");
break; break;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment