Commit 790ba763 authored by Paras Garg's avatar Paras Garg

changed message formats combined to single header for all message type

parent 5cf929e3
...@@ -13,13 +13,13 @@ public: ...@@ -13,13 +13,13 @@ public:
inline bool operator()(const struct ibv_wc* c1,const struct ibv_wc* c2) inline bool operator()(const struct ibv_wc* c1,const struct ibv_wc* c2)
const const
{ {
struct SalRequestHeader *req1 = (struct SalRequestHeader *)c1->wr_id; struct MessageHeader *req1 = (struct MessageHeader *)c1->wr_id;
struct SalRequestHeader *req2 = (struct SalRequestHeader *)c2->wr_id; struct MessageHeader *req2 = (struct MessageHeader *)c2->wr_id;
if(req1->keySize != req2->keySize) if(req1->keySize != req2->keySize)
return true; return true;
char* key1 = (char*)req1+SalRequestHeaderSize; char* key1 = (char*)req1+MessageHeaderSize;
char* key2 = (char*)req2+SalRequestHeaderSize; char* key2 = (char*)req2+ MessageHeaderSize;
for(int i=0;i<req1->keySize;i++) for(uint32_t i=0;i<req1->keySize;i++)
{ {
if(key1[i]!=key2[i]) if(key1[i]!=key2[i])
return true; return true;
......
#ifndef __MessageFormats__ #ifndef __MessageFormats__
#define __MessageFormats__ #define __MessageFormats__
enum RequestType enum MessageType
{ {
GET, GET = (1u << 0),
PUT, PUT = (1u << 1),
DELETE DELETE = (1u << 2),
}; INVALIDATE = (1u << 3),
enum ResponseStatus SUCCESS = (1u << 4),
{ FAILURE = (1u <<5)
SUCCESS,
FAILURE,
INVALIDATE
}; };
struct __attribute__ ((__packed__)) SalRequestHeader struct __attribute__ ((__packed__)) MessageHeader
{ {
uint32_t id; uint32_t id;
enum RequestType type; enum MessageType type;
uint32_t keySize; uint32_t keySize;
uint32_t valueSize; uint32_t valueSize;
}; };
static const uint32_t MessageHeaderSize = sizeof(MessageHeader);
/*
struct __attribute__ ((__packed__)) SalResponseHeader struct __attribute__ ((__packed__)) SalResponseHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus status; enum MessageType status;
/* //Note value will be present only in case of response status is success
* Note value will be present only in case of response status is success
*/
uint32_t valueSize; uint32_t valueSize;
}; };
struct __attribute__ ((__packed__)) InvRequestHeader struct __attribute__ ((__packed__)) InvRequestHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus type; enum MessageType type;
uint32_t keySize; uint32_t keySize;
}; };
...@@ -42,11 +40,12 @@ struct __attribute__ ((__packed__)) InvRequestHeader ...@@ -42,11 +40,12 @@ struct __attribute__ ((__packed__)) InvRequestHeader
struct __attribute__ ((__packed__)) InvResponseHeader struct __attribute__ ((__packed__)) InvResponseHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus status; enum MessageType status; MessageHeader
}; };
static uint32_t SalRequestHeaderSize = sizeof(SalRequestHeader); */
/*
static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader); static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader);
static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader); static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader);
static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader); static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader);
*/
#endif #endif
\ No newline at end of file
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
#include "Executor.hpp" #include "Executor.hpp"
#include "Logger.hpp" #include "Logger.hpp"
class RdmaRepCqProcessor class RdmaRepCqProcessor
{ {
public: public:
...@@ -21,91 +20,12 @@ public: ...@@ -21,91 +20,12 @@ public:
bool _stop{false}; bool _stop{false};
Executor *_executor{NULL}; Executor *_executor{NULL};
RdmaRepCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize) RdmaRepCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize);
: _executor(ex) struct ibv_cq *getCq();
{ void start();
_compChannel = ibv_create_comp_channel(verbs); void processCQEvents();
if (_compChannel == NULL) void processRepEvent(struct ibv_wc *data);
{ void close();
std::cout << "CqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *getCq()
{
return _completionQueue;
}
void start()
{
std::cout << "CqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"RepCQ");
}
void processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc *wc_array = new struct ibv_wc[nevent];
while (!_stop)
{
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "CqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "CqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
//data->vendor_err = 1;
//_executor->submit(data);
/*
* created new thread because currently we are not getting data from followers
*/
new std::thread(&RdmaRepCqProcessor::processRepEvent, this,data);
}
//_executor->dispatchRepCqEvents(wc_array, ret);
}
}
void processRepEvent(struct ibv_wc* data)
{
std::cout<<"procesing Replication request"<<std::endl;
}
void close()
{
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
}
}; };
#endif #endif
...@@ -23,9 +23,9 @@ public: ...@@ -23,9 +23,9 @@ public:
void processCqEvent(struct ibv_wc wc); void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc *data); void processSendCompletion(struct ibv_wc *data);
void processRecvCompletion(struct ibv_wc *data); void processRecvCompletion(struct ibv_wc *data);
void processDelete(struct SalRequestHeader *); void processDelete(struct MessageHeader *);
void processGet(struct SalRequestHeader *req); void processGet(struct MessageHeader *req);
void processPut(struct SalRequestHeader *req); void processPut(struct MessageHeader *req);
int sendMessage(const char *buffer, uint32_t size); int sendMessage(const char *buffer, uint32_t size);
void close(); void close();
}; };
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
- [] failure detection and recovery - [] failure detection and recovery
- [] explore multicast - [] explore multicast
- [x] Added logging api - [x] Added logging api
- [ ] Add future in client
- [ ] add multi server in client - [ ] add multi server in client
- [ ] add yscb - [ ] add yscb
- [ ] add dynamic hashing in client code - [ ] add dynamic hashing in client code
......
...@@ -11,7 +11,7 @@ Executor::Executor(int size, RdmaEndpointGroup *group) ...@@ -11,7 +11,7 @@ Executor::Executor(int size, RdmaEndpointGroup *group)
_taskThreads->push_back(thread); _taskThreads->push_back(thread);
} }
} }
void Executor::submit(struct ibv_wc *task) void Executor::submit(struct ibv_wc *task)
{ {
_taskQueue->push(task); _taskQueue->push(task);
} }
......
...@@ -4,11 +4,11 @@ ...@@ -4,11 +4,11 @@
RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group) RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group)
: _endpointGroup(group) : _endpointGroup(group)
{ {
CPPLOG::LOG_INFO("CMProcessor : Step 1 creating event channel\n"); CPPLog::LOG_INFO("CMProcessor : Step 1 creating event channel\n");
_eventChannel = rdma_create_event_channel(); _eventChannel = rdma_create_event_channel();
if (_eventChannel == NULL) if (_eventChannel == NULL)
{ {
CPPLOG::LOG_ERROR( "CMProcesor : error creating event channel\n"); CPPLog::LOG_ERROR( "CMProcesor : error creating event channel\n");
} }
} }
...@@ -17,7 +17,7 @@ struct rdma_cm_id *RdmaCmProcessor::createId() ...@@ -17,7 +17,7 @@ struct rdma_cm_id *RdmaCmProcessor::createId()
struct rdma_cm_id *id = NULL; struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP); int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
if (ret == -1) if (ret == -1)
CPPLOG::LOG_ERROR("CMProcesor : rdma_create_id failed\n"); CPPLog::LOG_ERROR("CMProcesor : rdma_create_id failed\n");
return id; return id;
} }
...@@ -25,20 +25,20 @@ void RdmaCmProcessor::processCmEvent() ...@@ -25,20 +25,20 @@ void RdmaCmProcessor::processCmEvent()
{ {
int ret; int ret;
struct rdma_cm_event *event; struct rdma_cm_event *event;
CPPLOG::LOG_INFO("CMProcessor : starting cm processing thread\n"); CPPLog::LOG_INFO("CMProcessor : starting cm processing thread\n");
while (!_stop) while (!_stop)
{ {
ret = rdma_get_cm_event(_eventChannel, &event); ret = rdma_get_cm_event(_eventChannel, &event);
if (ret) if (ret)
{ {
CPPLOG::LOG_ERROR("CMProcesor : rdma_get_cm_event failed\n"); CPPLog::LOG_ERROR("CMProcesor : rdma_get_cm_event failed\n");
continue; continue;
} }
_endpointGroup->processCmEvent(event); _endpointGroup->processCmEvent(event);
ret = rdma_ack_cm_event(event); ret = rdma_ack_cm_event(event);
if (ret) if (ret)
{ {
CPPLOG::LOG_ERROR("CMProcesor : rdma_ack_cm_event failed\n"); CPPLog::LOG_ERROR("CMProcesor : rdma_ack_cm_event failed\n");
} }
} }
} }
......
#include "RdmaEndpoint.hpp" #include "RdmaEndpoint.hpp"
#include "Logger.hpp"
int RdmaEndpoint::CONN_STATE_INITIALIZED = 1; int RdmaEndpoint::CONN_STATE_INITIALIZED = 1;
int RdmaEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2; int RdmaEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2;
int RdmaEndpoint::CONN_STATE_CONNECTED = 3; int RdmaEndpoint::CONN_STATE_CONNECTED = 3;
...@@ -17,13 +18,13 @@ void RdmaEndpoint::createResources() ...@@ -17,13 +18,13 @@ void RdmaEndpoint::createResources()
{ {
if (_state != CONN_STATE_INITIALIZED) if (_state != CONN_STATE_INITIALIZED)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : createResource invalid state\n"); CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state\n");
} }
_protectionDomain = ibv_alloc_pd(_cm_id->verbs); _protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL) if (_protectionDomain == NULL)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed \n"); CPPLog::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed \n");
return; return;
} }
...@@ -49,28 +50,28 @@ void RdmaEndpoint::createResources() ...@@ -49,28 +50,28 @@ void RdmaEndpoint::createResources()
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr); int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret) if (ret)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed\n");
} }
if (_cm_id->pd == NULL) if (_cm_id->pd == NULL)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : pd not set\n") ; CPPLog::LOG_ERROR("RdmaEndpoint : pd not set\n") ;
_cm_id->pd = _protectionDomain; _cm_id->pd = _protectionDomain;
} }
_sendBuff = (char*)malloc(_sendMsgSize * _sendQueueSize); _sendBuff = (char*)malloc(_sendMsgSize * _sendQueueSize);
if (_sendBuff == NULL) if (_sendBuff == NULL)
CPPLOG::LOG_ERROR("RdmaEndpoint : sendBuff malloc failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : sendBuff malloc failed\n");
_recvBuff = (char*)malloc(_recvMsgSize * _recvQueueSize); _recvBuff = (char*)malloc(_recvMsgSize * _recvQueueSize);
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize); _sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL) if (_sendMr == NULL)
CPPLOG::LOG_ERROR("RdmaEndpoint : sendMr reg failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : sendMr reg failed\n");
if (_recvBuff == NULL) if (_recvBuff == NULL)
CPPLOG::LOG_ERROR("RdmaEndpoint : recvBuff malloc failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : recvBuff malloc failed\n");
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize); _recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL) if (_recvMr == NULL)
CPPLOG::LOG_ERROR("RdmaEndpoint : recvMr reg failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : recvMr reg failed\n");
for (int i = 0; i < _recvQueueSize; i++) for (int i = 0; i < _recvQueueSize; i++)
{ {
...@@ -91,24 +92,24 @@ void RdmaEndpoint::createResources() ...@@ -91,24 +92,24 @@ void RdmaEndpoint::createResources()
void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event) void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
CPPLOG::LOG_INFO("RdmaEndpoint : Event "); CPPLog::LOG_INFO("RdmaEndpoint : Event ");
CPPLOG::LOG_INFO(rdma_event_str(event->event)); CPPLog::LOG_INFO(rdma_event_str(event->event));
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{ {
CPPLOG::LOG_INFO("RdmaEndpoint : Connect request\n"); CPPLog::LOG_INFO("RdmaEndpoint : Connect request\n");
} }
else if (event->event == RDMA_CM_EVENT_ESTABLISHED) else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{ {
if (_state != CONN_STATE_RESOURCES_ALLOCATED) if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{ {
CPPLOG::LOG_ERROR( "RdmaEndpoint : EstablishedEvent invalid state \n"); CPPLog::LOG_ERROR( "RdmaEndpoint : EstablishedEvent invalid state \n");
} }
CPPLOG::LOG_INFO("RdmaEndpoint : step 6 Connected\n"); CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected\n");
_state = CONN_STATE_CONNECTED; _state = CONN_STATE_CONNECTED;
} }
else if (event->event == RDMA_CM_EVENT_DISCONNECTED) else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{ {
CPPLOG::LOG_INFO("RdmaEndpoint : step 7 disconnected\n"); CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected\n");
clientClose(); clientClose();
} }
} }
...@@ -117,43 +118,43 @@ void RdmaEndpoint::clientClose() ...@@ -117,43 +118,43 @@ void RdmaEndpoint::clientClose()
{ {
if (_state != CONN_STATE_CONNECTED) if (_state != CONN_STATE_CONNECTED)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : clientClose invalid state\n"); CPPLog::LOG_ERROR("RdmaEndpoint : clientClose invalid state\n");
return; return;
} }
CPPLOG::LOG_INFO("RdmaEndpoint : closing connection qp "); CPPLog::LOG_INFO("RdmaEndpoint : closing connection qp \n");
CPPLOG::LOG_INFO(_cm_id->qp->qp_num); // CPPLog::LOG_INFO((_cm_id->qp->qp_num);
int ret; int ret;
ret = rdma_disconnect(_cm_id); ret = rdma_disconnect(_cm_id);
if (ret) if (ret)
{ {
CPPLOG::LOG_ERROR( "RdmaEndpoint : rdma_disconnect failed\n"); CPPLog::LOG_ERROR( "RdmaEndpoint : rdma_disconnect failed\n");
} }
ret = rdma_dereg_mr(_sendMr); ret = rdma_dereg_mr(_sendMr);
if (ret) if (ret)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed\n");
} }
free(_sendBuff); free(_sendBuff);
ret = rdma_dereg_mr(_recvMr); ret = rdma_dereg_mr(_recvMr);
if (ret) if (ret)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed\n");
} }
free(_recvBuff); free(_recvBuff);
rdma_destroy_qp(_cm_id); rdma_destroy_qp(_cm_id);
CPPLOG::LOG_INFO("des qp\n"); CPPLog::LOG_INFO("des qp\n");
rdma_destroy_id(_cm_id); //rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id); // ret = rdma_destroy_id(_cm_id);
CPPLOG::LOG_INFO("des mr\n"; CPPLog::LOG_INFO("des mr\n");
if (ret) if (ret)
{ {
CPPLOG::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed\n"); CPPLog::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed\n");
} }
_state = CONN_STATE_CLOSED; _state = CONN_STATE_CLOSED;
CPPLOG::LOG_INFO("closed\n"); CPPLog::LOG_INFO("closed\n");
} }
#include "RdmaRepCqProcessor.hpp" #include "RdmaRepCqProcessor.hpp"
\ No newline at end of file
RdmaRepCqProcessor::RdmaRepCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize)
: _executor(ex)
{
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{
std::cout << "CqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *RdmaRepCqProcessor::getCq()
{
return _completionQueue;
}
void RdmaRepCqProcessor::start()
{
std::cout << "CqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(), "RepCQ");
}
void RdmaRepCqProcessor::processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc *wc_array = new struct ibv_wc[nevent];
while (!_stop)
{
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "CqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "CqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
// data->vendor_err = 1;
//_executor->submit(data);
/*
* created new thread because currently we are not getting data from followers
*/
new std::thread(&RdmaRepCqProcessor::processRepEvent, this, data);
}
//_executor->dispatchRepCqEvents(wc_array, ret);
}
}
void RdmaRepCqProcessor::processRepEvent(struct ibv_wc *data)
{
std::cout << "procesing Replication request" << std::endl;
}
void RdmaRepCqProcessor::close()
{
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
}
\ No newline at end of file
...@@ -9,12 +9,12 @@ RdmaReplicationEndpoint::RdmaReplicationEndpoint(struct rdma_cm_id *id, struct i ...@@ -9,12 +9,12 @@ RdmaReplicationEndpoint::RdmaReplicationEndpoint(struct rdma_cm_id *id, struct i
void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data) void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data)
{ {
CPPLOG::LOG_INFO("send completion\n"); CPPLog::LOG_INFO("send completion\n");
_sendBuffers->push((void *)data->wr_id); _sendBuffers->push((void *)data->wr_id);
} }
void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data) void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data)
{ {
CPPLOG::LOG_INFO("recv completion\n"); CPPLog::LOG_INFO("recv completion\n");
std::cout << "Replication recieve" << (char *)(data->wr_id) << "\n"; std::cout << "Replication recieve" << (char *)(data->wr_id) << "\n";
char* request = new char[data->byte_len]; char* request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id,data->byte_len); memcpy(request, (void *)data->wr_id,data->byte_len);
...@@ -23,7 +23,7 @@ void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data) ...@@ -23,7 +23,7 @@ void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data)
int RdmaReplicationEndpoint::sendMessage(const char *buffer, uint32_t size) int RdmaReplicationEndpoint::sendMessage(const char *buffer, uint32_t size)
{ {
if (size > _sendMsgSize) if (size > (uint32_t)_sendMsgSize)
return -1; return -1;
void* sendBuffer = nullptr; void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer); _sendBuffers->pop(sendBuffer);
......
#include "RdmaSalCqProcessor.hpp" #include "RdmaSalCqProcessor.hpp"
RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize) RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize)
: _executor(ex) : _executor(ex)
{
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{ {
_compChannel = ibv_create_comp_channel(verbs); CPPLog::LOG_ERROR("SalCqProcessr : ibv_create_comp_channel failed\n");
if (_compChannel == NULL) return;
{
CPPLOG::LOG_ERROR( "SalCqProcessr : ibv_create_comp_channel failed\n");
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
CPPLOG::LOG_INFO( "SalCqProcessr : ibv_create_cq failed");
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
CPPLOG::LOG_INFO("SalCqProcessr : ibv_req_notify_cq failed\n");
}
} }
struct ibv_cq *RdmaSalCqProcessor::getCq() _completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{ {
return _completionQueue; CPPLog::LOG_INFO("SalCqProcessr : ibv_create_cq failed");
return;
} }
void RdmaSalCqProcessor::start() int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{ {
CPPLOG::LOG_INFO("SalCqProcessr : starting process CQ events\n" ); CPPLog::LOG_INFO("SalCqProcessr : ibv_req_notify_cq failed\n");
_compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this);
} }
void RdmaSalCqProcessor::processCQEvents() }
struct ibv_cq *RdmaSalCqProcessor::getCq()
{
return _completionQueue;
}
void RdmaSalCqProcessor::start()
{
CPPLog::LOG_INFO("SalCqProcessr : starting process CQ events\n");
_compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this);
}
void RdmaSalCqProcessor::processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc wc_array[nevent];
while (!_stop)
{ {
int ret = 0; /*
struct ibv_cq *cq; * get_CQ_event is a blocking call and it wait save some cpu cycles but.
void *context; * it might not be that efficient compared to polling
const int nevent = 10; */
struct ibv_wc wc_array[nevent]; ret = ibv_get_cq_event(_compChannel, &cq, &context);
while (!_stop) if (ret == -1)
{ {
/* CPPLog::LOG_ERROR("SalCqProcessr : ibv_get_cq_event failed\n");
* get_CQ_event is a blocking call and it wait save some cpu cycles but. close();
* it might not be that efficient compared to polling }
*/ ibv_ack_cq_events(cq, 1);
ret = ibv_get_cq_event(_compChannel, &cq, &context); /*
if (ret == -1) * Create a request for next completion cycle
{ */
CPPLOG::LOG_ERROR("SalCqProcessr : ibv_get_cq_event failed\n"); ret = ibv_req_notify_cq(_completionQueue, 0);
close(); if (ret)
} {
ibv_ack_cq_events(cq, 1); CPPLog::LOG_ERROR("SalCqProcessr : ibv_req_notify_cq failed\n");
/* close();
* Create a request for next completion cycle }
*/ ret = ibv_poll_cq(cq, nevent, wc_array);
ret = ibv_req_notify_cq(_completionQueue, 0); if (ret < 0)
if (ret) {
{ CPPLog::LOG_ERROR("SalCqProcessr : ibv_poll_cq failed\n");
CPPLOG::LOG_ERROR("SalCqProcessr : ibv_req_notify_cq failed\n"); close();
close(); }
} if (ret == 0)
ret = ibv_poll_cq(cq, nevent, wc_array); continue;
if (ret < 0) for (int i = 0; i < ret; i++)
{
if (wc_array[i].status != IBV_WC_SUCCESS)
{ {
CPPLOG::LOG_ERROR("SalCqProcessr : ibv_poll_cq failed\n"); std::ostringstream ss;
close(); ss<< "RdmaSalCqProcessor : failed work completion : ";
} ss<<ibv_wc_status_str(wc_array[i].status)<<"on qp"<<wc_array[i].qp_num<<"\n";
if (ret == 0) CPPLog::LOG_ERROR(ss);
continue; continue;
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
/*
* vendor_err is set to check whether the request came from sal or followers
* data->vendor_err = 0;
*/
_executor->submit(data);
} }
// _executor->dispatchSalCqEvents(wc_array, ret); struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
/*
* vendor_err is set to check whether the request came from sal or followers
* data->vendor_err = 0;
*/
_executor->submit(data);
} }
} }
}
void RdmaSalCqProcessor::close() void RdmaSalCqProcessor::close()
{ {
_stop = true; _stop = true;
if (_compQueueThread != NULL) if (_compQueueThread != NULL)
_compQueueThread->join(); _compQueueThread->join();
} }
\ No newline at end of file \ No newline at end of file
...@@ -14,7 +14,7 @@ void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data) ...@@ -14,7 +14,7 @@ void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data)
int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size) int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size)
{ {
if (size > _sendMsgSize) if (size > (uint32_t)_sendMsgSize)
return -1; return -1;
void *sendBuffer = nullptr; void *sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer); _sendBuffers->pop(sendBuffer);
...@@ -28,64 +28,64 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data) ...@@ -28,64 +28,64 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
{ {
char *request = new char[data->byte_len]; char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len); memcpy(request, (void *)data->wr_id, data->byte_len);
struct SalRequestHeader *req = (struct SalRequestHeader *)request; struct MessageHeader *req = (struct MessageHeader *)request;
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr); rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
if (req->type == RequestType::DELETE) if (req->type == MessageType::DELETE)
processDelete(req); processDelete(req);
if (req->type == RequestType::GET) if (req->type == MessageType::GET)
processGet(req); processGet(req);
if (req->type == RequestType::PUT) if (req->type == MessageType::PUT)
processPut(req); processPut(req);
delete[] request; delete[] request;
} }
void RdmaSalEndpoint::processDelete(struct SalRequestHeader *req) void RdmaSalEndpoint::processDelete(struct MessageHeader *req)
{ {
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}); rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
void *sendBuf = nullptr; void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); _sendBuffers->pop(sendBuf);
if (sendBuf == nullptr) if (sendBuf == nullptr)
{ {
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf; MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->status = ResponseStatus::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
if (s.ok()) if (s.ok())
{ {
response->status = ResponseStatus::SUCCESS; response->type = MessageType::SUCCESS;
} }
rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0); rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
} }
void RdmaSalEndpoint::processGet(struct SalRequestHeader *req) void RdmaSalEndpoint::processGet(struct MessageHeader *req)
{ {
std::string value; std::string value;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, &value); rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + MessageHeaderSize, req->keySize}, &value);
void *sendBuf = nullptr; void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); _sendBuffers->pop(sendBuf);
if (sendBuf == nullptr) if (sendBuf == nullptr)
{ {
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf; MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->status = ResponseStatus::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
if (s.ok()) if (s.ok())
{ {
response->status = ResponseStatus::SUCCESS; response->type = MessageType::SUCCESS;
response->valueSize = value.size(); response->valueSize = value.size();
memcpy(response + SalResponseHeaderSize, value.c_str(), value.size()); memcpy(response + MessageHeaderSize, value.c_str(), value.size());
} }
rdma_post_send(_cm_id, sendBuf, sendBuf, SalRequestHeaderSize + value.size(), _sendMr, 0); rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0);
} }
void RdmaSalEndpoint::processPut(struct SalRequestHeader *req) void RdmaSalEndpoint::processPut(struct MessageHeader *req)
{ {
rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize},
{(char *)req + SalRequestHeaderSize + req->keySize, req->valueSize}); {(char *)req + MessageHeaderSize + req->keySize, req->valueSize});
void *sendBuf = nullptr; void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); _sendBuffers->pop(sendBuf);
if (sendBuf == nullptr) if (sendBuf == nullptr)
...@@ -93,11 +93,11 @@ void RdmaSalEndpoint::processPut(struct SalRequestHeader *req) ...@@ -93,11 +93,11 @@ void RdmaSalEndpoint::processPut(struct SalRequestHeader *req)
std::cout<<"No send Buffer"<<std::endl; std::cout<<"No send Buffer"<<std::endl;
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf; MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->status = ResponseStatus::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
if (s.ok()) if (s.ok())
response->status = ResponseStatus::FAILURE; response->type = MessageType::FAILURE;
rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0); rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
} }
...@@ -91,7 +91,7 @@ void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event) ...@@ -91,7 +91,7 @@ void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event)
const char *connData = reinterpret_cast<const char *>(event->param.conn.private_data); const char *connData = reinterpret_cast<const char *>(event->param.conn.private_data);
if (strcmp(connData, "sal") == 0) if (strcmp(connData, "sal") == 0)
{ {
std::cout << "sal" << std::endl; //std::cout << "sal" << std::endl;
RdmaSalEndpoint *endpoint = new RdmaSalEndpoint(event->id, createSalCq(event->id), _sendQueueSize, _recvQueueSize, RdmaSalEndpoint *endpoint = new RdmaSalEndpoint(event->id, createSalCq(event->id), _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize,_db); _sendMsgSize, _recvMsgSize,_db);
event->id->context = (void *)endpoint; event->id->context = (void *)endpoint;
......
...@@ -6,7 +6,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint ...@@ -6,7 +6,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint
_taskQueue = taskqueue; _taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this)) if (pthread_create(&thread, NULL, &TaskThread::run, this))
{ {
CPPLOG::LOG_ERROR("pthread create has been failed while creating taskthread\n"); CPPLog::LOG_ERROR("pthread create has been failed while creating taskthread\n");
exit(0); exit(0);
} }
cpu_set_t cpuset; cpu_set_t cpuset;
...@@ -14,7 +14,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint ...@@ -14,7 +14,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpoint
CPU_SET(cpu, &cpuset); CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0) if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0)
{ {
CPPLOG::LOG_ERROR("Error calling pthread_setaffinity_np\n "); CPPLog::LOG_ERROR("Error calling pthread_setaffinity_np\n ");
} }
} }
...@@ -24,7 +24,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr ...@@ -24,7 +24,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr
_taskQueue = taskqueue; _taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this)) if (pthread_create(&thread, NULL, &TaskThread::run, this))
{ {
CPPLOG::LOG_ERROR( "pthread create has been failed while creating taskthread\n"); CPPLog::LOG_ERROR( "pthread create has been failed while creating taskthread\n");
exit(0); exit(0);
} }
pthread_setname_np(thread,"TaskThread"); pthread_setname_np(thread,"TaskThread");
...@@ -32,7 +32,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr ...@@ -32,7 +32,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr
TaskThread::~TaskThread() TaskThread::~TaskThread()
{ {
CPPLOG::LOG_INFO( "Task Destructed\n"); CPPLog::LOG_INFO( "Task Destructed\n");
stop(); stop();
} }
...@@ -41,7 +41,7 @@ void TaskThread::stop() ...@@ -41,7 +41,7 @@ void TaskThread::stop()
_stop = true; _stop = true;
if (pthread_join(thread, NULL) == 0) if (pthread_join(thread, NULL) == 0)
{ {
CPPLOG::LOG_ERROR("pthread join failed\n"); CPPLog::LOG_ERROR("pthread join failed\n");
} }
} }
inline void *TaskThread::run(void *object) inline void *TaskThread::run(void *object)
...@@ -54,7 +54,7 @@ inline void *TaskThread::run(void *object) ...@@ -54,7 +54,7 @@ inline void *TaskThread::run(void *object)
data = thread->_taskQueue->try_pop(); data = thread->_taskQueue->try_pop();
if (data != NULL) if (data != NULL)
{ {
std::cout<<"TaskThread:: got data]n"); std::cout<<"TaskThread:: got data\n";
thread->processEvent(data); thread->processEvent(data);
thread->_taskQueue->removeFromSet(data); thread->_taskQueue->removeFromSet(data);
delete data; delete data;
...@@ -67,18 +67,18 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -67,18 +67,18 @@ void TaskThread::processEvent(struct ibv_wc *data)
{ {
if (data == NULL || data->status != IBV_WC_SUCCESS) if (data == NULL || data->status != IBV_WC_SUCCESS)
{ {
CPPLOG::LOG_INFO("TaskThread : failed work completion : "); CPPLog::LOG_INFO("TaskThread : failed work completion : ");
CPPLOG::LOG_INFO(ibv_wc_status_str(data->status) ); CPPLog::LOG_INFO(ibv_wc_status_str(data->status) );
CPPLOG::LOG_INFO(" on qp "); CPPLog::LOG_INFO(" on qp ");
CPPLOG::LOG_INFO( data->qp_num); // CPPLog::LOG_INFO( data->qp_num);
CPPLOG::LOG_INFO("\n"); CPPLog::LOG_INFO("\n");
return; return;
} }
auto it = _group->_qpSalEndpointMap->find(data->qp_num); auto it = _group->_qpSalEndpointMap->find(data->qp_num);
if (it == _group->_qpSalEndpointMap->end()) if (it == _group->_qpSalEndpointMap->end())
{ {
CPPLOG::LOG_INFO(data->qp_num); // CPPLog::LOG_INFO(data->qp_num);
CPPLOG::LOG_INFO("RdmaSal : endpoint not registered for qp num\n"); CPPLog::LOG_INFO("RdmaSal : endpoint not registered for qp num\n");
return; return;
} }
// processSalCQEvent(data, it->second); // processSalCQEvent(data, it->second);
...@@ -94,39 +94,37 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -94,39 +94,37 @@ void TaskThread::processEvent(struct ibv_wc *data)
memcpy(buffer, (void *)data->wr_id, data->byte_len); memcpy(buffer, (void *)data->wr_id, data->byte_len);
rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id, rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
it->second->_recvMsgSize, it->second->_recvMr); it->second->_recvMsgSize, it->second->_recvMr);
struct SalRequestHeader *req = (struct SalRequestHeader *)buffer; struct MessageHeader *req = (struct MessageHeader *)buffer;
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize;
std::cout << " " << req->type << "size" << data->byte_len << "\n";
switch (req->type) switch (req->type)
{ {
case RequestType::GET: case MessageType::GET:
it->second->processGet(req); it->second->processGet(req);
break; break;
case RequestType::DELETE: case MessageType::DELETE:
replicateSalRequest(buffer, data->byte_len); replicateSalRequest(buffer, data->byte_len);
it->second->processDelete(req); it->second->processDelete(req);
break; break;
case RequestType::PUT: case MessageType::PUT:
replicateSalRequest(buffer, data->byte_len); replicateSalRequest(buffer, data->byte_len);
it->second->processPut(req); it->second->processPut(req);
break; break;
default: default:
CPPLOG::LOG_ERROR("SalRequest invalid req type"); CPPLog::LOG_ERROR("SalRequest invalid req type");
break; break;
} }
delete[] buffer; delete[] buffer;
} }
break; break;
case IBV_WC_RDMA_WRITE: case IBV_WC_RDMA_WRITE:
CPPLOG::LOG_INFO("rdma write completion\n"); CPPLog::LOG_INFO("rdma write completion\n");
break; break;
case IBV_WC_RDMA_READ: case IBV_WC_RDMA_READ:
CPPLOG::LOG_INFO( "rdma read completion\n"); CPPLog::LOG_INFO( "rdma read completion\n");
break; break;
default: default:
CPPLOG::LOG_INFO( "TaskThread default opcode : "); CPPLog::LOG_INFO( "TaskThread default opcode : ");
CPPLOG::LOG_INFO(data->opcode); // CPPLog::LOG_INFO(data->opcode);
CPPLOG::LOG_INFO("\n"); CPPLog::LOG_INFO("\n");
break; break;
} }
} }
...@@ -137,16 +135,16 @@ void TaskThread::replicateSalRequest(char *req, uint32_t size) ...@@ -137,16 +135,16 @@ void TaskThread::replicateSalRequest(char *req, uint32_t size)
{ {
i->second->sendMessage(req, size); i->second->sendMessage(req, size);
} }
SalRequestHeader *salReq = (SalRequestHeader *)req; MessageHeader *salReq = (MessageHeader *)req;
char *buffer = new char[InvRequestHeaderSize + salReq->keySize]; char *buffer = new char[MessageHeaderSize + salReq->keySize];
InvRequestHeader *invRequest = (InvRequestHeader *)(buffer); MessageHeader *invRequest = (MessageHeader *)(buffer);
invRequest->type = ResponseStatus::INVALIDATE; invRequest->type = MessageType::INVALIDATE;
invRequest->id = salReq->id; invRequest->id = salReq->id;
invRequest->keySize = salReq->keySize; invRequest->keySize = salReq->keySize;
memcpy(buffer + InvRequestHeaderSize, salReq + SalRequestHeaderSize, salReq->keySize); memcpy(buffer + MessageHeaderSize, salReq + MessageHeaderSize, salReq->keySize);
//Send Invalidation to sal's //Send Invalidation to sal's
for (auto i = _group->_qpSalEndpointMap->begin(); i != _group->_qpSalEndpointMap->end(); i++) for (auto i = _group->_qpSalEndpointMap->begin(); i != _group->_qpSalEndpointMap->end(); i++)
{ {
i->second->sendMessage(buffer, InvRequestHeaderSize + salReq->keySize); i->second->sendMessage(buffer, MessageHeaderSize + salReq->keySize);
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment