Commit 184a3b67 authored by Paras Garg's avatar Paras Garg

Added code for follower

parent b954b568
No preview for this file type
...@@ -19,13 +19,13 @@ void RdmaServerRepEndpoint::createResources() ...@@ -19,13 +19,13 @@ void RdmaServerRepEndpoint::createResources()
*/ */
if (_state != CONN_STATE_INITIALIZED) if (_state != CONN_STATE_INITIALIZED)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : createResource invalid state");
} }
// Step 1 to create endpoint // Step 1 to create endpoint
_protectionDomain = ibv_alloc_pd(_cm_id->verbs); _protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL) if (_protectionDomain == NULL)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : ibv_alloc_pd failed");
return; return;
} }
// step 2 Creating Queue pair with completion queueu setted for send and recieve // step 2 Creating Queue pair with completion queueu setted for send and recieve
...@@ -54,11 +54,11 @@ void RdmaServerRepEndpoint::createResources() ...@@ -54,11 +54,11 @@ void RdmaServerRepEndpoint::createResources()
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr); int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : ibv_create_cq failed");
} }
if (_cm_id->pd == NULL) if (_cm_id->pd == NULL)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : pd not set"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : pd not set");
_cm_id->pd = _protectionDomain; _cm_id->pd = _protectionDomain;
} }
/* /*
...@@ -66,19 +66,19 @@ void RdmaServerRepEndpoint::createResources() ...@@ -66,19 +66,19 @@ void RdmaServerRepEndpoint::createResources()
*/ */
_sendBuff = new char[(_sendMsgSize * _sendQueueSize)]; _sendBuff = new char[(_sendMsgSize * _sendQueueSize)];
if (_sendBuff == NULL) if (_sendBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendBuff allocation failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : sendBuff allocation failed");
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize); _sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL) if (_sendMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendMr reg failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : sendMr reg failed");
_recvBuff = new char[(_recvMsgSize * _recvQueueSize)]; _recvBuff = new char[(_recvMsgSize * _recvQueueSize)];
if (_recvBuff == NULL) if (_recvBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvBuff allocation failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : recvBuff allocation failed");
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize); _recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL) if (_recvMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvMr reg failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : recvMr reg failed");
/* /*
* adding buffers for recving rdma data * adding buffers for recving rdma data
...@@ -104,24 +104,24 @@ void RdmaServerRepEndpoint::createResources() ...@@ -104,24 +104,24 @@ void RdmaServerRepEndpoint::createResources()
void RdmaServerRepEndpoint::processCmEvent(struct rdma_cm_event *event) void RdmaServerRepEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
std::ostringstream ss; std::ostringstream ss;
ss << "RdmaEndpoint : Event " << rdma_event_str(event->event); ss << "RdmaServerRepEndpoint : Event " << rdma_event_str(event->event);
CPPLog::LOG_ALWAYS(ss); CPPLog::LOG_ALWAYS(ss);
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{ {
CPPLog::LOG_ALWAYS("RdmaEndpoint : Connect request"); CPPLog::LOG_ALWAYS("RdmaServerRepEndpoint : Connect request");
} }
else if (event->event == RDMA_CM_EVENT_ESTABLISHED) else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{ {
if (_state != CONN_STATE_RESOURCES_ALLOCATED) if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : Established_Event but resource not alloted"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : Established_Event but resource not alloted");
} }
CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected"); CPPLog::LOG_INFO("RdmaServerRepEndpoint : step 6 Connected");
_state = CONN_STATE_CONNECTED; _state = CONN_STATE_CONNECTED;
} }
else if (event->event == RDMA_CM_EVENT_DISCONNECTED) else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{ {
CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected"); CPPLog::LOG_INFO("RdmaServerRepEndpoint : step 7 disconnected");
close(); close();
} }
} }
...@@ -132,25 +132,25 @@ void RdmaServerRepEndpoint::close() ...@@ -132,25 +132,25 @@ void RdmaServerRepEndpoint::close()
{ {
return; return;
} }
CPPLog::LOG_INFO("RdmaEndpoint : closing connection"); CPPLog::LOG_INFO("RdmaServerRepEndpoint : closing connection");
int ret; int ret;
ret = rdma_disconnect(_cm_id); ret = rdma_disconnect(_cm_id);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_disconnect failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : rdma_disconnect failed");
} }
ret = rdma_dereg_mr(_sendMr); ret = rdma_dereg_mr(_sendMr);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : rdma_dereg_mr send failed");
} }
delete[] _sendBuff; delete[] _sendBuff;
ret = rdma_dereg_mr(_recvMr); ret = rdma_dereg_mr(_recvMr);
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : rdma_dereg_mr recv failed");
} }
delete[] _recvBuff; delete[] _recvBuff;
...@@ -163,7 +163,7 @@ void RdmaServerRepEndpoint::close() ...@@ -163,7 +163,7 @@ void RdmaServerRepEndpoint::close()
CPPLog::LOG_INFO("des mr"); CPPLog::LOG_INFO("des mr");
if (ret) if (ret)
{ {
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed"); CPPLog::LOG_ERROR("RdmaServerRepEndpoint : rdma_destroy_id failed");
} }
_state = CONN_STATE_CLOSED; _state = CONN_STATE_CLOSED;
CPPLog::LOG_INFO("closed"); CPPLog::LOG_INFO("closed");
......
...@@ -12,7 +12,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, ...@@ -12,7 +12,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue,
CPPLog::LOG_ERROR("pthread create has been failed while creating taskthread"); CPPLog::LOG_ERROR("pthread create has been failed while creating taskthread");
exit(0); exit(0);
} }
cpu_set_t cpuset; /*cpu_set_t cpuset;
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset); CPU_SET(cpu, &cpuset);
std::ostringstream ss; std::ostringstream ss;
...@@ -21,7 +21,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, ...@@ -21,7 +21,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue,
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0) if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0)
{ {
CPPLog::LOG_ERROR("Error calling pthread_setaffinity_np "); CPPLog::LOG_ERROR("Error calling pthread_setaffinity_np ");
} }*/
pthread_setname_np(thread, "TaskThread"); pthread_setname_np(thread, "TaskThread");
} }
...@@ -134,7 +134,9 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data) ...@@ -134,7 +134,9 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
ep->processPut(req); ep->processPut(req);
break; break;
default: default:
CPPLog::LOG_ERROR("SalRequest invalid req type"); std::ostringstream ss;
ss<<"SalRequest invalid req type"<<data->opcode;
CPPLog::LOG_ERROR(ss);
break; break;
} }
delete[] buffer; delete[] buffer;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment