Commit 071a78a4 authored by Paras Garg's avatar Paras Garg

Added wait for replication event

parent 29571b95
#ifndef __CQEVENTDATA__
#define __CQEVENTDATA__
#include "RdmaSalEndpoint.hpp"
class CQEventData
{
public:
int _repDone{0};
int _repRequired{0};
RdmaSalEndpoint* _ep{nullptr};
CQEventData(int repRequired,RdmaSalEndpoint* ep)
: _repRequired(repRequired),_ep(ep)
{
std::cout<<"created\n";
}
~CQEventData()
{
std::cout<<"deleted\n";
}
};
#endif
\ No newline at end of file
......@@ -2,7 +2,6 @@
#define __Executor__
#include <vector>
#include "CqEventData.hpp"
#include "RdmaEndpointGroup.hpp"
#include "TaskThread.hpp"
#include "Logger.hpp"
......
......@@ -48,7 +48,7 @@ public:
void connect(const char *ip, const char *port, const char *connData);
bool isConnected();
void processCmEvent(struct rdma_cm_event *event);
void createResources(struct ibv_cq *cq);
void createQueuePair(struct ibv_cq *cq);
void close();
};
#endif
\ No newline at end of file
......@@ -13,7 +13,6 @@
#include <mutex>
#include <shared_mutex>
#include "CqEventData.hpp"
#include "Logger.hpp"
class RdmaEndpoint
......@@ -44,7 +43,7 @@ public:
RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize);
void createResources();
void createQueuePair();
void processCmEvent(struct rdma_cm_event *event);
void close();
virtual void processSendCompletion(struct ibv_wc *data) = 0;
......
......@@ -39,7 +39,7 @@ public:
RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db);
void createResources();
void createQueuePair();
void processCmEvent(struct rdma_cm_event *event);
void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc *data);
......
......@@ -15,7 +15,6 @@
#include "RdmaSalEndpoint.hpp"
#include "RdmaServerRepEndpoint.hpp"
#include "RdmaEndpointGroup.hpp"
#include "CqEventData.hpp"
#include "Executor.hpp"
#include "RdmaCmProcessor.hpp"
#include "RdmaCqProcessor.hpp"
......
......@@ -25,7 +25,7 @@ public:
RdmaServerRepEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db);
void createResources();
void createQueuePair();
void processCmEvent(struct rdma_cm_event *event);
void processCqEvent(struct ibv_wc wc);
void close();
......
......@@ -4,6 +4,7 @@
#include <iostream>
#include <queue>
#include <pthread.h>
#include<memory>
#include "Runnable.hpp"
#include "CqEventData.hpp"
......@@ -12,6 +13,7 @@
#include "ConcurrentQueue.hpp"
#include "RdmaRepEndpoint.hpp"
#include "RdmaSalEndpoint.hpp"
#include "CqEventData.hpp"
class TaskThread
{
......@@ -24,6 +26,7 @@ private:
std::unordered_map<uint32_t, RdmaRepEndpoint *> *_clientRepMap;
std::unordered_map<uint32_t, RdmaRepEndpoint *> *_serverRepMap;
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_salMap;
std::unordered_map<uint32_t,std::shared_ptr<CQEventData>> cqEvents;
public:
TaskThread(int id, int cpu, ConcurrentQueue *,
......@@ -36,6 +39,7 @@ public:
void stop();
void processEvent(RdmaSalEndpoint* ep,struct ibv_wc *data);
void processRepEvent(RdmaRepEndpoint* ep,struct ibv_wc *data);
void processReplicationDone(uint32_t reqid);
~TaskThread();
};
......
......@@ -2,7 +2,7 @@
void ConcurrentQueue::push(struct ibv_wc *const &data)
{
std::unique_lock<std::mutex> lock(queueMutex);
std::cout << "putting data\n";
//std::cout << "putting data\n";
queue1.push(data);
lock.unlock();
queueCv.notify_one();
......
......@@ -31,6 +31,7 @@ void RdmaClientRepEndpoint::connect(const char *ip, const char *port, const char
if (ret)
{
CPPLog::LOG_ERROR("RdmaClientRepEndpoint : get_addr_info failed");
exit(1);
}
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step2 resolve addr");
......@@ -38,7 +39,7 @@ void RdmaClientRepEndpoint::connect(const char *ip, const char *port, const char
if (ret)
{
CPPLog::LOG_ERROR("unable to resolve addr");
//return;
exit(1);
}
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step2 resolve addr resolved");
_state = CONN_STATE_ADDR_RESOLVED;
......@@ -96,16 +97,18 @@ void RdmaClientRepEndpoint::close()
{
if (_state != CONN_STATE_CONNECTED)
{
std::cout << "RdmaClientRepEndpoint : close invalid state" << std::endl;
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : close invalid state");
}
_state = CONN_STATE_PARTIAL_CLOSED;
int ret = rdma_disconnect(_cm_id);
if (ret)
{
std::cout << "RdmaClientRepEndpoint : rdma_disconnect failed" << std::endl;
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : rdma_disconnect failed");
}
delete[](char *) _sendBuff;
delete[](char *) _recvBuff;
if(_sendBuff != nullptr)
delete[](char *) _sendBuff;
if(_recvBuff != nullptr)
delete[](char *) _recvBuff;
rdma_dereg_mr(_sendMr);
rdma_dereg_mr(_recvMr);
rdma_destroy_qp(_cm_id);
......@@ -136,34 +139,34 @@ void RdmaClientRepEndpoint::registerMemory()
{
if (_state != CONN_STATE_ROUTE_RESOLVED)
{
std::cout << "RdmaClientRepEndpoint : createResource address not resolved" << std::endl;
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : createQueuePair address not resolved");
return;
}
_sendBuff = new char[_sendMsgSize * _sendQueueSize];
if (_sendBuff == NULL)
{
std::cout << "RdmaClientRepEndpoint : sendBuff allocation failed" << std::endl;
CPPLog::LOG_DEBUG( "RdmaClientRepEndpoint : sendBuff allocation failed" );
return;
}
_sendMr = rdma_reg_msgs(_cm_id, _sendBuff, _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL)
{
std::cout << "RdmaClientRepEndpoint : sendMr reg failed" << std::endl;
CPPLog::LOG_DEBUG( "RdmaClientRepEndpoint : sendMr reg failed");
return;
}
_recvBuff = new char[_recvMsgSize * _recvQueueSize];
if (_recvBuff == NULL)
{
std::cout << "RdmaClientRepEndpoint : recvBuff allocation failed" << std::endl;
CPPLog::LOG_DEBUG( "RdmaClientRepEndpoint : recvBuff allocation failed");
return;
}
_recvMr = rdma_reg_msgs(_cm_id, _recvBuff, _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL)
{
std::cout << "RdmaClientRepEndpoint : recvMr reg failed" << std::endl;
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : recvMr reg failed");
return;
}
char *buffer = (char *)_recvBuff;
......@@ -184,17 +187,17 @@ void RdmaClientRepEndpoint::registerMemory()
_state = CONN_STATE_RESOURCES_ALLOCATED;
}
void RdmaClientRepEndpoint::createResources(struct ibv_cq *cq)
void RdmaClientRepEndpoint::createQueuePair(struct ibv_cq *cq)
{
if (_state != CONN_STATE_ADDR_RESOLVED)
{
std::cout << "RdmaClientRepEndpoint : createResource address not resolved" << std::endl;
CPPLog::LOG_DEBUG( "RdmaClientRepEndpoint : createQueuePair address not resolved");
return;
}
_protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL)
{
std::cout << "RdmaClientRepEndpoint : ibv_alloc_pd failed " << std::endl;
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : ibv_alloc_pd failed " );
return;
}
struct ibv_cq *completionQueue = cq;
......@@ -222,11 +225,11 @@ void RdmaClientRepEndpoint::createResources(struct ibv_cq *cq)
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret)
{
std::cout << "RdmaClientRepEndpoint : ibv_create_cq failed\n";
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : ibv_create_cq failed");
}
if (_cm_id->pd == NULL)
{
std::cout << "RdmaClientRepEndpoint : pd not set" << std::endl;
CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : pd not set");
_cm_id->pd = _protectionDomain;
}
......
......@@ -13,7 +13,7 @@ RdmaEndpoint::RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue
{
_state = CONN_STATE_INITIALIZED;
}
void RdmaEndpoint::createResources()
void RdmaEndpoint::createQueuePair()
{
/* These states are used to avoid errors in lifetime of rdma connection
* more erros can be tracked in future using these lifecycle states
......
#include "RdmaRepEndpoint.hpp"
RdmaRepEndpoint::RdmaRepEndpoint(struct rdma_cm_id *id, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: _cm_id(id), _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _sendMsgSize(sendMsgSize),
_recvMsgSize(recvMsgSize), _db(db)
{
}
void RdmaRepEndpoint::processSendCompletion(struct ibv_wc *data)
......@@ -19,16 +17,30 @@ void RdmaRepEndpoint::processSendCompletion(struct ibv_wc *data)
int RdmaRepEndpoint::sendMessage(const char *buffer, uint32_t size)
{
if (size > (uint32_t)_sendMsgSize)
{
std::ostringstream ss;
ss << "Large Message size " << size;
ss << " send buffer size " << _sendMsgSize;
CPPLog::LOG_ERROR(ss);
return -1;
}
char *sendBuffer = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0)
return -1;
sendBuffer = (char*)_sendBuffers.front();
sendBuffer = (char *)_sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
memcpy((void *)sendBuffer, buffer, size);
return rdma_post_send(_cm_id, (void *)sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
int ret = rdma_post_send(_cm_id, (void *)sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
return ret;
}
return ret;
}
void RdmaRepEndpoint::processRecvCompletion(struct ibv_wc *data)
......@@ -45,7 +57,7 @@ void RdmaRepEndpoint::processRecvCompletion(struct ibv_wc *data)
processPut(req);
delete[] request;
}
/*since RepEndpoint need not to wait for response from other servers etc we will send the response immediately*/
void RdmaRepEndpoint::processDelete(struct MessageHeader *req)
{
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
......@@ -64,7 +76,13 @@ void RdmaRepEndpoint::processDelete(struct MessageHeader *req)
{
response->type = MessageType::SUCCESS;
}
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
void RdmaRepEndpoint::processGet(struct MessageHeader *req)
......@@ -75,7 +93,7 @@ void RdmaRepEndpoint::processGet(struct MessageHeader *req)
{
return;
}
sendBuf = (char*)_sendBuffers.front();
sendBuf = (char *)_sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
std::string value;
......@@ -90,7 +108,13 @@ void RdmaRepEndpoint::processGet(struct MessageHeader *req)
response->valueSize = value.size();
memcpy(response + MessageHeaderSize, value.c_str(), value.size());
}
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0);
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
void RdmaRepEndpoint::processPut(struct MessageHeader *req)
......@@ -104,7 +128,7 @@ void RdmaRepEndpoint::processPut(struct MessageHeader *req)
CPPLog::LOG_ERROR("No send Buffer");
return;
}
sendBuf = (char*)_sendBuffers.front();
sendBuf = (char *)_sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
......@@ -113,5 +137,11 @@ void RdmaRepEndpoint::processPut(struct MessageHeader *req)
response->id = req->id;
if (s.ok())
response->type = MessageType::SUCCESS;
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
......@@ -5,12 +5,12 @@ RdmaRepEndpointGroup::RdmaRepEndpointGroup(int sendQueueSize, int recvQueueSize,
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize), _sendMsgSize(sendMsgSize),
_recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout), _db(db)
{
std::cout << "RdmaRepEndpointGroup : Step 1 creating event channel" << std::endl;
CPPLog::LOG_ALWAYS("RdmaRepEndpointGroup : Step 1 creating event channel");
_eventChannel = rdma_create_event_channel();
_stopCMThread = false;
if (_eventChannel == NULL)
{
std::cout << "RdmaRepEndpointGroup : error creating event channel";
CPPLog::LOG_ALWAYS("RdmaRepEndpointGroup : error creating event channel");
}
_qpRepEndpointMap = new std::unordered_map<uint32_t, RdmaRepEndpoint *>();
}
......@@ -36,32 +36,34 @@ void RdmaRepEndpointGroup::processCmEvents()
{
int ret;
struct rdma_cm_event *event;
std::cout << "RdmaRepEndpointGroup : starting cm processing thread" << std::endl;
CPPLog::LOG_ALWAYS( "RdmaRepEndpointGroup : starting cm processing thread" );
while (!_stopCMThread)
{
ret = rdma_get_cm_event(_eventChannel, &event);
if (ret)
{
std::cout << "RdmaRepEndpointGroup : rdma_get_cm_event failed" << std::endl;
CPPLog::LOG_ALWAYS("RdmaRepEndpointGroup : rdma_get_cm_event failed" );
continue;
}
processCmEvent(event);
ret = rdma_ack_cm_event(event);
if (ret)
{
std::cout << "RdmaRepEndpointGroup : rdma_ack_cm_event failed";
CPPLog::LOG_ALWAYS( "RdmaRepEndpointGroup : rdma_ack_cm_event failed");
}
}
}
void RdmaRepEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{
std::cout << "RdmaRepEndpointGroup : event" << rdma_event_str(event->event) << std::endl;
std::ostringstream ss;
ss << "RdmaRepEndpointGroup : event" << rdma_event_str(event->event) << std::endl;
CPPLog::LOG_ALWAYS(ss);
if (event->id != NULL && event->id->context != NULL)
{
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
{
((RdmaClientRepEndpoint *)event->id->context)->createResources(createCq(event->id));
((RdmaClientRepEndpoint *)event->id->context)->createQueuePair(createCq(event->id));
}
((RdmaClientRepEndpoint *)event->id->context)->processCmEvent(event);
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED)
......@@ -72,14 +74,17 @@ void RdmaRepEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{
if (_qpRepEndpointMap->find(event->id->qp->qp_num) != _qpRepEndpointMap->end())
_qpRepEndpointMap->erase(event->id->qp->qp_num);
//Remove request related to this endpoint from qp
delete ((RdmaRepEndpoint *)event->id->context);
}
}
else
{
std::cout << "RdmaRepEndpointGroup : Not able to procces CM EVent";
std::cout << rdma_event_str(event->event) << event->id << " ";
std::cout << event->listen_id << std::endl;
std::ostringstream ss;
ss << "RdmaRepEndpointGroup : Not able to procces CM EVent";
ss << rdma_event_str(event->event) << event->id << " ";
ss << event->listen_id << std::endl;
CPPLog::LOG_ALWAYS(ss);
}
}
......@@ -88,7 +93,7 @@ RdmaClientRepEndpoint *RdmaRepEndpointGroup::createEndpoint()
struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl;
CPPLog::LOG_ALWAYS( "CMProcesor : rdma_create_id failed");
RdmaClientRepEndpoint *endpoint = new RdmaClientRepEndpoint(id, _sendQueueSize, _recvQueueSize, _sendMsgSize,
_recvMsgSize, _maxInLine, _timeoutMs, _db);
id->context = (void *)endpoint;
......
......@@ -5,7 +5,6 @@ int RdmaSalEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2;
int RdmaSalEndpoint::CONN_STATE_CONNECTED = 3;
int RdmaSalEndpoint::CONN_STATE_CLOSED = 4;
RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: _cm_id(id), _completionQueue(completionQueue), _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize),
......@@ -13,7 +12,7 @@ RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completio
{
_state = CONN_STATE_INITIALIZED;
}
void RdmaSalEndpoint::createResources()
void RdmaSalEndpoint::createQueuePair()
{
/* These states are used to avoid errors in lifetime of rdma connection
* more erros can be tracked in future using these lifecycle states
......@@ -87,8 +86,14 @@ void RdmaSalEndpoint::createResources()
for (int i = 0; i < _recvQueueSize; i++)
{
char *location = _recvBuff + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
int ret = rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_recv error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
/*
* Adding buffers to queue for receving data
......@@ -146,14 +151,16 @@ void RdmaSalEndpoint::close()
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed");
}
delete[] _sendBuff;
if (_sendBuff != nullptr)
delete[] _sendBuff;
ret = rdma_dereg_mr(_recvMr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed");
}
delete[] _recvBuff;
if (_recvBuff != nullptr)
delete[] _recvBuff;
rdma_destroy_qp(_cm_id);
......@@ -189,7 +196,14 @@ int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size)
_sendBuffers.pop();
lock.unlock();
memcpy((void *)sendBuffer, buffer, size);
return rdma_post_send(_cm_id, (void *)sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
int ret = rdma_post_send(_cm_id, (void *)sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
return ret;
}
void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
......@@ -225,7 +239,13 @@ void RdmaSalEndpoint::processDelete(struct MessageHeader *req)
{
response->type = MessageType::SUCCESS;
}
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
void RdmaSalEndpoint::processGet(struct MessageHeader *req)
......@@ -251,7 +271,13 @@ void RdmaSalEndpoint::processGet(struct MessageHeader *req)
response->valueSize = value.size();
memcpy(response + MessageHeaderSize, value.c_str(), value.size());
}
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0);
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
void RdmaSalEndpoint::processPut(struct MessageHeader *req)
......@@ -274,5 +300,11 @@ void RdmaSalEndpoint::processPut(struct MessageHeader *req)
response->id = req->id;
if (s.ok())
response->type = MessageType::SUCCESS;
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
int ret = rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
......@@ -62,7 +62,7 @@ void RdmaServerEndpointGroup::bind(const char *ip, const char *port, int backlog
{
int ret;
std::ostringstream ss;
ss<<"RdmaServerEndpointGroup : Step 2 bind_addr"<<ip<<" "<<port<<"\n";
ss << "RdmaServerEndpointGroup : Step 2 bind_addr" << ip << " " << port;
CPPLog::LOG_ALWAYS(ss);
struct addrinfo *addr;
ret = getaddrinfo(ip, port, NULL, &addr);
......@@ -97,10 +97,8 @@ struct ibv_cq *RdmaServerEndpointGroup::createCq(struct rdma_cm_id *id)
return _cqProcessor->getCq();
}
void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event)
{
CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : step 4 Got Connect Request Sal Endpoint");
/*
* create and add to vectors for replication and invalidation processing
* connData is used to identify whether connection came for client or from other Sal;
......@@ -110,24 +108,26 @@ void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event)
if (strcmp(connData, "sal") == 0)
{
CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : step 4 Got Connect Request Sal Endpoint");
RdmaSalEndpoint *endpoint = nullptr;
endpoint = new RdmaSalEndpoint(event->id, createCq(event->id), _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize, _db);
event->id->context = (void *)endpoint;
endpoint->createResources();
endpoint->createQueuePair();
// std::unique_lock lock(_salMutex);
std::cout<<"Sal Map Size"<<_qpSalEndpointMap->size()<<"qp "<<event->id->qp->qp_num<<"\n";
std::cout << "Sal Map Size" << _qpSalEndpointMap->size() << "qp " << event->id->qp->qp_num << "\n";
_qpSalEndpointMap->emplace(event->id->qp->qp_num, endpoint);
std::cout<<"Sal Map Size"<<_qpSalEndpointMap->size()<<"\n";
std::cout << "Sal Map Size" << _qpSalEndpointMap->size() << "\n";
}
else
{
CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : step 4 Got Connect Request follower Endpoint");
RdmaServerRepEndpoint *endpoint = nullptr;
endpoint = new RdmaServerRepEndpoint(event->id, createCq(event->id), _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize, _db);
event->id->context = (void *)endpoint;
endpoint->createResources();
endpoint->createQueuePair();
// std::unique_lock lock(_salMutex);
_qpRepEndpointMap->emplace(event->id->qp->qp_num, endpoint);
}
......@@ -149,16 +149,21 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{
createEpCmEvent(event);
}
/*
* Event came for server on listen cm_id
*/
else if (event->id != NULL && _cm_id == event->id)
{
if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
std::cout << "RdmaServerEndpointGroup : Disconnect Sal Endpoint" << std::endl;
CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : Disconnect Sal Endpoint");
close();
}
else
{
std::cout << "RdmaServerEndpointGroup : unknown Event for listener" << rdma_event_str(event->event) << std::endl;
std::ostringstream ss;
ss << "RdmaServerEndpointGroup : unknown Event for listener" << rdma_event_str(event->event) << std::endl;
CPPLog::LOG_ALWAYS(ss);
}
}
/*
......@@ -193,6 +198,10 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
// std::unique_lock lock(_salMutex);
/*
* Since we are processing event on group after endpoint had processed it .
* it is safe to assume that endpoint has been closed already we just need to delete the endpoint
*/
auto it = _qpSalEndpointMap->find(qp);
if (it != _qpSalEndpointMap->end())
{
......@@ -235,7 +244,7 @@ void RdmaServerEndpointGroup::close()
delete ((RdmaSalEndpoint *)it->second);
}
delete _qpSalEndpointMap;
for (auto it = _qpRepEndpointMap->begin(); it != _qpRepEndpointMap->end(); it++)
{
((RdmaServerRepEndpoint *)it->second)->close();
......
......@@ -12,7 +12,7 @@ RdmaServerRepEndpoint::RdmaServerRepEndpoint(struct rdma_cm_id *id, struct ibv_c
_state = CONN_STATE_INITIALIZED;
}
void RdmaServerRepEndpoint::createResources()
void RdmaServerRepEndpoint::createQueuePair()
{
/* These states are used to avoid errors in lifetime of rdma connection
* more erros can be tracked in future using these lifecycle states
......
......@@ -47,17 +47,17 @@ inline void *TaskThread::run(void *object)
*/
TaskThread *thread = reinterpret_cast<TaskThread *>(object);
std::ostringstream ss;
ss<<"Running task thread"<<thread->_id;
ss << "Running task thread" << thread->_id;
CPPLog::LOG_ALWAYS(ss);
while (!thread->_stop)
{
struct ibv_wc *data = NULL;
//std::cout << "Get start\n";
// std::cout << "Get start\n";
data = thread->_taskQueue->try_pop();
if (data != NULL)
{
//std::cout << "TaskThread:: got data";
if (data == NULL || data->status != IBV_WC_SUCCESS)
// std::cout << "TaskThread:: got data";
if (data->status != IBV_WC_SUCCESS)
{
std::ostringstream ss;
ss << "TaskThread : failed work completion : ";
......@@ -72,6 +72,7 @@ inline void *TaskThread::run(void *object)
{
salEp = it->second;
}
// Now we have to check whether event came for replication group or not
auto it2 = thread->_clientRepMap->find(data->qp_num);
if (it2 != thread->_clientRepMap->end())
{
......@@ -80,15 +81,15 @@ inline void *TaskThread::run(void *object)
else
{
auto it3 = thread->_serverRepMap->find(data->qp_num);
if(it3 != thread->_serverRepMap->end())
if (it3 != thread->_serverRepMap->end())
repEp = it3->second;
}
if (salEp == nullptr && repEp == nullptr)
{
std::ostringstream ss;
ss<<"RdmaSal : endpoint not registered for qp"<<data->qp_num;
ss << "RdmaSal : endpoint not registered for qp" << data->qp_num;
CPPLog::LOG_INFO(ss);
continue;
}
else if (salEp != nullptr)
......@@ -104,7 +105,7 @@ inline void *TaskThread::run(void *object)
void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
{
std::cout<<"processing sal event\n";
std::cout << "processing sal event\n";
/* sal Request*/
switch (data->opcode)
{
......@@ -119,23 +120,31 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
rdma_post_recv(ep->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
ep->_recvMsgSize, ep->_recvMr);
struct MessageHeader *req = (struct MessageHeader *)buffer;
std::cout << "TaskThread 1\n";
std::shared_ptr<CQEventData> cqevent = std::make_shared<CQEventData>(_serverRepMap->size() + _clientRepMap->size(), ep);
uint32_t id = req->id;
/*
* We need to wait for replication and invalidation to be done for delete and put request,hence we are putting data in map
*/
rocksdb::Status s;
switch (req->type)
{
case MessageType::GET:
ep->processGet(req);
break;
case MessageType::DELETE:
replicateSalRequest(buffer, data->byte_len);
ep->processDelete(req);
cqEvents.emplace(id, cqevent);
replicateSalRequest(buffer, data->byte_len);
s = ep->_db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
break;
case MessageType::PUT:
replicateSalRequest(buffer, data->byte_len);
ep->processPut(req);
cqEvents.emplace(id, cqevent);
replicateSalRequest(buffer, data->byte_len);
s = ep->_db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize},
{(char *)req + MessageHeaderSize + req->keySize, req->valueSize});;
break;
default:
std::ostringstream ss;
ss<<"SalRequest invalid req type"<<data->opcode;
ss << "SalRequest invalid req type" << data->opcode;
CPPLog::LOG_ERROR(ss);
break;
}
......@@ -152,7 +161,7 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
{
std::cout<<"processing rep event\n";
std::cout << "processing rep event\n";
switch (data->opcode)
{
case IBV_WC_SEND:
......@@ -166,7 +175,6 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
rdma_post_recv(ep->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
ep->_recvMsgSize, ep->_recvMr);
struct MessageHeader *req = (struct MessageHeader *)buffer;
std::cout << "TaskThread 1\n";
switch (req->type)
{
case MessageType::GET:
......@@ -178,9 +186,15 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
case MessageType::PUT:
ep->processPut(req);
break;
case MessageType::SUCCESS:
processReplicationDone(req->id);
break;
case MessageType::FAILURE:
processReplicationDone(req->id);
break;
default:
std::ostringstream ss;
ss<<"RepRequest invalid req type"<<data->opcode;
ss << "RepRequest invalid req type" << data->opcode;
CPPLog::LOG_ERROR(ss);
break;
}
......@@ -195,13 +209,55 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
}
}
void TaskThread::processReplicationDone(uint32_t id)
{
auto it = cqEvents.find(id);
if(it == cqEvents.end())
{
CPPLog::LOG_ERROR("Cqevent not found");
return;
}
std::cout<<"rep done\n";
++(it->second->_repDone);
if(it->second->_repDone < it->second->_repRequired)
return;
RdmaSalEndpoint* ep = it->second->_ep;
char *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(ep->_sendBuffersM);
if (ep->_sendBuffers.size() == 0)
{
CPPLog::LOG_ERROR("No send Buffer");
return;
}
sendBuf = ep->_sendBuffers.front();
ep->_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/
//response->type = MessageType::FAILURE;
//response->id = id;
//if (s.ok())
response->type = MessageType::SUCCESS;
int ret = rdma_post_send(ep->_cm_id, sendBuf, sendBuf, MessageHeaderSize, ep->_sendMr, 0);
if (ret == -1)
{
std::ostringstream ss;
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
void TaskThread::replicateSalRequest(char *req, uint32_t size)
{
/* for replication we will not create a new packet we will just forward the same packet recieved from client*/
auto repIt = _clientRepMap->begin();
for (; repIt != _clientRepMap->end(); repIt++)
{
repIt->second->sendMessage(req, size);
}
auto serverRepIt = _serverRepMap->begin();
for (; serverRepIt != _serverRepMap->end(); serverRepIt++)
{
......@@ -217,7 +273,7 @@ void TaskThread::replicateSalRequest(char *req, uint32_t size)
memcpy(buffer + MessageHeaderSize, salReq + MessageHeaderSize, salReq->keySize);
// Send Invalidation to sal's
auto salIt = _salMap->begin();
for (;salIt != _salMap->end();salIt++)
for (; salIt != _salMap->end(); salIt++)
{
salIt->second->sendMessage(buffer, MessageHeaderSize + salReq->keySize);
}
......
......@@ -50,7 +50,7 @@ int main()
CPPLog::LOG_ERROR(status.ToString().c_str());
exit(1);
}
CPPLog::LOG_ALWAYS("Rocks started");
CPPLog::LOG_ALWAYS("Rocks started\n\n");
Executor *executor = new Executor(executorPoolSize);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment