Commit 380739f6 authored by Paras Garg's avatar Paras Garg

refractor rdmasalendpoint

parent e3d7e950
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
#include "Logger.hpp" #include "Logger.hpp"
class RdmaSalEndpoint class RdmaSalEndpoint
{ {
public: public:
static int CONN_STATE_INITIALIZED; static int CONN_STATE_INITIALIZED;
...@@ -32,16 +32,17 @@ public: ...@@ -32,16 +32,17 @@ public:
char *_recvBuff{NULL}; char *_recvBuff{NULL};
struct ibv_mr *_sendMr{NULL}; struct ibv_mr *_sendMr{NULL};
struct ibv_mr *_recvMr{NULL}; struct ibv_mr *_recvMr{NULL};
std::queue<char*> _sendBuffers; std::queue<void *> _sendBuffers;
std::mutex _sendBuffersM; std::mutex _sendBuffersM;
rocksdb::DB *_db; rocksdb::DB *_db;
RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize, RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db); int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *_db);
void createQueuePair(); void createQueuePair();
void processCmEvent(struct rdma_cm_event *event); void processCmEvent(struct rdma_cm_event *event);
void processCqEvent(struct ibv_wc wc); void processCqEvent(struct ibv_wc wc);
void *getBuffer();
void processSendCompletion(struct ibv_wc *data); void processSendCompletion(struct ibv_wc *data);
void processRecvCompletion(struct ibv_wc *data); void processRecvCompletion(struct ibv_wc *data);
void processDelete(struct MessageHeader *req); void processDelete(struct MessageHeader *req);
......
...@@ -102,7 +102,7 @@ void RdmaSalEndpoint::createQueuePair() ...@@ -102,7 +102,7 @@ void RdmaSalEndpoint::createQueuePair()
{ {
char *location = _sendBuff + i * _sendMsgSize; char *location = _sendBuff + i * _sendMsgSize;
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)location); _sendBuffers.push((void*)location);
} }
_state = CONN_STATE_RESOURCES_ALLOCATED; _state = CONN_STATE_RESOURCES_ALLOCATED;
} }
...@@ -188,15 +188,15 @@ int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size) ...@@ -188,15 +188,15 @@ int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size)
{ {
if (size > (uint32_t)_sendMsgSize) if (size > (uint32_t)_sendMsgSize)
return -1; return -1;
char *sendBuffer = nullptr; void *sendBuffer = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); sendBuffer = getBuffer();
if (_sendBuffers.size() == 0) if(sendBuffer == nullptr)
{
CPPLog::LOG_ERROR("Overflow : No send Buffer available");
return -1; return -1;
sendBuffer = _sendBuffers.front(); }
_sendBuffers.pop(); memcpy(sendBuffer, buffer, size);
lock.unlock(); int ret = rdma_post_send(_cm_id, sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
memcpy((void *)sendBuffer, buffer, size);
int ret = rdma_post_send(_cm_id, (void *)sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
if (ret == -1) if (ret == -1)
{ {
std::ostringstream ss; std::ostringstream ss;
...@@ -225,12 +225,12 @@ void RdmaSalEndpoint::processDelete(struct MessageHeader *req) ...@@ -225,12 +225,12 @@ void RdmaSalEndpoint::processDelete(struct MessageHeader *req)
{ {
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize}); rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
void *sendBuf = nullptr; void *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); sendBuf = getBuffer();
if (_sendBuffers.size() == 0) if(sendBuf == nullptr)
{
CPPLog::LOG_ERROR("Overflow : No send Buffer available");
return; return;
sendBuf = _sendBuffers.front(); }
_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf; MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->type = MessageType::FAILURE; response->type = MessageType::FAILURE;
...@@ -247,21 +247,29 @@ void RdmaSalEndpoint::processDelete(struct MessageHeader *req) ...@@ -247,21 +247,29 @@ void RdmaSalEndpoint::processDelete(struct MessageHeader *req)
CPPLog::LOG_ERROR(ss); CPPLog::LOG_ERROR(ss);
} }
} }
void *RdmaSalEndpoint::getBuffer()
void RdmaSalEndpoint::processGet(struct MessageHeader *req)
{ {
char *sendBuf = nullptr; void *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
return nullptr;
sendBuf = _sendBuffers.front();
_sendBuffers.pop();
return sendBuf;
}
void RdmaSalEndpoint::processGet(struct MessageHeader *req)
{
void *sendBuffer = nullptr;
sendBuffer = getBuffer();
if(sendBuffer == nullptr)
{ {
CPPLog::LOG_ERROR("Overflow : No send Buffer available");
return; return;
} }
sendBuf = _sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
std::string value; std::string value;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + MessageHeaderSize, req->keySize}, &value); rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + MessageHeaderSize, req->keySize}, &value);
MessageHeader *response = (MessageHeader *)sendBuf; MessageHeader *response = (MessageHeader *)sendBuffer;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->type = MessageType::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
...@@ -271,7 +279,7 @@ void RdmaSalEndpoint::processGet(struct MessageHeader *req) ...@@ -271,7 +279,7 @@ void RdmaSalEndpoint::processGet(struct MessageHeader *req)
response->valueSize = value.size(); response->valueSize = value.size();
memcpy(response + MessageHeaderSize, value.c_str(), value.size()); memcpy(response + MessageHeaderSize, value.c_str(), value.size());
} }
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0); int ret = rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + value.size(), _sendMr, 0);
if (ret == -1) if (ret == -1)
{ {
std::ostringstream ss; std::ostringstream ss;
...@@ -284,23 +292,21 @@ void RdmaSalEndpoint::processPut(struct MessageHeader *req) ...@@ -284,23 +292,21 @@ void RdmaSalEndpoint::processPut(struct MessageHeader *req)
{ {
rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize}, rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize},
{(char *)req + MessageHeaderSize + req->keySize, req->valueSize}); {(char *)req + MessageHeaderSize + req->keySize, req->valueSize});
char *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); void *sendBuffer = nullptr;
if (_sendBuffers.size() == 0) sendBuffer = getBuffer();
if(sendBuffer == nullptr)
{ {
CPPLog::LOG_ERROR("No send Buffer"); CPPLog::LOG_ERROR("Overflow : No send Buffer available");
return; return;
} }
sendBuf = _sendBuffers.front(); MessageHeader *response = (MessageHeader *)sendBuffer;
_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->type = MessageType::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
if (s.ok()) if (s.ok())
response->type = MessageType::SUCCESS; response->type = MessageType::SUCCESS;
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0); int ret = rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize, _sendMr, 0);
if (ret == -1) if (ret == -1)
{ {
std::ostringstream ss; std::ostringstream ss;
......
...@@ -240,6 +240,8 @@ void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool loca ...@@ -240,6 +240,8 @@ void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool loca
CPPLog::LOG_ERROR(ss); CPPLog::LOG_ERROR(ss);
return; return;
} }
if(it->second->_ep == nullptr)
std::cout<<"Null Endpoint in processing replication done\n";
// std::cout<<"rep done\n"; // std::cout<<"rep done\n";
// For local write we do not increase repdone count // For local write we do not increase repdone count
if (!localWrite) if (!localWrite)
...@@ -254,16 +256,15 @@ void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool loca ...@@ -254,16 +256,15 @@ void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool loca
cqEventlock.unlock(); cqEventlock.unlock();
RdmaSalEndpoint *ep = it->second->_ep; RdmaSalEndpoint *ep = it->second->_ep;
char *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(ep->_sendBuffersM); void *sendBuf = nullptr;
if (ep->_sendBuffers.size() == 0) sendBuf = ep->getBuffer();
if(sendBuf == nullptr)
{ {
CPPLog::LOG_ERROR("No send Buffer"); CPPLog::LOG_ERROR("Overflow : No send Buffer available");
return; return;
} }
sendBuf = ep->_sendBuffers.front();
ep->_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf; MessageHeader *response = (MessageHeader *)sendBuf;
response->id = id; response->id = id;
response->type = type; response->type = type;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment