Commit b954b568 authored by Paras Garg's avatar Paras Garg

Added code for follower

parent 72250328
No preview for this file type
No preview for this file type
...@@ -3,12 +3,59 @@ ...@@ -3,12 +3,59 @@
#include <queue> #include <queue>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <set>
#include <string>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include "Logger.hpp"
class Comparator
{
public:
inline bool operator()(const struct ibv_wc *c1, const struct ibv_wc *c2)
const
{
struct MessageHeader *req1 = (struct MessageHeader *)c1->wr_id;
struct MessageHeader *req2 = (struct MessageHeader *)c2->wr_id;
if (req1->keySize != req2->keySize)
return true;
char *key1 = (char *)req1 + MessageHeaderSize;
char *key2 = (char *)req2 + MessageHeaderSize;
for (uint32_t i = 0; i < req1->keySize; i++)
{
if (key1[i] != key2[i])
return true;
}
return false;
}
};
class ConcurrentQueue
{
private:
std::queue<struct ibv_wc *> queue1;
std::queue<struct ibv_wc *> queue2;
std::mutex queueMutex;
std::set<struct ibv_wc *, Comparator> runningRequests;
std::condition_variable queueCv;
public:
void push(struct ibv_wc *const &data);
bool empty();
struct ibv_wc *try_pop();
void removeFromSet(struct ibv_wc *data);
void wait_and_pop(struct ibv_wc *&popped_value);
};
#endif
/*
template <typename Data> template <typename Data>
class ConcurrentQueue class ConcurrentQueue
{ {
private: private:
std::queue<Data> queue; std::queue1<Data> queue;
std::queue2<Data> queue;
std::mutex queueMutex; std::mutex queueMutex;
std::condition_variable queueCv; std::condition_variable queueCv;
...@@ -16,7 +63,7 @@ public: ...@@ -16,7 +63,7 @@ public:
void push(Data const &data) void push(Data const &data)
{ {
std::unique_lock<std::mutex> lock(queueMutex); std::unique_lock<std::mutex> lock(queueMutex);
queue.push(data); queue1.push(data);
lock.unlock(); lock.unlock();
queueCv.notify_one(); queueCv.notify_one();
} }
...@@ -30,7 +77,7 @@ public: ...@@ -30,7 +77,7 @@ public:
bool try_pop(Data &popped_value) bool try_pop(Data &popped_value)
{ {
std::unique_lock<std::mutex> lock(queueMutex); std::unique_lock<std::mutex> lock(queueMutex);
if (queue.empty()) if (queue2.empty())
{ {
return false; return false;
} }
...@@ -49,3 +96,4 @@ public: ...@@ -49,3 +96,4 @@ public:
} }
}; };
#endif #endif
*/
\ No newline at end of file
#ifndef __Executor__ #ifndef __Executor__
#define __Executor__ #define __Executor__
#include <vector> #include <vector>
#include <map>
#include "CqEventData.hpp" #include "CqEventData.hpp"
#include "ConcurrentQueue.hpp" #include "RdmaEndpointGroup.hpp"
#include "TaskThread.hpp" #include "TaskThread.hpp"
#include "RdmaSalEndpoint.hpp" #include "Logger.hpp"
class Executor class Executor
{ {
int _size{0}; int _size{0};
std::vector<TaskThread *> *_taskThreads{NULL}; std::vector<TaskThread *> _taskThreads{NULL};
ConcurrentQueue<struct ibv_wc *> *_taskQueue{NULL}; ConcurrentQueue *_taskQueue{NULL};
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap{NULL};
public: public:
Executor(int size,std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap); Executor(int size);
void createThreads(std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap);
void submit(struct ibv_wc *task); void submit(struct ibv_wc *task);
void getTask(); void getTask();
void stop();
}; };
//long affinities[] // long affinities[]
#endif #endif
\ No newline at end of file
#ifndef __MessageFormats__ #ifndef __MessageFormats__
#define __MessageFormats__ #define __MessageFormats__
enum RequestType enum MessageType
{ {
GET, GET = (1u << 0),
PUT, PUT = (1u << 1),
DELETE, DELETE = (1u << 2),
INVALIDATE INVALIDATE = (1u << 3),
}; SUCCESS = (1u << 4),
enum ResponseStatus FAILURE = (1u <<5)
{
SUCCESS,
FAILURE
}; };
struct __attribute__ ((__packed__)) SalRequestHeader struct __attribute__ ((__packed__)) MessageHeader
{ {
uint32_t id; uint32_t id;
enum RequestType type; enum MessageType type;
uint32_t keySize; uint32_t keySize;
uint32_t valueSize; uint32_t valueSize;
}; };
static const uint32_t MessageHeaderSize = sizeof(MessageHeader);
/*
struct __attribute__ ((__packed__)) SalResponseHeader struct __attribute__ ((__packed__)) SalResponseHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus status; enum MessageType status;
/* //Note value will be present only in case of response status is success
* Note value will be present only in case of response status is success
*/
uint32_t valueSize; uint32_t valueSize;
}; };
struct __attribute__ ((__packed__)) InvRequestHeader struct __attribute__ ((__packed__)) InvRequestHeader
{ {
uint32_t id; uint32_t id;
enum RequestType type; enum MessageType type;
uint32_t keySize; uint32_t keySize;
}; };
...@@ -42,11 +40,12 @@ struct __attribute__ ((__packed__)) InvRequestHeader ...@@ -42,11 +40,12 @@ struct __attribute__ ((__packed__)) InvRequestHeader
struct __attribute__ ((__packed__)) InvResponseHeader struct __attribute__ ((__packed__)) InvResponseHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus status; enum MessageType status; MessageHeader
}; };
static uint32_t SalRequestHeaderSize = sizeof(SalRequestHeader); */
/*
static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader); static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader);
static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader); static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader);
static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader); static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader);
*/
#endif #endif
\ No newline at end of file
#ifndef __Properties__
#define __Properties__
#include <string>
#include <iostream>
#include <fstream>
#include <map>
class Properties
{
private:
std::map<std::string, std::string> _props;
const std::string _WHITESPACE = " \n\r\t\f\v";
std::string ltrim(const std::string &s);
std::string rtrim(const std::string &s);
std::string trim(const std::string &);
public:
Properties(std::string filename);
std::string getValue(std::string key);
};
#endif
\ No newline at end of file
#ifndef __RDMACLIENTREPENDPOINT__
#define __RDMACLIENTREPENDPOINT__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
...@@ -5,16 +8,18 @@ ...@@ -5,16 +8,18 @@
#include <iostream> #include <iostream>
#include <errno.h> #include <errno.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <mutex>
#ifndef __RDMACLIENTENDPOINT__ #include <atomic>
#define __RDMACLIENTENDPOINT__ #include <shared_mutex>
#include <queue>
#include <rocksdb/db.h>
#include "RdmaRepEndpoint.hpp"
#include "Logger.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
#include <boost/lockfree/queue.hpp>
class RdmaClientEndpoint class RdmaClientRepEndpoint : public RdmaRepEndpoint
{ {
static int CONN_STATE_INITIALIZED; static int CONN_STATE_INITIALIZED;
static int CONN_STATE_ADDR_RESOLVED; static int CONN_STATE_ADDR_RESOLVED;
static int CONN_STATE_ROUTE_RESOLVED; static int CONN_STATE_ROUTE_RESOLVED;
...@@ -22,42 +27,28 @@ class RdmaClientEndpoint ...@@ -22,42 +27,28 @@ class RdmaClientEndpoint
static int CONN_STATE_CONNECTED; static int CONN_STATE_CONNECTED;
static int CONN_STATE_PARTIAL_CLOSED; static int CONN_STATE_PARTIAL_CLOSED;
static int CONN_STATE_CLOSED; static int CONN_STATE_CLOSED;
int _state{0};
struct rdma_cm_id *_cm_id = NULL; int _maxInLine{0};
struct ibv_pd *_protectionDomain; int _timeoutMs{0};
int _retryCount{0};
int _maxRetryCount{2};
int _sendQueueSize;
int _recvQueueSize;
int _sendMsgSize;
int _recvMsgSize;
int _state;
int _timeoutMs;
int _maxInLine;
const char *_connData; const char *_connData;
void *_sendBuff = NULL;
void *_recvBuff = NULL;
struct ibv_mr *_sendMr = NULL;
struct ibv_mr *_recvMr = NULL;
boost::lockfree::queue<void *> *_sendBuffers;
void completeClose();
void connect();
void registerMemory(); void registerMemory();
public: public:
std::atomic<uint64_t> _requestId{12}; std::atomic<uint64_t> _requestId{12};
RdmaClientEndpoint(struct rdma_cm_id *id, int sendQueueSize, int recvQueueSize, RdmaClientRepEndpoint(struct rdma_cm_id *id, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, int maxInLine, int timeout); int sendMsgSize, int recvMsgSize, int maxInLine, int timeout, rocksdb::DB *_db);
void connect();
void connect(const char *ip, const char *port, const char *connData); void connect(const char *ip, const char *port, const char *connData);
bool isConnected(); bool isConnected();
void processCmEvent(struct rdma_cm_event *event); void processCmEvent(struct rdma_cm_event *event);
void createResources(struct ibv_cq *cq); void createResources(struct ibv_cq *cq);
void close(); void close();
int sendMessage(const char *buffer, int size);
void processSendComp(struct ibv_wc);
void processRecvComp(struct ibv_wc);
}; };
#endif #endif
\ No newline at end of file
#ifndef __RDMACMPROCESSOR__
#define __RDMACMPROCESSOR__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <iostream>
#include "Logger.hpp"
#include "RdmaEndpointGroup.hpp"
class RdmaCmProcessor
{
struct rdma_event_channel *_eventChannel{NULL};
std::thread *_cmEventThread{NULL};
RdmaEndpointGroup *_endpointGroup{NULL};
bool _stop{false};
public:
RdmaCmProcessor(RdmaEndpointGroup *group);
struct rdma_cm_id *createId();
void processCmEvent();
void start(bool newThread);
void close();
};
#endif
\ No newline at end of file
#ifndef __RDMACQPROCESSOR__
#define __RDMACQPROCESSOR__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
...@@ -5,30 +8,23 @@ ...@@ -5,30 +8,23 @@
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#ifndef __RDMACQPROCESSOR__ #include "Executor.hpp"
#define __RDMACQPROCESSOR__ #include "Logger.hpp"
#include "RdmaClientEndpoint.hpp"
class RdmaCqProcessor class RdmaCqProcessor
{ {
public: public:
struct ibv_comp_channel *_compChannel; struct ibv_comp_channel *_compChannel{NULL};
struct ibv_cq *_completionQueue; struct ibv_cq *_completionQueue{NULL};
std::thread *_compQueueThread; std::thread *_compQueueThread{NULL};
std::unordered_map<uint32_t, RdmaClientEndpoint *> *_qpEndpointMap{NULL}; bool _stop{false};
Executor *_executor{NULL};
RdmaCqProcessor(ibv_context *verbs, int compQueueSize); RdmaCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize);
struct ibv_cq *getCq(); struct ibv_cq *getCq();
void start(); void start();
void processCQEvents(); void processCQEvents();
void dispatchCqEvents(ibv_wc *wc_array, int size);
void close(); void close();
void registerEp(uint64_t qpum, RdmaClientEndpoint* ep);
}; };
#endif #endif
// https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/
// https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html
// https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.html
\ No newline at end of file
#ifndef __RDMAENDPOINT__ #ifndef __RDMAENDPOINT__
#define __RDMAENDPOINT__ #define __RDMAENDPOINT__
#include <iostream> #include <iostream>
#include <boost/lockfree/queue.hpp> #include <queue>
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
...@@ -12,7 +12,9 @@ ...@@ -12,7 +12,9 @@
#include <vector> #include <vector>
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include "CqEventData.hpp" #include "CqEventData.hpp"
#include "Logger.hpp"
class RdmaEndpoint class RdmaEndpoint
{ {
...@@ -36,15 +38,17 @@ public: ...@@ -36,15 +38,17 @@ public:
char *_recvBuff{NULL}; char *_recvBuff{NULL};
struct ibv_mr *_sendMr{NULL}; struct ibv_mr *_sendMr{NULL};
struct ibv_mr *_recvMr{NULL}; struct ibv_mr *_recvMr{NULL};
boost::lockfree::queue<void *> *_sendBuffers{NULL}; std::queue<char*> _sendBuffers;
std::mutex _sendBuffersM;
RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize, RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize); int recvQueueSize, int sendMsgSize, int recvMsgSize);
void createResources(); void createResources();
void processCmEvent(struct rdma_cm_event *event); void processCmEvent(struct rdma_cm_event *event);
void clientClose(); void close();
virtual void processSendCompletion(struct ibv_wc *data) = 0; virtual void processSendCompletion(struct ibv_wc *data) = 0;
virtual void processRecvCompletion(struct ibv_wc *data) = 0; virtual void processRecvCompletion(struct ibv_wc *data) = 0;
virtual ~RdmaEndpoint();
}; };
#endif #endif
\ No newline at end of file
#ifndef __RDMA_ENDPOINT_GROUP__
#define __RDMA_ENDPOINT_GROUP__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <netdb.h>
#include <arpa/inet.h>
#include <map>
#include <vector>
#include <mutex>
#include <shared_mutex>
#include "RdmaSalEndpoint.hpp"
#include "RdmaRepEndpoint.hpp"
#include "Logger.hpp"
class RdmaEndpointGroup
{
public:
std::vector<RdmaSalEndpoint *> _salEps;
std::unordered_map<uint32_t, RdmaSalEndpoint *> _qpSalEndpointMap;
virtual void processCmEvent(struct rdma_cm_event *event) = 0;
};
#endif
\ No newline at end of file
#ifndef __RDMAREPENDPOINT__
#define __RDMAREPENDPOINT__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <netdb.h>
#include <iostream>
#include <errno.h>
#include <arpa/inet.h>
#include <mutex>
#include <atomic>
#include <shared_mutex>
#include <queue>
#include <rocksdb/db.h>
#include "Logger.hpp"
#include "MessageFormats.hpp"
class RdmaRepEndpoint
{
public:
struct rdma_cm_id *_cm_id{NULL};
struct ibv_pd *_protectionDomain;
int _sendQueueSize;
int _recvQueueSize;
int _sendMsgSize;
int _recvMsgSize;
char *_sendBuff = NULL;
char *_recvBuff = NULL;
struct ibv_mr *_sendMr = NULL;
struct ibv_mr *_recvMr = NULL;
std::queue<char *> _sendBuffers;
std::mutex _sendBuffersM;
rocksdb::DB *_db;
void connect();
void registerMemory();
public:
std::atomic<uint64_t> _requestId{12};
RdmaRepEndpoint(struct rdma_cm_id *id, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, rocksdb::DB *_db);
void processSendCompletion(struct ibv_wc *data);
void processRecvCompletion(struct ibv_wc *data);
void processDelete(struct MessageHeader *req);
void processGet(struct MessageHeader *req);
void processPut(struct MessageHeader *req);
int sendMessage(const char *buffer, uint32_t size);
};
#endif
\ No newline at end of file
#ifndef __RDMAREPENDPOINTGROUP__
#define __RDMAREPENDPOINTGROUP__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
#include <errno.h> #include <errno.h>
#include <unordered_map> #include <unordered_map>
#include <rocksdb/db.h>
#ifndef __RDMACLIENTENDPOINTGROUP__
#define __RDMACLIENTENDPOINTGROUP__
#include "RdmaCqProcessor.hpp" #include "RdmaCqProcessor.hpp"
#include "RdmaClientEndpoint.hpp" #include "RdmaRepEndpoint.hpp"
#include "RdmaClientRepEndpoint.hpp"
#include "Executor.hpp"
class RdmaClientEndpointGroup class RdmaRepEndpointGroup
{ {
RdmaCqProcessor *_cqProcessor = NULL; RdmaCqProcessor *_cqProcessor = NULL;
//struct rdma_cm_id *_cm_id; // struct rdma_cm_id *_cm_id;
int _sendQueueSize; int _sendQueueSize;
int _recvQueueSize; int _recvQueueSize;
int _compQueueSize; int _compQueueSize;
int _sendMsgSize; int _sendMsgSize;
int _recvMsgSize; int _recvMsgSize;
int _timeoutMs;
int _maxInLine; int _maxInLine;
int _timeoutMs;
struct rdma_event_channel *_eventChannel; struct rdma_event_channel *_eventChannel;
std::thread *_cmEventThread; std::thread *_cmEventThread;
bool _stop; bool _stopCMThread;
Executor *_executor;
rocksdb::DB *_db;
public: public:
RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, std::unordered_map<uint32_t, RdmaRepEndpoint *> *_qpRepEndpointMap;
int recvMsgSize,int maxInLine, int timeout); RdmaRepEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize, int maxInLine, int timeout, rocksdb::DB *db);
void processCmEvents(); void processCmEvents();
void processCmEvent(struct rdma_cm_event *event); void processCmEvent(struct rdma_cm_event *event);
void start(); void startCmProcessor();
struct ibv_cq *createCq(struct rdma_cm_id *id); void setExecutor(Executor *ex);
RdmaClientEndpoint *createEndpoint(); RdmaClientRepEndpoint *createEndpoint();
void close(); void close();
struct ibv_cq *createCq(struct rdma_cm_id *id);
}; };
#endif #endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMASALCQPROCESSOR__
#define __RDMASALCQPROCESSOR__
#include "Executor.hpp"
class RdmaSalCqProcessor
{
public:
struct ibv_comp_channel *_compChannel{NULL};
struct ibv_cq *_completionQueue{NULL};
std::thread *_compQueueThread{NULL};
bool _stop{false};
Executor *_executor{NULL};
RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize);
struct ibv_cq *getCq();
void start();
void processCQEvents();
void close();
};
#endif
#ifndef __RdmaInvEndpoint__ #ifndef __RdmaSalEndpoint__
#define __RdmaInvEndpoint__ #define __RdmaSalEndpoint__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
#include <errno.h> #include <errno.h>
#include <iostream> #include <iostream>
#include <boost/lockfree/queue.hpp> #include <rocksdb/db.h>
#include "RdmaEndpoint.hpp" #include "RdmaEndpoint.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
#include <rocksdb/db.h> #include "Logger.hpp"
class RdmaSalEndpoint : public RdmaEndpoint class RdmaSalEndpoint
{ {
public: public:
static int CONN_STATE_INITIALIZED;
static int CONN_STATE_RESOURCES_ALLOCATED;
static int CONN_STATE_CONNECTED;
static int CONN_STATE_CLOSED;
int _state{0};
struct rdma_cm_id *_cm_id{NULL};
struct ibv_cq *_completionQueue{NULL};
struct ibv_pd *_protectionDomain{NULL};
int _sendQueueSize{0};
int _recvQueueSize{0};
int _sendMsgSize{0};
int _recvMsgSize{0};
char *_sendBuff{NULL};
char *_recvBuff{NULL};
struct ibv_mr *_sendMr{NULL};
struct ibv_mr *_recvMr{NULL};
std::queue<char*> _sendBuffers;
std::mutex _sendBuffersM;
rocksdb::DB *_db; rocksdb::DB *_db;
RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize, RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db); int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db);
void createResources();
void processCmEvent(struct rdma_cm_event *event);
void processCqEvent(struct ibv_wc wc); void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc *data); void processSendCompletion(struct ibv_wc *data);
void processRecvCompletion(struct ibv_wc *data); void processRecvCompletion(struct ibv_wc *data);
void processDelete(struct SalRequestHeader *); void processDelete(struct MessageHeader *req);
void processGet(struct SalRequestHeader *req); void processGet(struct MessageHeader *req);
void processPut(struct SalRequestHeader *req); void processPut(struct MessageHeader *req);
int sendMessage(const char *buffer, uint32_t size); int sendMessage(const char *buffer, uint32_t size);
void close(); void close();
}; };
......
#ifndef __RdmaServerEndpointGroup__
#define __RdmaServerEndpointGroup__
#include <rdma/rdma_cma.h> #include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h> #include <rdma/rdma_verbs.h>
#include <stdint.h> #include <stdint.h>
...@@ -9,13 +12,14 @@ ...@@ -9,13 +12,14 @@
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#ifndef __RDMASERVERENDPOINTGROUP__
#define __RDMASERVERENDPOINTGROUP__
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
#include "RdmaServerRepEndpoint.hpp"
#include "RdmaEndpointGroup.hpp"
#include "CqEventData.hpp" #include "CqEventData.hpp"
#include "Executor.hpp" #include "Executor.hpp"
#include "RdmaSalCqProcessor.hpp" #include "RdmaCmProcessor.hpp"
#include "RdmaCqProcessor.hpp"
#include "Logger.hpp"
class RdmaServerEndpointGroup class RdmaServerEndpointGroup
{ {
...@@ -27,17 +31,15 @@ class RdmaServerEndpointGroup ...@@ -27,17 +31,15 @@ class RdmaServerEndpointGroup
static int CONN_STATE_CONNECTED; static int CONN_STATE_CONNECTED;
static int CONN_STATE_CLOSED; static int CONN_STATE_CLOSED;
bool _stop{false}; RdmaCqProcessor *_cqProcessor{NULL};
std::thread *_cmEventThread{NULL};
struct rdma_event_channel *_eventChannel{NULL};
Executor *_executor; Executor *_executor;
struct rdma_cm_id *_cm_id{NULL}; struct rdma_cm_id *_cm_id{NULL};
RdmaSalCqProcessor *_salCqProcessor{NULL}; struct rdma_event_channel *_eventChannel{NULL};
std::vector<RdmaSalEndpoint *> *_salEps{NULL}; std::thread *_cmEventThread{NULL};
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap{NULL}; bool _stopCmProcessor{false};
/* /*
* variables to maintain queue state * variables to maintain queue state
...@@ -50,23 +52,24 @@ class RdmaServerEndpointGroup ...@@ -50,23 +52,24 @@ class RdmaServerEndpointGroup
rocksdb::DB *_db; rocksdb::DB *_db;
mutable std::shared_mutex _salMutex; mutable std::shared_mutex _salMutex;
mutable std::shared_mutex _repMutex;
void clientClose(); void clientClose();
public: public:
RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, int recvMsgSize); std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap;
void bind(const char *ip, const char *port, int backlog); std::unordered_map<uint32_t, RdmaRepEndpoint*> *_qpRepEndpointMap;
struct ibv_cq *createSalCq(struct rdma_cm_id *id); RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize,
void dispatchSalCqEvents(ibv_wc wc[], int size); int sendMsgSize, int recvMsgSize, rocksdb::DB *db);
void createExecutor(int threadSize); // void setExecutor(Executor *executor);
void bind(const char *ip, const char *port, int backlog);
/* struct ibv_cq *createCq(struct rdma_cm_id *id);
* Sending false will run cmProcessor on calling thread used to save creation of new thread void dispatchCqEvents(ibv_wc wc[], int size);
* when main thread can also do same work void setExecutor(Executor *ex);
*/
void startCmProcessor(bool newThread);
void processCmEvents(); void processCmEvents();
void startCmProcessor(bool newThread);
void processCmEvent(struct rdma_cm_event *event); void processCmEvent(struct rdma_cm_event *event);
void createEpCmEvent(struct rdma_cm_event *event); void createEpCmEvent(struct rdma_cm_event *event);
void close(); void close();
......
#ifndef __RdmaServerRepEndpoint__
#define __RdmaServerRepEndpoint__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <iostream>
#include <rocksdb/db.h>
#include "RdmaEndpoint.hpp"
#include "MessageFormats.hpp"
#include "Logger.hpp"
#include "RdmaRepEndpoint.hpp"
class RdmaServerRepEndpoint : public RdmaRepEndpoint
{
public:
static int CONN_STATE_INITIALIZED;
static int CONN_STATE_RESOURCES_ALLOCATED;
static int CONN_STATE_CONNECTED;
static int CONN_STATE_CLOSED;
int _state{0};
struct ibv_cq *_completionQueue{NULL};
RdmaServerRepEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db);
void createResources();
void processCmEvent(struct rdma_cm_event *event);
void processCqEvent(struct ibv_wc wc);
void close();
};
#endif
\ No newline at end of file
#ifndef __TaskThread__ #ifndef __TaskThread__
#define __TaskThread__ #define __TaskThread__
#include "Runnable.hpp"
#include "CqEventData.hpp"
#include "ConcurrentQueue.hpp"
#include <pthread.h>
#include <iostream> #include <iostream>
#include <queue> #include <queue>
#include <map> #include <pthread.h>
#include "Runnable.hpp"
#include "CqEventData.hpp"
#include "MessageFormats.hpp"
#include "Logger.hpp"
#include "ConcurrentQueue.hpp"
#include "RdmaRepEndpoint.hpp"
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
class TaskThread class TaskThread
{ {
private: private:
ConcurrentQueue<struct ibv_wc *> *_taskQueue; ConcurrentQueue *_taskQueue;
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap;
bool _stop{false}; bool _stop{false};
int _id; int _id;
pthread_t thread; pthread_t thread;
std::unordered_map<uint32_t, RdmaRepEndpoint *> *_clientRepMap;
std::unordered_map<uint32_t, RdmaRepEndpoint *> *_serverRepMap;
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_salMap;
public: public:
TaskThread(int id, int cpu, ConcurrentQueue<struct ibv_wc *> *, std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap); TaskThread(int id, int cpu, ConcurrentQueue *,
TaskThread(int id, ConcurrentQueue<struct ibv_wc *> *, std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap); std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap);
void replicateSalRequest(char *salRequest, uint32_t size); void replicateSalRequest(char *salRequest, uint32_t size);
static void *run(void *object); static void *run(void *object);
void stop(); void stop();
void processEvent(struct ibv_wc *data); void processEvent(RdmaSalEndpoint* ep,struct ibv_wc *data);
void processRepEvent(RdmaRepEndpoint* ep,struct ibv_wc *data);
~TaskThread(); ~TaskThread();
}; };
......
...@@ -4,7 +4,21 @@ ...@@ -4,7 +4,21 @@
#All Parameters will be taken as string #All Parameters will be taken as string
# Fixed Parameters # Fixed Parameters
ENABLE_LOGGING=0 sendQS=50
recvQS=50
compQS=50
sendMS=500
recvMS=500
DB_PATH=/tmp/testdb1
EXECUTOR_POOL_SIZE=4
ENABLE_LOGGING=1
SERVER_IP=192.168.200.20 SERVER_IP=192.168.200.20
SERVER_PORT=1921 SERVER_PORT=1921
EXECUTOR_POOL_SIZE=4
\ No newline at end of file FOLLOWERS=1
FOLLOWER1_IP=192.168.200.20
FOLLOWER1_PORT=1920
FOLLOWER2_IP=192.168.200.20
FOLLOWER2_IP=1921
\ No newline at end of file
#include <TaskThread.hpp>
void ConcurrentQueue::push(struct ibv_wc *const &data)
{
std::unique_lock<std::mutex> lock(queueMutex);
std::cout << "putting data\n";
queue1.push(data);
lock.unlock();
queueCv.notify_one();
}
bool ConcurrentQueue::empty()
{
std::unique_lock<std::mutex> lock(queueMutex);
return queue1.empty() && queue2.empty();
}
struct ibv_wc *ConcurrentQueue::try_pop()
{
struct ibv_wc *value = NULL;
std::unique_lock<std::mutex> lock(queueMutex);
/* first check in queue 2 if it is empty then check in queue 1
* Since queue2 is empty and if queue1 is empty we can make thread to wait
* Currently thread will busy wait if there are contending request in queue2
* ie same request is in thread 2 and same request exist in runningRequests set
*/
if (queue2.empty())
{
queueCv.wait(lock, [&]
{ return queue1.size() > 0; });
value = queue1.front();
queue1.pop();
}
else
{
value = queue2.front();
queue2.pop();
}
/* At this point value will not be null because it either gets data
* from queue 1 or form queue2
* To make it sequential consistent we need to order only put requests
* it means ,we can return all other requests immediately
*/
MessageHeader *request = (MessageHeader *)value->wr_id;
if (value->opcode == IBV_WC_RECV && request->type == MessageType::PUT)
{
// std::cout<<"value "<<value<<std::endl;
/* Since this is the first request we need to put this in runningRequests set
* because this might contend in future and return this immediately
*/
if (runningRequests.empty())
{
runningRequests.insert(value);
return value;
}
/* If this is contending with some runningRequest we need to put it inside queue2
* and check for other request till queue1 not become empty, if there is no request in
* queue1 then we will return null and taskthread will retry(probable busy waiting scenario)
* Currently this way will make busy waiting of thread if queue1 is empty,
* and it will always check in queue2 whenever it will retry for next time
* probable solution is to add cv and wait it until we consume that runningRequest or new request come in queue1
*/
auto it = runningRequests.find(value);
if (it != runningRequests.end())
{
queue2.push(value);
// std::cout<<"found putting in 2"<<std::endl;
return NULL;
}
return value;
}
/* Returning all other request immediately*/
return value;
}
void ConcurrentQueue::removeFromSet(struct ibv_wc *data)
{
/* Remove Request from runningRequest Set after processing it so that other threads can
* process contending request from queue2
*/
std::unique_lock<std::mutex> lock(queueMutex);
// std::cout<<"removing"<<data<<std::endl;
runningRequests.erase(data);
}
/* needs correction*/
void ConcurrentQueue::wait_and_pop(struct ibv_wc *&popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCv.wait(lock, [&]
{ return queue1.size() > 0; });
popped_value = queue1.front();
queue1.pop();
}
\ No newline at end of file
#include "Executor.hpp" #include "Executor.hpp"
Executor::Executor(int size,std::unordered_map<uint32_t, RdmaSalEndpoint *> *qpSalEndpointMap) Executor::Executor(int size)
: _size(size), _qpSalEndpointMap(qpSalEndpointMap) : _size(size)
{
_taskQueue = new ConcurrentQueue();
// _taskThreads = new std::vector<TaskThread *>();
_taskThreads.reserve(size);
}
void Executor::createThreads(std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap)
{ {
_taskQueue = new ConcurrentQueue<struct ibv_wc *>();
_taskThreads = new std::vector<TaskThread *>();
_taskThreads->reserve(size);
for (int i = 0; i < _size; i++) for (int i = 0; i < _size; i++)
{ {
TaskThread *thread = new TaskThread(i, _taskQueue, _qpSalEndpointMap); TaskThread *thread = new TaskThread(i, i, _taskQueue, clientRepMap,serverRepMap, salMap);
_taskThreads->push_back(thread); _taskThreads.push_back(thread);
} }
} }
void Executor::submit(struct ibv_wc *task)
void Executor::submit(struct ibv_wc *task)
{ {
_taskQueue->push(task); _taskQueue->push(task);
} }
void Executor::getTask() void Executor::getTask()
{ {
} }
void Executor::stop()
{
for (size_t i = 0; i < _taskThreads.size(); i++)
{
_taskThreads[i]->stop();
delete _taskThreads[i];
}
delete _taskQueue;
}
#include <string> #include "Properties.hpp"
#include<iostream>
#include<fstream>
#include<map>
std::string Properties::ltrim(const std::string &s)
class Properties{ {
private:
std::map<std::string,std::string> _props;
const std::string _WHITESPACE = " \n\r\t\f\v";
std::string ltrim(const std::string& s)
{
size_t start = s.find_first_not_of(_WHITESPACE); size_t start = s.find_first_not_of(_WHITESPACE);
return (start == std::string::npos) ? "" : s.substr(start); return (start == std::string::npos) ? "" : s.substr(start);
} }
std::string rtrim(const std::string& s) std::string Properties::rtrim(const std::string &s)
{ {
size_t end = s.find_last_not_of(_WHITESPACE); size_t end = s.find_last_not_of(_WHITESPACE);
return (end == std::string::npos) ? "" : s.substr(0, end + 1); return (end == std::string::npos) ? "" : s.substr(0, end + 1);
} }
std::string trim(const std::string& s) std::string Properties::trim(const std::string &s)
{ {
return rtrim(ltrim(s)); return rtrim(ltrim(s));
} }
public: Properties::Properties(std::string filename)
{
// std::cout<<"Reading Properties From file named prop.config ...........\n";
Properties(std::string filename){ std::ifstream file(filename);
//std::cout<<"Reading Properties From file named prop.config ...........\n"; if (!file.is_open())
{
std::ifstream file (filename); std::cout << "Confiq file opening failed\n";
if(!file.is_open()){
std::cout<<"Confiq file opening failed\n";
exit(0); exit(0);
} }
std::string line; std::string line;
std::string key,value; std::string key, value;
int delimPos; int delimPos;
while(getline(file,line)){ while (getline(file, line))
delimPos=line.find('#'); {
line=trim(line); delimPos = line.find('#');
if(!line.empty()){ line = trim(line);
line=line.substr(0,delimPos); if (!line.empty())
delimPos=line.find('='); {
_props.insert(make_pair(trim(line.substr(0,delimPos)),trim(line.substr(delimPos+1)))); line = line.substr(0, delimPos);
} delimPos = line.find('=');
_props.insert(make_pair(trim(line.substr(0, delimPos)), trim(line.substr(delimPos + 1))));
} }
} }
std::string getValue(std::string key){ }
auto it=_props.find(key); std::string Properties::getValue(std::string key)
if(it==_props.end()){ {
auto it = _props.find(key);
if (it == _props.end())
{
return ""; return "";
} }
return it->second; return it->second;
} }
}; \ No newline at end of file
\ No newline at end of file
#include "RdmaClientEndpoint.hpp" #include "RdmaClientRepEndpoint.hpp"
int RdmaClientEndpoint::CONN_STATE_INITIALIZED = 0;
int RdmaClientEndpoint::CONN_STATE_ADDR_RESOLVED = 1; int RdmaClientRepEndpoint::CONN_STATE_INITIALIZED = 0;
int RdmaClientEndpoint::CONN_STATE_ROUTE_RESOLVED = 2; int RdmaClientRepEndpoint::CONN_STATE_ADDR_RESOLVED = 1;
int RdmaClientEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 3; int RdmaClientRepEndpoint::CONN_STATE_ROUTE_RESOLVED = 2;
int RdmaClientEndpoint::CONN_STATE_CONNECTED = 4; int RdmaClientRepEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 3;
int RdmaClientEndpoint::CONN_STATE_PARTIAL_CLOSED = 5; int RdmaClientRepEndpoint::CONN_STATE_CONNECTED = 4;
int RdmaClientEndpoint::CONN_STATE_CLOSED = 6; int RdmaClientRepEndpoint::CONN_STATE_PARTIAL_CLOSED = 5;
int RdmaClientRepEndpoint::CONN_STATE_CLOSED = 6;
RdmaClientEndpoint::RdmaClientEndpoint(struct rdma_cm_id *id,int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, int maxInLine, int timeout) RdmaClientRepEndpoint::RdmaClientRepEndpoint(struct rdma_cm_id *id, int sendQueueSize, int recvQueueSize,
: _cm_id(id), _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), int sendMsgSize, int recvMsgSize, int maxInLine, int timeout, rocksdb::DB *db)
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout) : RdmaRepEndpoint(id, sendQueueSize, recvQueueSize, sendMsgSize,
recvMsgSize,db), _maxInLine(maxInLine), _timeoutMs(timeout)
{ {
_state = CONN_STATE_INITIALIZED; _state = CONN_STATE_INITIALIZED;
_sendBuffers = new boost::lockfree::queue<void*>(_sendQueueSize);
} }
void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *connData)
void RdmaClientRepEndpoint::connect(const char *ip, const char *port, const char *connData)
{ {
_connData = connData; _connData = connData;
if (_state != CONN_STATE_INITIALIZED) if (_state != CONN_STATE_INITIALIZED)
{ {
std::cout << "RdmaClientEndpoint : connect state not initialized" << std::endl; CPPLog::LOG_ERROR("RdmaClientRepEndpoint : connect state not initialized");
return;
} }
int ret; int ret;
std::cout << "RdmaClientEndpoint : step2 getaddrinfo" << std::endl; CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step2 getaddrinfo");
struct addrinfo *addr; struct addrinfo *addr;
ret = getaddrinfo(ip, port, NULL, &addr); ret = getaddrinfo(ip, port, NULL, &addr);
if (ret) if (ret)
{ {
std::cout << "RdmaClientEndpoint : get_addr_info failed" << std::endl; CPPLog::LOG_ERROR("RdmaClientRepEndpoint : get_addr_info failed");
} }
std::cout << "RdmaClientEndpoint : step2 resolve addr" << std::endl; CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step2 resolve addr");
ret = rdma_resolve_addr(_cm_id, NULL, addr->ai_addr, _timeoutMs); ret = rdma_resolve_addr(_cm_id, NULL, addr->ai_addr, _timeoutMs);
if (ret) if (ret)
{ {
std::cout << "unable to resolve addr" << std::endl; CPPLog::LOG_ERROR("unable to resolve addr");
return; //return;
} }
std::cout << "RdmaClientEndpoint : step2 resolve addr resolved" << std::endl; CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step2 resolve addr resolved");
_state = CONN_STATE_ADDR_RESOLVED; _state = CONN_STATE_ADDR_RESOLVED;
} }
bool RdmaClientEndpoint::isConnected()
bool RdmaClientRepEndpoint::isConnected()
{ {
return _state == CONN_STATE_CONNECTED; return _state == CONN_STATE_CONNECTED;
} }
void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
void RdmaClientRepEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL) if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
{ {
std::cout << "RdmaClientEndpoint : step3 resolve_route" << std::endl; CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step3 resolve_route");
rdma_resolve_route(_cm_id, _timeoutMs); rdma_resolve_route(_cm_id, _timeoutMs);
} }
else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL) else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL)
{ {
registerMemory(); registerMemory();
std::cout << "RdmaClientEndpoint : step5 connect" << std::endl; CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step5 connect");
connect(); connect();
} }
else if (event->id != NULL && event->event == RDMA_CM_EVENT_ESTABLISHED) else if (event->id != NULL && event->event == RDMA_CM_EVENT_ESTABLISHED)
{ {
std::cout << "RdmaClientEndpoint : step6 Connected" << std::endl; CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step6 Connected");
_state = CONN_STATE_CONNECTED; _state = CONN_STATE_CONNECTED;
} }
else if (event->id != NULL && event->event == RDMA_CM_EVENT_UNREACHABLE)
{
std::cout << "Not able to connect replication endpoint\n";
if(_retryCount < _maxRetryCount)
{
std::cout<<"retry to connect with server\n";
connect();
_retryCount ++;
}
}
else if (event->id != NULL && event->event == RDMA_CM_EVENT_DISCONNECTED) else if (event->id != NULL && event->event == RDMA_CM_EVENT_DISCONNECTED)
{ {
std::cout << "RdmaClientEndpoint : step7 Closed" << std::endl; CPPLog::LOG_DEBUG("RdmaClientRepEndpoint : step7 Closed");
completeClose(); close();
} }
else else
{ {
std::cout << "RdmaClientEndpoint : Not able to procces CM EVent" << rdma_event_str(event->event) << event->id << " " << event->listen_id << std::endl; std::ostringstream ss;
ss << "RdmaClientRepEndpoint : Not able to procces CM EVent" << rdma_event_str(event->event);
ss << event->id << " " << event->listen_id;
CPPLog::LOG_ERROR(ss);
} }
} }
void RdmaClientEndpoint::close()
void RdmaClientRepEndpoint::close()
{ {
if (_state != CONN_STATE_CONNECTED) if (_state != CONN_STATE_CONNECTED)
{ {
std::cout << "RdmaClientEndpoint : close invalid state" << std::endl; std::cout << "RdmaClientRepEndpoint : close invalid state" << std::endl;
return;
} }
_state = CONN_STATE_PARTIAL_CLOSED; _state = CONN_STATE_PARTIAL_CLOSED;
int ret = rdma_disconnect(_cm_id); int ret = rdma_disconnect(_cm_id);
if (ret) if (ret)
{ {
std::cout << "RdmaClientEndpoint : rdma_disconnect failed" << std::endl; std::cout << "RdmaClientRepEndpoint : rdma_disconnect failed" << std::endl;
} }
} delete[](char *) _sendBuff;
delete[](char *) _recvBuff;
void RdmaClientEndpoint::completeClose()
{
if (_state != CONN_STATE_PARTIAL_CLOSED)
{
std::cout << "RdmaClientEndpoint : completeClose invalid state" << std::endl;
return;
}
_state = CONN_STATE_CLOSED;
delete _sendBuffers;
free(_sendBuff);
free(_recvBuff);
rdma_dereg_mr(_sendMr); rdma_dereg_mr(_sendMr);
rdma_dereg_mr(_recvMr); rdma_dereg_mr(_recvMr);
rdma_destroy_qp(_cm_id); rdma_destroy_qp(_cm_id);
rdma_destroy_id(_cm_id); // rdma_destroy_id(_cm_id);
} }
void RdmaClientEndpoint::connect() void RdmaClientRepEndpoint::connect()
{ {
if (_connData != NULL) if (_connData != NULL)
{ {
...@@ -126,73 +131,76 @@ void RdmaClientEndpoint::connect() ...@@ -126,73 +131,76 @@ void RdmaClientEndpoint::connect()
rdma_connect(_cm_id, NULL); rdma_connect(_cm_id, NULL);
} }
} }
void RdmaClientEndpoint::registerMemory()
void RdmaClientRepEndpoint::registerMemory()
{ {
if (_state != CONN_STATE_ROUTE_RESOLVED) if (_state != CONN_STATE_ROUTE_RESOLVED)
{ {
std::cout << "RdmaClientEndpoint : createResource address not resolved" << std::endl; std::cout << "RdmaClientRepEndpoint : createResource address not resolved" << std::endl;
return; return;
} }
_sendBuff = malloc(_sendMsgSize * _sendQueueSize); _sendBuff = new char[_sendMsgSize * _sendQueueSize];
if (_sendBuff == NULL) if (_sendBuff == NULL)
{ {
std::cout << "RdmaClientEndpoint : sendBuff malloc failed" << std::endl; std::cout << "RdmaClientRepEndpoint : sendBuff allocation failed" << std::endl;
return; return;
} }
_sendMr = rdma_reg_msgs(_cm_id, _sendBuff, _sendMsgSize * _sendQueueSize); _sendMr = rdma_reg_msgs(_cm_id, _sendBuff, _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL) if (_sendMr == NULL)
{ {
std::cout << "RdmaClientEndpoint : sendMr reg failed" << std::endl; std::cout << "RdmaClientRepEndpoint : sendMr reg failed" << std::endl;
return; return;
} }
_recvBuff = malloc(_recvMsgSize * _recvQueueSize); _recvBuff = new char[_recvMsgSize * _recvQueueSize];
if (_recvBuff == NULL) if (_recvBuff == NULL)
{ {
std::cout << "RdmaClientEndpoint : recvBuff malloc failed" << std::endl; std::cout << "RdmaClientRepEndpoint : recvBuff allocation failed" << std::endl;
return; return;
} }
_recvMr = rdma_reg_msgs(_cm_id, _recvBuff, _recvMsgSize * _recvQueueSize); _recvMr = rdma_reg_msgs(_cm_id, _recvBuff, _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL) if (_recvMr == NULL)
{ {
std::cout << "RdmaClientEndpoint : recvMr reg failed" << std::endl; std::cout << "RdmaClientRepEndpoint : recvMr reg failed" << std::endl;
return; return;
} }
char *buffer = (char *)_recvBuff; char *buffer = (char *)_recvBuff;
for (int i = 0; i < _recvQueueSize; i++) for (int i = 0; i < _recvQueueSize; i++)
{ {
void* const location = buffer + i * _recvMsgSize; void *const location = buffer + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location), rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr); _recvMsgSize, _recvMr);
} }
buffer = (char *)_sendBuff; buffer = (char *)_sendBuff;
for (int i = 0; i < _sendQueueSize; i++) for (int i = 0; i < _sendQueueSize; i++)
{ {
void* const location = buffer + i * _sendMsgSize; void *const location = buffer + i * _sendMsgSize;
_sendBuffers->push(location); std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)location);
} }
_state = CONN_STATE_RESOURCES_ALLOCATED; _state = CONN_STATE_RESOURCES_ALLOCATED;
} }
void RdmaClientEndpoint::createResources(struct ibv_cq *cq)
void RdmaClientRepEndpoint::createResources(struct ibv_cq *cq)
{ {
if (_state != CONN_STATE_ADDR_RESOLVED) if (_state != CONN_STATE_ADDR_RESOLVED)
{ {
std::cout << "RdmaClientEndpoint : createResource address not resolved" << std::endl; std::cout << "RdmaClientRepEndpoint : createResource address not resolved" << std::endl;
return; return;
} }
_protectionDomain = ibv_alloc_pd(_cm_id->verbs); _protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL) if (_protectionDomain == NULL)
{ {
std::cout << "RdmaClientEndpoint : ibv_alloc_pd failed " << std::endl; std::cout << "RdmaClientRepEndpoint : ibv_alloc_pd failed " << std::endl;
return; return;
} }
struct ibv_cq *completionQueue = cq; struct ibv_cq *completionQueue = cq;
struct ibv_qp_init_attr qp_init_attr; struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr)); memset(&qp_init_attr, 0, sizeof(qp_init_attr));
//This is used to set endpoint address with qp // This is used to set endpoint address with qp
qp_init_attr.qp_context = (void *)this; qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion // if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1; qp_init_attr.sq_sig_all = 1;
...@@ -214,36 +222,14 @@ void RdmaClientEndpoint::createResources(struct ibv_cq *cq) ...@@ -214,36 +222,14 @@ void RdmaClientEndpoint::createResources(struct ibv_cq *cq)
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr); int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret) if (ret)
{ {
std::cout << "RdmaClientEndpoint : ibv_create_cq failed\n"; std::cout << "RdmaClientRepEndpoint : ibv_create_cq failed\n";
} }
if (_cm_id->pd == NULL) if (_cm_id->pd == NULL)
{ {
std::cout << "RdmaClientEndpoint : pd not set" << std::endl; std::cout << "RdmaClientRepEndpoint : pd not set" << std::endl;
_cm_id->pd = _protectionDomain; _cm_id->pd = _protectionDomain;
} }
_state = CONN_STATE_ROUTE_RESOLVED; _state = CONN_STATE_ROUTE_RESOLVED;
} }
int RdmaClientEndpoint::sendMessage(const char *buffer, int size)
{
if (size > _sendMsgSize)
return -1;
void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr)
return -1;
memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
}
void RdmaClientEndpoint::processSendComp(struct ibv_wc wc)
{
_sendBuffers->push((void *)wc.wr_id);
}
void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
{
struct SalResponseHeader* response = (struct SalResponseHeader*)wc.wr_id;
std::cout<<"Recieve response for id "<<response->id<<" size "<<response->size <<" type "<<response->type<<"\n";
rdma_post_recv(_cm_id, (void *)wc.wr_id, (void *)wc.wr_id, _recvMsgSize, _recvMr);
}
\ No newline at end of file
#include "RdmaCmProcessor.hpp"
#include <iostream>
RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group)
: _endpointGroup(group)
{
CPPLog::LOG_INFO("CMProcessor : Step 1 creating event channel");
_eventChannel = rdma_create_event_channel();
if (_eventChannel == NULL)
{
CPPLog::LOG_ERROR( "CMProcesor : error creating event channel");
}
}
struct rdma_cm_id *RdmaCmProcessor::createId()
{
struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
if (ret == -1)
CPPLog::LOG_ERROR("CMProcesor : rdma_create_id failed");
return id;
}
void RdmaCmProcessor::processCmEvent()
{
int ret;
struct rdma_cm_event *event;
CPPLog::LOG_INFO("CMProcessor : starting cm processing thread");
while (!_stop)
{
ret = rdma_get_cm_event(_eventChannel, &event);
if (ret)
{
CPPLog::LOG_ERROR("CMProcesor : rdma_get_cm_event failed");
continue;
}
_endpointGroup->processCmEvent(event);
ret = rdma_ack_cm_event(event);
if (ret)
{
CPPLog::LOG_ERROR("CMProcesor : rdma_ack_cm_event failed");
}
}
}
void RdmaCmProcessor::start(bool newThread)
{
if (newThread == true)
_cmEventThread = new std::thread(&RdmaCmProcessor::processCmEvent, this);
else
processCmEvent();
}
void RdmaCmProcessor::close()
{
CPPLog::LOG_ALWAYS("Closing CM Processor");
_stop = true;
if (_cmEventThread != NULL)
_cmEventThread->join();
delete _cmEventThread;
rdma_destroy_event_channel(_eventChannel);
}
#include "RdmaCqProcessor.hpp" #include "RdmaCqProcessor.hpp"
RdmaCqProcessor::RdmaCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize)
RdmaCqProcessor::RdmaCqProcessor(ibv_context *verbs, int compQueueSize) : _executor(ex)
{ {
//_qpEndpointMap = new std::unordered_map<>();
_qpEndpointMap = new std::unordered_map<uint32_t, RdmaClientEndpoint *>();
_compChannel = ibv_create_comp_channel(verbs); _compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL) if (_compChannel == NULL)
{ {
std::cout << "CqProcessr : ibv_create_comp_channel failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_create_comp_channel failed");
return; return;
} }
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0); _completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL) if (_completionQueue == NULL)
{ {
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl; CPPLog::LOG_INFO("CqProcessr : ibv_create_cq failed");
return; return;
} }
int ret = ibv_req_notify_cq(_completionQueue, 0); int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
std::cout << "CqProcessr : ibv_req_notify_cq failed\n"; CPPLog::LOG_INFO("CqProcessr : ibv_req_notify_cq failed");
} }
} }
struct ibv_cq *RdmaCqProcessor::getCq() struct ibv_cq *RdmaCqProcessor::getCq()
{ {
return _completionQueue; return _completionQueue;
} }
void RdmaCqProcessor::registerEp(uint64_t qp,RdmaClientEndpoint* ep)
{
_qpEndpointMap->emplace(qp,ep);
}
void RdmaCqProcessor::start() void RdmaCqProcessor::start()
{ {
std::cout << "CqProcessr : starting process CQ events" << std::endl; CPPLog::LOG_ALWAYS("CqProcessr : starting process CQ events");
_compQueueThread = new std::thread(&RdmaCqProcessor::processCQEvents, this); _compQueueThread = new std::thread(&RdmaCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"compQueueThread"); pthread_setname_np(_compQueueThread->native_handle(),"CQProcessor");
} }
void RdmaCqProcessor::processCQEvents() void RdmaCqProcessor::processCQEvents()
{ {
...@@ -45,73 +37,56 @@ void RdmaCqProcessor::processCQEvents() ...@@ -45,73 +37,56 @@ void RdmaCqProcessor::processCQEvents()
void *context; void *context;
const int nevent = 10; const int nevent = 10;
struct ibv_wc wc_array[nevent]; struct ibv_wc wc_array[nevent];
while (1) while (!_stop)
{ {
/*
* get_CQ_event is a blocking call and it wait save some cpu cycles but.
* it might not be that efficient compared to polling
*/
ret = ibv_get_cq_event(_compChannel, &cq, &context); ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1) if (ret == -1)
{ {
std::cout << "CqProcessr : ibv_get_cq_event failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_get_cq_event failed");
close(); close();
} }
ibv_ack_cq_events(cq, 1); ibv_ack_cq_events(cq, 1);
/*
* Create a request for next completion cycle
*/
ret = ibv_req_notify_cq(_completionQueue, 0); ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret) if (ret)
{ {
std::cout << "CqProcessr : ibv_req_notify_cq failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_req_notify_cq failed");
close(); close();
} }
ret = ibv_poll_cq(cq, nevent, wc_array); ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0) if (ret < 0)
{ {
std::cout << "CqProcessr : ibv_poll_cq failed\n"; CPPLog::LOG_ERROR("CqProcessr : ibv_poll_cq failed");
close(); close();
} }
if (ret == 0) if (ret == 0)
continue; continue;
for (int i = 0; i < ret; i++)
dispatchCqEvents(wc_array, ret);
}
}
inline void RdmaCqProcessor::dispatchCqEvents(ibv_wc wc[], int size)
{
for (int i = 0; i < size; i++)
{ {
if (wc[i].status != IBV_WC_SUCCESS) if (wc_array[i].status != IBV_WC_SUCCESS)
{ {
std::cout << "RdmaCqProcessor : failed work completion : " << ibv_wc_status_str(wc[i].status) << " on qp " << wc[i].qp_num << std::endl; std::ostringstream ss;
return; ss<< "RdmaCqProcessor : failed work completion : ";
} ss<<ibv_wc_status_str(wc_array[i].status)<<"on qp"<<wc_array[i].qp_num;
auto it = _qpEndpointMap->find(wc[i].qp_num); CPPLog::LOG_ERROR(ss);
if (it == _qpEndpointMap->end()) continue;
{
std::cout << "RdmaCqProcessor : endpoint not registered for qp num" << std::endl;
return;
}
switch (wc[i].opcode)
{
case IBV_WC_SEND:
it->second->processSendComp(wc[i]);
break;
case IBV_WC_RECV:
it->second->processRecvComp(wc[i]);
break;
case IBV_WC_RDMA_WRITE:
std::cout << "rdma write completion\n";
break;
case IBV_WC_RDMA_READ:
std::cout << "rdma read completion\n";
break;
default:
std::cout << "RdmaCqProcessor : invalid opcode" << std::endl;
break;
} }
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
_executor->submit(data);
} }
} }
}
void RdmaCqProcessor::close() void RdmaCqProcessor::close()
{ {
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
delete _compQueueThread;
} }
\ No newline at end of file
#include "RdmaEndpoint.hpp" #include "RdmaEndpoint.hpp"
#include "Logger.hpp"
int RdmaEndpoint::CONN_STATE_INITIALIZED = 1; int RdmaEndpoint::CONN_STATE_INITIALIZED = 1;
int RdmaEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2; int RdmaEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2;
int RdmaEndpoint::CONN_STATE_CONNECTED = 3; int RdmaEndpoint::CONN_STATE_CONNECTED = 3;
int RdmaEndpoint::CONN_STATE_CLOSED = 4; int RdmaEndpoint::CONN_STATE_CLOSED = 4;
RdmaEndpoint::RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize, RdmaEndpoint::RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize) int recvQueueSize, int sendMsgSize, int recvMsgSize)
: _cm_id(id), _completionQueue(completionQueue), _sendQueueSize(sendQueueSize), : _cm_id(id), _completionQueue(completionQueue), _sendQueueSize(sendQueueSize),
_recvQueueSize(recvQueueSize), _sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize) _recvQueueSize(recvQueueSize), _sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize)
{ {
_state = CONN_STATE_INITIALIZED; _state = CONN_STATE_INITIALIZED;
_sendBuffers = new boost::lockfree::queue<void*>(_sendMsgSize);
} }
void RdmaEndpoint::createResources() void RdmaEndpoint::createResources()
{ {
/* These states are used to avoid errors in lifetime of rdma connection
* more erros can be tracked in future using these lifecycle states
*/
if (_state != CONN_STATE_INITIALIZED) if (_state != CONN_STATE_INITIALIZED)
{ {
std::cout << "RdmaEndpoint : createResource invalid satte" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state");
} }
//Step 1 to create endpoint
_protectionDomain = ibv_alloc_pd(_cm_id->verbs); _protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL) if (_protectionDomain == NULL)
{ {
std::cout << "RdmaEndpoint : ibv_alloc_pd failed " << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed");
return; return;
} }
//step 2 Creating Queue pair with completion queueu setted for send and recieve
struct ibv_qp_init_attr qp_init_attr; struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr)); memset(&qp_init_attr, 0, sizeof(qp_init_attr));
//This is used to set endpoint address with qp /*
* Endpoint address is setted in QP context to get endpoint at run time with qp
* without using any map to map qp_num to endpoint
*/
qp_init_attr.qp_context = (void *)this; qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion // if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1; qp_init_attr.sq_sig_all = 1;
...@@ -49,107 +55,120 @@ void RdmaEndpoint::createResources() ...@@ -49,107 +55,120 @@ void RdmaEndpoint::createResources()
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr); int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : ibv_create_cq failed\n"; CPPLog::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed");
} }
if (_cm_id->pd == NULL) if (_cm_id->pd == NULL)
{ {
std::cout << "RdmaEndpoint : pd not set" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : pd not set");
_cm_id->pd = _protectionDomain; _cm_id->pd = _protectionDomain;
} }
_sendBuff = (char*)malloc(_sendMsgSize * _sendQueueSize); /*
* Step 3 register memory for send and recv queue
*/
_sendBuff = new char[(_sendMsgSize * _sendQueueSize)];
if (_sendBuff == NULL) if (_sendBuff == NULL)
std::cout << "RdmaEndpoint : sendBuff malloc failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : sendBuff allocation failed");
_recvBuff = (char*)malloc(_recvMsgSize * _recvQueueSize);
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize); _sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL) if (_sendMr == NULL)
std::cout << "RdmaEndpoint : sendMr reg failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : sendMr reg failed");
_recvBuff = new char[(_recvMsgSize * _recvQueueSize)];
if (_recvBuff == NULL) if (_recvBuff == NULL)
std::cout << "RdmaEndpoint : recvBuff malloc failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : recvBuff allocation failed");
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize); _recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL) if (_recvMr == NULL)
std::cout << "RdmaEndpoint : recvMr reg failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : recvMr reg failed");
/*
* adding buffers for recving rdma data
*/
for (int i = 0; i < _recvQueueSize; i++) for (int i = 0; i < _recvQueueSize; i++)
{ {
char *const location = _recvBuff + i * _recvMsgSize; char* location = _recvBuff + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location), rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr); _recvMsgSize, _recvMr);
} }
/*
* Adding buffers to queue for receving data
*/
for (int i = 0; i < _sendQueueSize; i++) for (int i = 0; i < _sendQueueSize; i++)
{ {
void* const location = _sendBuff + i * _sendMsgSize; char* location = _sendBuff + i * _sendMsgSize;
_sendBuffers->push(location); std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)location);
} }
_state = CONN_STATE_RESOURCES_ALLOCATED; _state = CONN_STATE_RESOURCES_ALLOCATED;
} }
void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event) void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event)
{ {
std::cout << "RdmaEndpoint : Event " << rdma_event_str(event->event) << std::endl; std::ostringstream ss;
ss<<"RdmaEndpoint : Event "<<rdma_event_str(event->event);
CPPLog::LOG_ALWAYS(ss);
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{ {
std::cout << "RdmaEndpoint : Connect request"; CPPLog::LOG_ALWAYS("RdmaEndpoint : Connect request");
} }
else if (event->event == RDMA_CM_EVENT_ESTABLISHED) else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{ {
if (_state != CONN_STATE_RESOURCES_ALLOCATED) if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{ {
std::cout << "RdmaEndpoint : EstablishedEvent invalid state " << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : Established_Event but resource not alloted");
} }
std::cout << "RdmaEndpoint : step 6 Connected" << std::endl; CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected");
_state = CONN_STATE_CONNECTED; _state = CONN_STATE_CONNECTED;
} }
else if (event->event == RDMA_CM_EVENT_DISCONNECTED) else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{ {
std::cout << "RdmaEndpoint : step 7 disconnected" << std::endl; CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected");
clientClose(); close();
} }
} }
void RdmaEndpoint::clientClose() void RdmaEndpoint::close()
{ {
if (_state != CONN_STATE_CONNECTED) if (_state < CONN_STATE_RESOURCES_ALLOCATED)
{ {
std::cout << "RdmaEndpoint : clientClose invalid state" << std::endl;
return; return;
} }
std::cout<<"RdmaEndpoint : closing connection qp "<<_cm_id->qp->qp_num<< std::endl; CPPLog::LOG_INFO("RdmaEndpoint : closing connection");
int ret; int ret;
ret = rdma_disconnect(_cm_id); ret = rdma_disconnect(_cm_id);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_disconnect failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : rdma_disconnect failed");
} }
ret = rdma_dereg_mr(_sendMr); ret = rdma_dereg_mr(_sendMr);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_dereg_mr send failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed");
} }
free(_sendBuff); delete[] _sendBuff;
ret = rdma_dereg_mr(_recvMr); ret = rdma_dereg_mr(_recvMr);
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_dereg_mr recv failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed");
} }
free(_recvBuff); delete[] _recvBuff;
rdma_destroy_qp(_cm_id); rdma_destroy_qp(_cm_id);
std::cout<<"des qp"<<std::endl; CPPLog::LOG_INFO("des qp");
// rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id); // ret = rdma_destroy_id(_cm_id);
std::cout<<"des mr"<<std::endl; CPPLog::LOG_INFO("des mr");
if (ret) if (ret)
{ {
std::cout << "RdmaEndpoint : rdma_destroy_id failed" << std::endl; CPPLog::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed");
} }
_state = CONN_STATE_CLOSED; _state = CONN_STATE_CLOSED;
std::cout<<"closed"<<std::endl; CPPLog::LOG_INFO("closed");
} }
RdmaEndpoint::~RdmaEndpoint()
{}
\ No newline at end of file
#include "RdmaRepEndpoint.hpp"
RdmaRepEndpoint::RdmaRepEndpoint(struct rdma_cm_id *id, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: _cm_id(id), _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _sendMsgSize(sendMsgSize),
_recvMsgSize(recvMsgSize), _db(db)
{
}
void RdmaRepEndpoint::processSendCompletion(struct ibv_wc *data)
{
/*means data has been send to other side we can use this buffer*/
std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)data->wr_id);
}
int RdmaRepEndpoint::sendMessage(const char *buffer, uint32_t size)
{
if (size > (uint32_t)_sendMsgSize)
return -1;
char *sendBuffer = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0)
return -1;
sendBuffer = (char*)_sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
memcpy((void *)sendBuffer, buffer, size);
return rdma_post_send(_cm_id, (void *)sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
}
void RdmaRepEndpoint::processRecvCompletion(struct ibv_wc *data)
{
char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len);
struct MessageHeader *req = (struct MessageHeader *)request;
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
if (req->type == MessageType::DELETE)
processDelete(req);
if (req->type == MessageType::GET)
processGet(req);
if (req->type == MessageType::PUT)
processPut(req);
delete[] request;
}
void RdmaRepEndpoint::processDelete(struct MessageHeader *req)
{
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
void *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0)
return;
sendBuf = _sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/
response->type = MessageType::FAILURE;
response->id = req->id;
if (s.ok())
{
response->type = MessageType::SUCCESS;
}
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
}
void RdmaRepEndpoint::processGet(struct MessageHeader *req)
{
char *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0)
{
return;
}
sendBuf = (char*)_sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
std::string value;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + MessageHeaderSize, req->keySize}, &value);
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/
response->type = MessageType::FAILURE;
response->id = req->id;
if (s.ok())
{
response->type = MessageType::SUCCESS;
response->valueSize = value.size();
memcpy(response + MessageHeaderSize, value.c_str(), value.size());
}
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0);
}
void RdmaRepEndpoint::processPut(struct MessageHeader *req)
{
rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize},
{(char *)req + MessageHeaderSize + req->keySize, req->valueSize});
char *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0)
{
CPPLog::LOG_ERROR("No send Buffer");
return;
}
sendBuf = (char*)_sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/
response->type = MessageType::FAILURE;
response->id = req->id;
if (s.ok())
response->type = MessageType::SUCCESS;
rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
}
#include "RdmaClientEndpointGroup.hpp" #include "RdmaRepEndpointGroup.hpp"
RdmaClientEndpointGroup::RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, RdmaRepEndpointGroup::RdmaRepEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize,int maxInLine, int timeout) int recvMsgSize, int maxInLine, int timeout, rocksdb::DB *db)
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize), : _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize), _sendMsgSize(sendMsgSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine),_timeoutMs(timeout) _recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout), _db(db)
{ {
std::cout << "RdmaClientEndpointGroup : Step 1 creating event channel" << std::endl; std::cout << "RdmaRepEndpointGroup : Step 1 creating event channel" << std::endl;
_eventChannel = rdma_create_event_channel(); _eventChannel = rdma_create_event_channel();
_stop = false; _stopCMThread = false;
if (_eventChannel == NULL) if (_eventChannel == NULL)
{ {
std::cout << "RdmaClientEndpointGroup : error creating event channel"; std::cout << "RdmaRepEndpointGroup : error creating event channel";
} }
_qpRepEndpointMap = new std::unordered_map<uint32_t, RdmaRepEndpoint *>();
} }
void RdmaClientEndpointGroup::start() void RdmaRepEndpointGroup::startCmProcessor()
{ {
_cmEventThread = new std::thread(&RdmaClientEndpointGroup::processCmEvents, this); _cmEventThread = new std::thread(&RdmaRepEndpointGroup::processCmEvents, this);
pthread_setname_np(_cmEventThread->native_handle(),"ClientCMProcessor"); pthread_setname_np(_cmEventThread->native_handle(), "RepCMProcessor");
} }
void RdmaClientEndpointGroup::processCmEvents() struct ibv_cq *RdmaRepEndpointGroup::createCq(struct rdma_cm_id *id)
{
if (_cqProcessor == NULL)
{
CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : step 5 create repcq processor");
_cqProcessor = new RdmaCqProcessor(_executor, id->verbs, _compQueueSize);
_cqProcessor->start();
}
return _cqProcessor->getCq();
}
void RdmaRepEndpointGroup::processCmEvents()
{ {
int ret; int ret;
struct rdma_cm_event *event; struct rdma_cm_event *event;
std::cout << "RdmaClientEndpointGroup : starting cm processing thread" << std::endl; std::cout << "RdmaRepEndpointGroup : starting cm processing thread" << std::endl;
while (!_stop) while (!_stopCMThread)
{ {
ret = rdma_get_cm_event(_eventChannel, &event); ret = rdma_get_cm_event(_eventChannel, &event);
if (ret) if (ret)
{ {
std::cout << "RdmaClientEndpointGroup : rdma_get_cm_event failed" << std::endl; std::cout << "RdmaRepEndpointGroup : rdma_get_cm_event failed" << std::endl;
continue; continue;
} }
processCmEvent(event); processCmEvent(event);
ret = rdma_ack_cm_event(event); ret = rdma_ack_cm_event(event);
if (ret) if (ret)
{ {
std::cout << "RdmaClientEndpointGroup : rdma_ack_cm_event failed"; std::cout << "RdmaRepEndpointGroup : rdma_ack_cm_event failed";
} }
} }
} }
void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event) void RdmaRepEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{ {
std::cout << "RdmaClientEndpointGroup : event" << rdma_event_str(event->event) << std::endl; std::cout << "RdmaRepEndpointGroup : event" << rdma_event_str(event->event) << std::endl;
if (event->id != NULL && event->id->context != NULL) if (event->id != NULL && event->id->context != NULL)
{ {
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL) if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
{ {
((RdmaClientEndpoint *)event->id->context)->createResources(createCq(event->id)); ((RdmaClientRepEndpoint *)event->id->context)->createResources(createCq(event->id));
} }
((RdmaClientEndpoint *)event->id->context)->processCmEvent(event); ((RdmaClientRepEndpoint *)event->id->context)->processCmEvent(event);
if(event->event == RDMA_CM_EVENT_ADDR_RESOLVED) if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED)
{ {
_cqProcessor->registerEp(event->id->qp->qp_num,((RdmaClientEndpoint *)event->id->context)); _qpRepEndpointMap->emplace(event->id->qp->qp_num, ((RdmaRepEndpoint *)event->id->context));
}
if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
if (_qpRepEndpointMap->find(event->id->qp->qp_num) != _qpRepEndpointMap->end())
_qpRepEndpointMap->erase(event->id->qp->qp_num);
delete ((RdmaRepEndpoint *)event->id->context);
} }
} }
else else
{ {
std::cout << "RdmaClientEndpointGroup : Not able to procces CM EVent"; std::cout << "RdmaRepEndpointGroup : Not able to procces CM EVent";
std::cout << rdma_event_str(event->event) << event->id << " "; std::cout << rdma_event_str(event->event) << event->id << " ";
std::cout << event->listen_id << std::endl; std::cout << event->listen_id << std::endl;
} }
} }
RdmaClientEndpoint *RdmaClientEndpointGroup::createEndpoint() RdmaClientRepEndpoint *RdmaRepEndpointGroup::createEndpoint()
{ {
struct rdma_cm_id *id = NULL; struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP); int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
std::cout<<"id "<<id<<std::endl;
if (ret == -1) if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl; std::cout << "CMProcesor : rdma_create_id failed" << std::endl;
RdmaClientEndpoint *endpoint = new RdmaClientEndpoint(id, RdmaClientRepEndpoint *endpoint = new RdmaClientRepEndpoint(id, _sendQueueSize, _recvQueueSize, _sendMsgSize,
_sendQueueSize, _recvQueueSize, _recvMsgSize, _maxInLine, _timeoutMs, _db);
_sendMsgSize, _recvMsgSize,_maxInLine, _timeoutMs);
id->context = (void *)endpoint; id->context = (void *)endpoint;
return endpoint; return endpoint;
} }
struct ibv_cq *RdmaClientEndpointGroup::createCq(struct rdma_cm_id *id) void RdmaRepEndpointGroup::setExecutor(Executor *ex)
{ {
if (_cqProcessor == NULL) _executor = ex;
{
std::cout << "RdmaClientEndpointGroup : Creating CQ processor" << std::endl;
_cqProcessor = new RdmaCqProcessor(id->verbs, _compQueueSize);
_cqProcessor->start();
}
return _cqProcessor->getCq();
} }
void RdmaClientEndpointGroup::close() void RdmaRepEndpointGroup::close()
{ {
_stop = true; _stopCMThread = true;
_cmEventThread->join(); _cmEventThread->join();
_cqProcessor->close();
delete _cmEventThread;
delete _cqProcessor;
delete _qpRepEndpointMap;
rdma_destroy_event_channel(_eventChannel); rdma_destroy_event_channel(_eventChannel);
} }
\ No newline at end of file
#include "RdmaSalCqProcessor.hpp"
RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize)
: _executor(ex)
{
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{
std::cout << "SalCqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "SalCqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "SalCqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *RdmaSalCqProcessor::getCq()
{
return _completionQueue;
}
void RdmaSalCqProcessor::start()
{
std::cout << "SalCqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this);
}
void RdmaSalCqProcessor::processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc wc_array[nevent];
while (!_stop)
{
/*
* get_CQ_event is a blocking call and it wait save some cpu cycles but.
* it might not be that efficient compared to polling
*/
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "SalCqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
/*
* Create a request for next completion cycle
*/
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "SalCqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "SalCqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
/*
* vendor_err is set to check whether the request came from sal or followers
* data->vendor_err = 0;
*/
_executor->submit(data);
}
// _executor->dispatchSalCqEvents(wc_array, ret);
}
}
void RdmaSalCqProcessor::close()
{
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
}
\ No newline at end of file
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
#include "MessageFormats.hpp"
int RdmaSalEndpoint::CONN_STATE_INITIALIZED = 1;
int RdmaSalEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2;
int RdmaSalEndpoint::CONN_STATE_CONNECTED = 3;
int RdmaSalEndpoint::CONN_STATE_CLOSED = 4;
RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize, RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *db) int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize), _db(db) : _cm_id(id), _completionQueue(completionQueue), _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _db(db)
{
_state = CONN_STATE_INITIALIZED;
}
void RdmaSalEndpoint::createResources()
{ {
/* These states are used to avoid errors in lifetime of rdma connection
* more erros can be tracked in future using these lifecycle states
*/
if (_state != CONN_STATE_INITIALIZED)
{
CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state");
}
// Step 1 to create endpoint
_protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL)
{
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed");
return;
}
// step 2 Creating Queue pair with completion queueu setted for send and recieve
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
/*
* Endpoint address is setted in QP context to get endpoint at run time with qp
* without using any map to map qp_num to endpoint
*/
qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1;
// completion queue can be shared or you can use distinct completion queues.
qp_init_attr.send_cq = _completionQueue;
qp_init_attr.recv_cq = _completionQueue;
qp_init_attr.qp_type = IBV_QPT_RC;
// increase if you want to keep more send work requests in the SQ.
qp_init_attr.cap.max_send_wr = _sendQueueSize;
// increase if you want to keep more receive work requests in the RQ.
qp_init_attr.cap.max_recv_wr = _recvQueueSize;
// increase if you allow send work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_send_sge = 1;
// increase if you allow receive work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_recv_sge = 1;
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed");
}
if (_cm_id->pd == NULL)
{
CPPLog::LOG_ERROR("RdmaEndpoint : pd not set");
_cm_id->pd = _protectionDomain;
}
/*
* Step 3 register memory for send and recv queue
*/
_sendBuff = new char[(_sendMsgSize * _sendQueueSize)];
if (_sendBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendBuff allocation failed");
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendMr reg failed");
_recvBuff = new char[(_recvMsgSize * _recvQueueSize)];
if (_recvBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvBuff allocation failed");
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvMr reg failed");
/*
* adding buffers for recving rdma data
*/
for (int i = 0; i < _recvQueueSize; i++)
{
char *location = _recvBuff + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
}
/*
* Adding buffers to queue for receving data
*/
for (int i = 0; i < _sendQueueSize; i++)
{
char *location = _sendBuff + i * _sendMsgSize;
std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)location);
}
_state = CONN_STATE_RESOURCES_ALLOCATED;
}
void RdmaSalEndpoint::processCmEvent(struct rdma_cm_event *event)
{
std::ostringstream ss;
ss << "RdmaEndpoint : Event " << rdma_event_str(event->event);
CPPLog::LOG_ALWAYS(ss);
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
CPPLog::LOG_ALWAYS("RdmaEndpoint : Connect request");
}
else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{
if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{
CPPLog::LOG_ERROR("RdmaEndpoint : Established_Event but resource not alloted");
}
CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected");
_state = CONN_STATE_CONNECTED;
}
else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected");
close();
}
}
void RdmaSalEndpoint::close()
{
if (_state < CONN_STATE_RESOURCES_ALLOCATED)
{
return;
}
CPPLog::LOG_INFO("RdmaEndpoint : closing connection");
int ret;
ret = rdma_disconnect(_cm_id);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_disconnect failed");
}
ret = rdma_dereg_mr(_sendMr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed");
}
delete[] _sendBuff;
ret = rdma_dereg_mr(_recvMr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed");
}
delete[] _recvBuff;
rdma_destroy_qp(_cm_id);
CPPLog::LOG_INFO("des qp");
// rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id);
CPPLog::LOG_INFO("des mr");
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed");
}
_state = CONN_STATE_CLOSED;
CPPLog::LOG_INFO("closed");
} }
void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data) void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data)
{ {
/*means data has been send to other side we can use this buffer*/ /*means data has been send to other side we can use this buffer*/
_sendBuffers->push((void *)data->wr_id); std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)data->wr_id);
} }
int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size) int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size)
{ {
if (size > _sendMsgSize) if (size > (uint32_t)_sendMsgSize)
return -1; return -1;
void *sendBuffer = nullptr; char *sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (sendBuffer == nullptr) if (_sendBuffers.size() == 0)
return -1; return -1;
memcpy(sendBuffer, buffer, size); sendBuffer = _sendBuffers.front();
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0); _sendBuffers.pop();
lock.unlock();
memcpy((void *)sendBuffer, buffer, size);
return rdma_post_send(_cm_id, (void *)sendBuffer, (void *)sendBuffer, size, _sendMr, 0);
} }
void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data) void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
{ {
char *request = new char[data->byte_len]; char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len); memcpy(request, (void *)data->wr_id, data->byte_len);
struct SalRequestHeader *req = (struct SalRequestHeader *)request; struct MessageHeader *req = (struct MessageHeader *)request;
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr); rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
if (req->type == RequestType::DELETE) if (req->type == MessageType::DELETE)
processDelete(req); processDelete(req);
if (req->type == RequestType::GET) if (req->type == MessageType::GET)
processGet(req); processGet(req);
if (req->type == RequestType::PUT) if (req->type == MessageType::PUT)
processPut(req); processPut(req);
delete[] request; delete[] request;
} }
void RdmaSalEndpoint::processDelete(struct SalRequestHeader *req) void RdmaSalEndpoint::processDelete(struct MessageHeader *req)
{ {
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}); rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
void *sendBuf = nullptr; void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (sendBuf == nullptr) if (_sendBuffers.size() == 0)
{
return; return;
} sendBuf = _sendBuffers.front();
SalResponseHeader *response = (SalResponseHeader *)sendBuf; _sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->status = ResponseStatus::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
if (s.ok()) if (s.ok())
{ {
response->status = ResponseStatus::SUCCESS; response->type = MessageType::SUCCESS;
} }
rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0); rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
} }
void RdmaSalEndpoint::processGet(struct SalRequestHeader *req) void RdmaSalEndpoint::processGet(struct MessageHeader *req)
{ {
std::string value; char *sendBuf = nullptr;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, &value); std::unique_lock<std::mutex> lock(_sendBuffersM);
void *sendBuf = nullptr; if (_sendBuffers.size() == 0)
_sendBuffers->pop(sendBuf);
if (sendBuf == nullptr)
{ {
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf; sendBuf = _sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
std::string value;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + MessageHeaderSize, req->keySize}, &value);
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->status = ResponseStatus::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
if (s.ok()) if (s.ok())
{ {
response->status = ResponseStatus::SUCCESS; response->type = MessageType::SUCCESS;
response->valueSize = value.size(); response->valueSize = value.size();
memcpy(response + SalResponseHeaderSize, value.c_str(), value.size()); memcpy(response + MessageHeaderSize, value.c_str(), value.size());
} }
rdma_post_send(_cm_id, sendBuf, sendBuf, SalRequestHeaderSize + value.size(), _sendMr, 0); rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize + value.size(), _sendMr, 0);
} }
void RdmaSalEndpoint::processPut(struct SalRequestHeader *req) void RdmaSalEndpoint::processPut(struct MessageHeader *req)
{ {
rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize},
{(char *)req + SalRequestHeaderSize + req->keySize, req->valueSize}); {(char *)req + MessageHeaderSize + req->keySize, req->valueSize});
void *sendBuf = nullptr; char *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (sendBuf != nullptr) if (_sendBuffers.size() == 0)
{ {
CPPLog::LOG_ERROR("No send Buffer");
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf; sendBuf = _sendBuffers.front();
_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/ /*This id done to avoid else case*/
response->status = ResponseStatus::FAILURE; response->type = MessageType::FAILURE;
response->id = req->id; response->id = req->id;
if (s.ok()) if (s.ok())
response->status = ResponseStatus::FAILURE; response->type = MessageType::SUCCESS;
rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0); rdma_post_send(_cm_id, sendBuf, sendBuf, MessageHeaderSize, _sendMr, 0);
} }
...@@ -5,41 +5,33 @@ int RdmaServerEndpointGroup::CONN_STATE_BINDED = 3; ...@@ -5,41 +5,33 @@ int RdmaServerEndpointGroup::CONN_STATE_BINDED = 3;
int RdmaServerEndpointGroup::CONN_STATE_CONNECTED = 4; int RdmaServerEndpointGroup::CONN_STATE_CONNECTED = 4;
int RdmaServerEndpointGroup::CONN_STATE_CLOSED = 5; int RdmaServerEndpointGroup::CONN_STATE_CLOSED = 5;
RdmaServerEndpointGroup::RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, int recvMsgSize) RdmaServerEndpointGroup::RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize,
int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize), : _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize) _sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _db(db)
{ {
std::cout << "CMProcessor : Step 1 creating event channel" << std::endl; CPPLog::LOG_INFO("SalEndpointGroup : Step 1 creating event channel");
_eventChannel = rdma_create_event_channel(); _eventChannel = rdma_create_event_channel();
if (_eventChannel == NULL) if (_eventChannel == NULL)
{ {
std::cout << "CMProcesor : error creating event channel"; CPPLog::LOG_ERROR("SalEndpointGroup : error creating event channel");
} }
_cm_id = NULL;
int ret = rdma_create_id(_eventChannel, &_cm_id, NULL, RDMA_PS_TCP); int ret = rdma_create_id(_eventChannel, &_cm_id, NULL, RDMA_PS_TCP);
if (ret == -1) if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl;
_qpSalEndpointMap = new std::unordered_map<uint32_t, RdmaSalEndpoint *>();
_salEps = new std::vector<RdmaSalEndpoint *>();
rocksdb::Options options;
options.create_if_missing = true;
// open a database with a name which corresponds to a file system directory
rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb1", &_db);
std::cout << "Rocks started" << std::endl;
if (!status.ok())
{ {
std::cout << status.ToString() << std::endl; CPPLog::LOG_ERROR("CMProcesor : rdma_create_id failed");
exit(1); exit(-1);
} }
_qpSalEndpointMap = new std::unordered_map<uint32_t, RdmaSalEndpoint *>();
_qpRepEndpointMap = new std::unordered_map<uint32_t, RdmaRepEndpoint *>();
} }
void RdmaServerEndpointGroup::createExecutor(int threadSize) void RdmaServerEndpointGroup::setExecutor(Executor *ex)
{ {
_executor = new Executor(threadSize,_qpSalEndpointMap); _executor = ex;
} }
void RdmaServerEndpointGroup::startCmProcessor(bool newThread) void RdmaServerEndpointGroup::startCmProcessor(bool newThread)
{ {
if (newThread) if (newThread == true)
_cmEventThread = new std::thread(&RdmaServerEndpointGroup::processCmEvents, this); _cmEventThread = new std::thread(&RdmaServerEndpointGroup::processCmEvents, this);
else else
processCmEvents(); processCmEvents();
...@@ -48,20 +40,20 @@ void RdmaServerEndpointGroup::processCmEvents() ...@@ -48,20 +40,20 @@ void RdmaServerEndpointGroup::processCmEvents()
{ {
int ret; int ret;
struct rdma_cm_event *event; struct rdma_cm_event *event;
std::cout << "CMProcessor : starting cm processing thread" << std::endl; CPPLog::LOG_INFO("SalEndpointGroup : starting cm processing thread");
while (!_stop) while (!_stopCmProcessor)
{ {
ret = rdma_get_cm_event(_eventChannel, &event); ret = rdma_get_cm_event(_eventChannel, &event);
if (ret) if (ret)
{ {
std::cout << "CMProcesor : rdma_get_cm_event failed" << std::endl; CPPLog::LOG_ERROR("SalEndpointGroup : rdma_get_cm_event failed");
continue; continue;
} }
processCmEvent(event); processCmEvent(event);
ret = rdma_ack_cm_event(event); ret = rdma_ack_cm_event(event);
if (ret) if (ret)
{ {
std::cout << "CMProcesor : rdma_ack_cm_event failed"; CPPLog::LOG_ERROR("SalEndpointGroup : rdma_ack_cm_event failed");
} }
} }
} }
...@@ -69,63 +61,86 @@ void RdmaServerEndpointGroup::processCmEvents() ...@@ -69,63 +61,86 @@ void RdmaServerEndpointGroup::processCmEvents()
void RdmaServerEndpointGroup::bind(const char *ip, const char *port, int backlog) void RdmaServerEndpointGroup::bind(const char *ip, const char *port, int backlog)
{ {
int ret; int ret;
std::cout << "RdmaServerEndpointGroup : Step 2 bind_addr" << std::endl; std::ostringstream ss;
ss<<"RdmaServerEndpointGroup : Step 2 bind_addr"<<ip<<" "<<port<<"\n";
CPPLog::LOG_ALWAYS(ss);
struct addrinfo *addr; struct addrinfo *addr;
ret = getaddrinfo(ip, port, NULL, &addr); ret = getaddrinfo(ip, port, NULL, &addr);
if (ret) if (ret)
{ {
std::cout << "RdmaServerEndpointGroup : get_addr_info failed" << std::endl; CPPLog::LOG_ERROR("RdmaServerEndpointGroup : get_addr_info failed");
} }
ret = rdma_bind_addr(_cm_id, addr->ai_addr); ret = rdma_bind_addr(_cm_id, addr->ai_addr);
if (ret) if (ret)
{ {
std::cout << "RdmaServerEndpointGroup : rdma_bind_addr failed" << std::endl; CPPLog::LOG_ERROR("RdmaServerEndpointGroup : rdma_bind_addr failed");
return; return;
} }
std::cout << "RdmaServerEndpointGroup : rdma_bind_addr successful\n"; CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : rdma_bind_addr successful");
ret = rdma_listen(_cm_id, backlog); ret = rdma_listen(_cm_id, backlog);
if (ret) if (ret)
{ {
std::cout << "RdmaServerEndpointGroup : rdma_listen failed" << std::endl; CPPLog::LOG_ERROR("RdmaServerEndpointGroup : rdma_listen failed");
return; return;
} }
} }
struct ibv_cq *RdmaServerEndpointGroup::createSalCq(struct rdma_cm_id *id) struct ibv_cq *RdmaServerEndpointGroup::createCq(struct rdma_cm_id *id)
{ {
if (_salCqProcessor == NULL) if (_cqProcessor == NULL)
{ {
std::cout << "RdmaServerEndpointGroup : step 5 create salcq processor" << std::endl; CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : step 5 create cq processor");
_salCqProcessor = new RdmaSalCqProcessor(_executor, _cm_id->verbs, _compQueueSize); _cqProcessor = new RdmaCqProcessor(_executor, _cm_id->verbs, _compQueueSize);
_salCqProcessor->start(); _cqProcessor->start();
} }
return _salCqProcessor->getCq(); return _cqProcessor->getCq();
} }
void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event) void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event)
{ {
std::cout << "RdmaServerEndpointGroup : step 4 Got Connect Request Server Endpoint" << std::endl; CPPLog::LOG_ALWAYS("RdmaServerEndpointGroup : step 4 Got Connect Request Sal Endpoint");
/* /*
* create and add to vectors for replication and invalidation processing * create and add to vectors for replication and invalidation processing
* connData is used to identify whether connection came for client or from other Sal;
*/ */
//const char *connData = reinterpret_cast<const char *>(event->param.conn.private_data);
//std::cout << "sal" << std::endl; const char *connData = reinterpret_cast<const char *>(event->param.conn.private_data);
RdmaSalEndpoint *endpoint = new RdmaSalEndpoint(event->id, createSalCq(event->id), _sendQueueSize, _recvQueueSize,
if (strcmp(connData, "sal") == 0)
{
RdmaSalEndpoint *endpoint = nullptr;
endpoint = new RdmaSalEndpoint(event->id, createCq(event->id), _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize, _db); _sendMsgSize, _recvMsgSize, _db);
event->id->context = (void *)endpoint; event->id->context = (void *)endpoint;
endpoint->createResources(); endpoint->createResources();
std::unique_lock lock(_salMutex); // std::unique_lock lock(_salMutex);
_salEps->push_back(endpoint); std::cout<<"Sal Map Size"<<_qpSalEndpointMap->size()<<"qp "<<event->id->qp->qp_num<<"\n";
std::cout << "qp num" << event->id->qp->qp_num << " " << _qpSalEndpointMap->size();
_qpSalEndpointMap->emplace(event->id->qp->qp_num, endpoint); _qpSalEndpointMap->emplace(event->id->qp->qp_num, endpoint);
std::cout << _qpSalEndpointMap->size() << std::endl;
std::cout<<"Sal Map Size"<<_qpSalEndpointMap->size()<<"\n";
}
else
{
RdmaServerRepEndpoint *endpoint = nullptr;
endpoint = new RdmaServerRepEndpoint(event->id, createCq(event->id), _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize, _db);
event->id->context = (void *)endpoint;
endpoint->createResources();
// std::unique_lock lock(_salMutex);
_qpRepEndpointMap->emplace(event->id->qp->qp_num, endpoint);
}
rdma_accept(event->id, NULL); rdma_accept(event->id, NULL);
} }
void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{ {
std::cout << "RdmaServerEndpointGroup : event " << rdma_event_str(event->event) << " id " << event->id << " " << std::endl; std::ostringstream ss;
ss << "RdmaServerEndpointGroup : event " << rdma_event_str(event->event);
ss << " id " << event->id;
CPPLog::LOG_ALWAYS(ss);
/* /*
* Connect request came on listener ie endpointgroup * Connect request came on listener ie endpointgroup
* in this listen_id contains cm_id of listener and id contains the new id of connection request * in this listen_id contains cm_id of listener and id contains the new id of connection request
...@@ -134,14 +149,11 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -134,14 +149,11 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{ {
createEpCmEvent(event); createEpCmEvent(event);
} }
/*
* Event came for listener ie endpointgroup
*/
else if (event->id != NULL && _cm_id == event->id) else if (event->id != NULL && _cm_id == event->id)
{ {
if (event->event == RDMA_CM_EVENT_DISCONNECTED) if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{ {
std::cout << "RdmaServerEndpointGroup : Disconnect Server Endpoint" << std::endl; std::cout << "RdmaServerEndpointGroup : Disconnect Sal Endpoint" << std::endl;
close(); close();
} }
else else
...@@ -154,57 +166,82 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -154,57 +166,82 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
*/ */
else if (event->id != NULL && event->id->context != NULL) else if (event->id != NULL && event->id->context != NULL)
{ {
RdmaEndpoint *ep = ((RdmaEndpoint *)event->id->context); /*
ep->processCmEvent(event); * TIMEWAIT_EXIT happens only when client is already closed and we are not able to send
if (event->event == RDMA_CM_EVENT_DISCONNECTED) * rdma_disconnect()
* Since endpoint already closed with RDMA_CM_EVENT_DISCONNECTED
* we need to return for this event
*/
if (event->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{ {
std::unique_lock lock(_salMutex); CPPLog::LOG_ALWAYS("Recieved TIMEWAIT_EXIT");
auto it = std::find(_salEps->begin(), _salEps->end(), (RdmaSalEndpoint *)ep); return;
if (it != _salEps->end()) }
uint32_t qp = event->id->qp->qp_num;
auto it = _qpSalEndpointMap->find(qp);
if (it != _qpSalEndpointMap->end())
{ {
_salEps->erase(it); ((RdmaSalEndpoint *)event->id->context)->processCmEvent(event);
delete ((RdmaSalEndpoint *)event->id->context);
} }
auto it2 = _qpRepEndpointMap->find(qp);
if (it2 != _qpRepEndpointMap->end())
{
((RdmaServerRepEndpoint *)event->id->context)->processCmEvent(event);
} }
/*if (event->event == RDMA_CM_EVENT_TIMEWAIT_EXIT) if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
// std::unique_lock lock(_salMutex);
auto it = _qpSalEndpointMap->find(qp);
if (it != _qpSalEndpointMap->end())
{
_qpSalEndpointMap->erase(qp);
delete ((RdmaSalEndpoint *)event->id->context);
}
auto it2 = _qpRepEndpointMap->find(qp);
if (it2 != _qpRepEndpointMap->end())
{ {
std::cout<<"des"<<std::endl; _qpRepEndpointMap->erase(qp);
rdma_destroy_id(event->id); delete ((RdmaServerRepEndpoint *)event->id->context);
std::cout<<"des"<<std::endl; }
} }
*/
} }
else else
{ {
std::cout << "RdmaServerEndpointGroup : Not able to procces CM EVent" << rdma_event_str(event->event) << event->id << " " << event->listen_id << std::endl; std::ostringstream ss;
ss << "RdmaServerEndpointGroup : Not able to procces CM EVent";
ss << rdma_event_str(event->event) << event->id << " ";
ss << event->listen_id << std::endl;
CPPLog::LOG_ERROR(ss);
} }
} }
void RdmaServerEndpointGroup::clientClose() void RdmaServerEndpointGroup::close()
{ {
_stop = true; _cqProcessor->close();
//_executor->stop();
delete _cqProcessor;
CPPLog::LOG_ALWAYS("Closing CM Processor");
_stopCmProcessor = true;
if (_cmEventThread != NULL) if (_cmEventThread != NULL)
_cmEventThread->join(); _cmEventThread->join();
delete _cmEventThread;
rdma_destroy_event_channel(_eventChannel); rdma_destroy_event_channel(_eventChannel);
_salCqProcessor->close();
delete _salCqProcessor; for (auto it = _qpSalEndpointMap->begin(); it != _qpSalEndpointMap->end(); it++)
/* for(int i = 0 ;i < _endPoints->size();i++)
{ {
_endPoints->at(i)->close(); ((RdmaSalEndpoint *)it->second)->close();
delete ((RdmaSalEndpoint *)it->second);
} }
delete _endPoints; delete _qpSalEndpointMap;
*/
for (auto it = _qpRepEndpointMap->begin(); it != _qpRepEndpointMap->end(); it++)
{
((RdmaServerRepEndpoint *)it->second)->close();
delete ((RdmaServerRepEndpoint *)it->second);
}
delete _qpRepEndpointMap;
rdma_disconnect(_cm_id); rdma_disconnect(_cm_id);
rdma_destroy_id(_cm_id); rdma_destroy_id(_cm_id);
} }
\ No newline at end of file
void RdmaServerEndpointGroup::close()
{
_stop = true;
if (_cmEventThread != NULL)
_cmEventThread->join();
rdma_destroy_event_channel(_eventChannel);
_salCqProcessor->close();
delete _salCqProcessor;
}
#include "RdmaServerRepEndpoint.hpp"
int RdmaServerRepEndpoint::CONN_STATE_INITIALIZED = 1;
int RdmaServerRepEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2;
int RdmaServerRepEndpoint::CONN_STATE_CONNECTED = 3;
int RdmaServerRepEndpoint::CONN_STATE_CLOSED = 4;
RdmaServerRepEndpoint::RdmaServerRepEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: RdmaRepEndpoint(id,sendQueueSize, recvQueueSize, sendMsgSize ,recvMsgSize ,db),_completionQueue(completionQueue)
{
_state = CONN_STATE_INITIALIZED;
}
void RdmaServerRepEndpoint::createResources()
{
/* These states are used to avoid errors in lifetime of rdma connection
* more erros can be tracked in future using these lifecycle states
*/
if (_state != CONN_STATE_INITIALIZED)
{
CPPLog::LOG_ERROR("RdmaEndpoint : createResource invalid state");
}
// Step 1 to create endpoint
_protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL)
{
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_alloc_pd failed");
return;
}
// step 2 Creating Queue pair with completion queueu setted for send and recieve
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
/*
* Endpoint address is setted in QP context to get endpoint at run time with qp
* without using any map to map qp_num to endpoint
*/
qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1;
// completion queue can be shared or you can use distinct completion queues.
qp_init_attr.send_cq = _completionQueue;
qp_init_attr.recv_cq = _completionQueue;
qp_init_attr.qp_type = IBV_QPT_RC;
// increase if you want to keep more send work requests in the SQ.
qp_init_attr.cap.max_send_wr = _sendQueueSize;
// increase if you want to keep more receive work requests in the RQ.
qp_init_attr.cap.max_recv_wr = _recvQueueSize;
// increase if you allow send work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_send_sge = 1;
// increase if you allow receive work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_recv_sge = 1;
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : ibv_create_cq failed");
}
if (_cm_id->pd == NULL)
{
CPPLog::LOG_ERROR("RdmaEndpoint : pd not set");
_cm_id->pd = _protectionDomain;
}
/*
* Step 3 register memory for send and recv queue
*/
_sendBuff = new char[(_sendMsgSize * _sendQueueSize)];
if (_sendBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendBuff allocation failed");
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : sendMr reg failed");
_recvBuff = new char[(_recvMsgSize * _recvQueueSize)];
if (_recvBuff == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvBuff allocation failed");
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL)
CPPLog::LOG_ERROR("RdmaEndpoint : recvMr reg failed");
/*
* adding buffers for recving rdma data
*/
for (int i = 0; i < _recvQueueSize; i++)
{
char *location = _recvBuff + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
}
/*
* Adding buffers to queue for receving data
*/
for (int i = 0; i < _sendQueueSize; i++)
{
char *location = _sendBuff + i * _sendMsgSize;
std::unique_lock<std::mutex> lock(_sendBuffersM);
_sendBuffers.push((char *)location);
}
_state = CONN_STATE_RESOURCES_ALLOCATED;
}
void RdmaServerRepEndpoint::processCmEvent(struct rdma_cm_event *event)
{
std::ostringstream ss;
ss << "RdmaEndpoint : Event " << rdma_event_str(event->event);
CPPLog::LOG_ALWAYS(ss);
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
CPPLog::LOG_ALWAYS("RdmaEndpoint : Connect request");
}
else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{
if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{
CPPLog::LOG_ERROR("RdmaEndpoint : Established_Event but resource not alloted");
}
CPPLog::LOG_INFO("RdmaEndpoint : step 6 Connected");
_state = CONN_STATE_CONNECTED;
}
else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
CPPLog::LOG_INFO("RdmaEndpoint : step 7 disconnected");
close();
}
}
void RdmaServerRepEndpoint::close()
{
if (_state < CONN_STATE_RESOURCES_ALLOCATED)
{
return;
}
CPPLog::LOG_INFO("RdmaEndpoint : closing connection");
int ret;
ret = rdma_disconnect(_cm_id);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_disconnect failed");
}
ret = rdma_dereg_mr(_sendMr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr send failed");
}
delete[] _sendBuff;
ret = rdma_dereg_mr(_recvMr);
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_dereg_mr recv failed");
}
delete[] _recvBuff;
rdma_destroy_qp(_cm_id);
CPPLog::LOG_INFO("des qp");
// rdma_destroy_id(_cm_id);
// ret = rdma_destroy_id(_cm_id);
CPPLog::LOG_INFO("des mr");
if (ret)
{
CPPLog::LOG_ERROR("RdmaEndpoint : rdma_destroy_id failed");
}
_state = CONN_STATE_CLOSED;
CPPLog::LOG_INFO("closed");
}
#include "TaskThread.hpp" #include "TaskThread.hpp"
#include "MessageFormats.hpp"
TaskThread::TaskThread(int id, int cpu, ConcurrentQueue<struct ibv_wc *> *taskqueue, std::unordered_map<uint32_t, RdmaSalEndpoint *> *qpSalEndpointMap) TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue,
: _id(id), _qpSalEndpointMap(qpSalEndpointMap) std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap)
: _id(id), _clientRepMap(clientRepMap), _serverRepMap(serverRepMap), _salMap(salMap)
{ {
_taskQueue = taskqueue; _taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this)) if (pthread_create(&thread, NULL, &TaskThread::run, this))
{ {
std::cout << "pthread create has been failed while creating taskthread " << std::endl; CPPLog::LOG_ERROR("pthread create has been failed while creating taskthread");
exit(0); exit(0);
} }
cpu_set_t cpuset; cpu_set_t cpuset;
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset); CPU_SET(cpu, &cpuset);
std::ostringstream ss;
ss << "New Thread Setting CPU affinty " << cpu;
CPPLog::LOG_ALWAYS(ss);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0) if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0)
{ {
std::cerr << "Error calling pthread_setaffinity_np: " CPPLog::LOG_ERROR("Error calling pthread_setaffinity_np ");
<< "\n";
}
}
TaskThread::TaskThread(int id, ConcurrentQueue<struct ibv_wc *> *taskqueue, std::unordered_map<uint32_t, RdmaSalEndpoint *> *qpSalEndpointMap)
: _id(id), _qpSalEndpointMap(qpSalEndpointMap)
{
_taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this))
{
std::cout << "pthread create has been failed while creating taskthread " << std::endl;
exit(0);
} }
pthread_setname_np(thread, "TaskThread");
} }
TaskThread::~TaskThread() TaskThread::~TaskThread()
{ {
std::cout << "Task Destructed" << std::endl; CPPLog::LOG_INFO("TaskThread Destructed");
stop(); stop();
} }
void TaskThread::stop() void TaskThread::stop()
{ {
_stop = true; _stop = true;
if (pthread_join(thread, NULL) == 0) if (pthread_join(thread, NULL) == 0)
{ {
std::cout << "pthread join failed" << std::endl; CPPLog::LOG_ERROR("pthread join failed");
} }
} }
inline void *TaskThread::run(void *object) inline void *TaskThread::run(void *object)
{ {
/*
* This is because data from pthread_create is passed as void*
*/
TaskThread *thread = reinterpret_cast<TaskThread *>(object); TaskThread *thread = reinterpret_cast<TaskThread *>(object);
std::cout << "running task thread" << thread->_id << std::endl; std::ostringstream ss;
ss<<"Running task thread"<<thread->_id;
CPPLog::LOG_ALWAYS(ss);
while (!thread->_stop) while (!thread->_stop)
{ {
struct ibv_wc *data = NULL; struct ibv_wc *data = NULL;
thread->_taskQueue->wait_and_pop(data); std::cout << "Get start\n";
thread->processEvent(data); data = thread->_taskQueue->try_pop();
if (data != NULL)
{
std::cout << "TaskThread:: got data";
if (data == NULL || data->status != IBV_WC_SUCCESS)
{
std::ostringstream ss;
ss << "TaskThread : failed work completion : ";
ss << ibv_wc_status_str(data->status) << " on qp " << data->qp_num;
CPPLog::LOG_ERROR(ss);
continue;
}
RdmaSalEndpoint *salEp = nullptr;
RdmaRepEndpoint *repEp = nullptr;
auto it = thread->_salMap->find(data->qp_num);
if (it != thread->_salMap->end())
{
salEp = it->second;
}
auto it2 = thread->_clientRepMap->find(data->qp_num);
if (it2 != thread->_clientRepMap->end())
{
repEp = it2->second;
}
else
{
auto it3 = thread->_serverRepMap->find(data->qp_num);
if(it3 != thread->_serverRepMap->end())
repEp = it3->second;
}
if (salEp == nullptr && repEp == nullptr)
{
std::ostringstream ss;
ss<<"RdmaSal : endpoint not registered for qp"<<data->qp_num;
CPPLog::LOG_INFO(ss);
continue;
}
else if (salEp != nullptr)
thread->processEvent(salEp, data);
else if (repEp != nullptr)
thread->processRepEvent(repEp, data);
thread->_taskQueue->removeFromSet(data);
delete data;
}
} }
return NULL; return NULL;
} }
void TaskThread::processEvent(struct ibv_wc *data) void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
{ {
if (data == NULL || data->status != IBV_WC_SUCCESS) std::cout<<"processing sal event\n";
/* sal Request*/
switch (data->opcode)
{ {
std::cout << "TaskThread : failed work completion : "; case IBV_WC_SEND:
std::cout << ibv_wc_status_str(data->status) << " on qp " << data->qp_num << std::endl; ep->processSendCompletion(data);
return; break;
} case IBV_WC_RECV:
/*
* Process Request from client
*/
auto it = _qpSalEndpointMap->find(data->qp_num);
if (it == _qpSalEndpointMap->end())
{ {
std::cout << data->qp_num << "RdmaSal : endpoint not registered for qp num\n"; // it->second->processRecvCompletion(data);
return; char *buffer = new char[data->byte_len];
memcpy(buffer, (void *)data->wr_id, data->byte_len);
rdma_post_recv(ep->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
ep->_recvMsgSize, ep->_recvMr);
struct MessageHeader *req = (struct MessageHeader *)buffer;
std::cout << "TaskThread 1\n";
switch (req->type)
{
case MessageType::GET:
ep->processGet(req);
break;
case MessageType::DELETE:
replicateSalRequest(buffer, data->byte_len);
ep->processDelete(req);
break;
case MessageType::PUT:
replicateSalRequest(buffer, data->byte_len);
ep->processPut(req);
break;
default:
CPPLog::LOG_ERROR("SalRequest invalid req type");
break;
}
delete[] buffer;
}
break;
default:
std::ostringstream ss;
ss << "TaskThread default opcode : " << data->opcode;
CPPLog::LOG_INFO(ss);
break;
} }
}
void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
{
switch (data->opcode) switch (data->opcode)
{ {
case IBV_WC_SEND: case IBV_WC_SEND:
it->second->processSendCompletion(data); ep->processSendCompletion(data);
break; break;
case IBV_WC_RECV: case IBV_WC_RECV:
{ {
// it->second->processRecvCompletion(data); // it->second->processRecvCompletion(data);
char *buffer = new char[data->byte_len]; char *buffer = new char[data->byte_len];
memcpy(buffer, (void *)data->wr_id, data->byte_len); memcpy(buffer, (void *)data->wr_id, data->byte_len);
rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id, rdma_post_recv(ep->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
it->second->_recvMsgSize, it->second->_recvMr); ep->_recvMsgSize, ep->_recvMr);
struct SalRequestHeader *req = (struct SalRequestHeader *)buffer; struct MessageHeader *req = (struct MessageHeader *)buffer;
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize; std::cout << "TaskThread 1\n";
std::cout << " " << req->type << "size" << data->byte_len << "\n";
switch (req->type) switch (req->type)
{ {
case RequestType::GET: case MessageType::GET:
it->second->processGet(req); ep->processGet(req);
break; break;
case RequestType::DELETE: case MessageType::DELETE:
std::cout<<"TaskThread:: incorrect delete request from client to follower"<<std::endl; ep->processDelete(req);
break; break;
case RequestType::PUT: case MessageType::PUT:
std::cout<<"TaskThread:: incorrect put request from client to follower"<<std::endl; ep->processPut(req);
break; break;
default: default:
std::cout << "SalRequest invalid req type"; CPPLog::LOG_ERROR("SalRequest invalid req type");
break; break;
} }
delete[] buffer; delete[] buffer;
} }
break; break;
case IBV_WC_RDMA_WRITE:
std::cout << "rdma write completion\n";
break;
case IBV_WC_RDMA_READ:
std::cout << "rdma read completion\n";
break;
default: default:
std::cout << "TaskThread default opcode : " << data->opcode << std::endl; std::ostringstream ss;
ss << "TaskThread default opcode : " << data->opcode;
CPPLog::LOG_INFO(ss);
break; break;
} }
} }
void TaskThread::replicateSalRequest(char *req, uint32_t size)
{
auto repIt = _clientRepMap->begin();
for (; repIt != _clientRepMap->end(); repIt++)
{
repIt->second->sendMessage(req, size);
}
auto serverRepIt = _serverRepMap->begin();
for (; serverRepIt != _serverRepMap->end(); serverRepIt++)
{
serverRepIt->second->sendMessage(req, size);
}
MessageHeader *salReq = (MessageHeader *)req;
char *buffer = new char[MessageHeaderSize + salReq->keySize];
MessageHeader *invRequest = (MessageHeader *)(buffer);
invRequest->type = MessageType::INVALIDATE;
invRequest->id = salReq->id;
invRequest->keySize = salReq->keySize;
memcpy(buffer + MessageHeaderSize, salReq + MessageHeaderSize, salReq->keySize);
// Send Invalidation to sal's
auto salIt = _salMap->begin();
for (;salIt != _salMap->end();salIt++)
{
salIt->second->sendMessage(buffer, MessageHeaderSize + salReq->keySize);
}
delete[] buffer;
}
\ No newline at end of file
#include <iostream> #include <iostream>
#include "RdmaServerEndpointGroup.hpp" #include "RdmaServerEndpointGroup.hpp"
#include "RdmaClientEndpointGroup.hpp" #include "RdmaRepEndpointGroup.hpp"
#include "RdmaEndpoint.hpp" #include "RdmaEndpoint.hpp"
#include "Executor.hpp" #include "Executor.hpp"
#include "Properties.hpp"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
int connectToRepServer(Properties &prop, RdmaRepEndpointGroup* repGroup)
{
int followers = stoi(prop.getValue("FOLLOWERS"));
std::cout<<"followers "<<followers<<"\n";
RdmaClientRepEndpoint* clientEPs[followers];
for(int i = 0 ;i< followers ;i++)
{
clientEPs[i] = repGroup->createEndpoint();
std::string ip = prop.getValue("FOLLOWER"+std::to_string(i+1)+"_IP");
std::string port = prop.getValue("FOLLOWER"+std::to_string(i+1)+"_PORT");
std::cout<<"Connecting to follower "<<ip<<":"<<port<<"\n";
clientEPs[i]->connect(ip.c_str(), port.c_str(), "fol");
}
return 0;
}
int main() int main()
{ {
RdmaClientEndpointGroup *clgroup = new RdmaClientEndpointGroup(5, 5, 5, 50, 50, 0, 1000); std::cout << "Starting Server Main Thread\n";
clgroup->start(); Properties prop("prop.config");
RdmaClientEndpoint *clientEp = clgroup->createEndpoint(); int sendQS = stoi(prop.getValue("sendQS"));
clientEp->connect("192.168.200.20", "1921", "fol"); int recvQS = stoi(prop.getValue("recvQS"));
while (!clientEp->isConnected()); int compQS = stoi(prop.getValue("compQS"));
std::cout << "client : connected" << std::endl; int sendMS = stoi(prop.getValue("sendMS"));
int recvMS = stoi(prop.getValue("recvMS"));
std::string dbpath = prop.getValue("DB_PATH");
std::string serverIP = prop.getValue("SERVER_IP");
std::string serverPort = prop.getValue("SERVER_PORT");
int executorPoolSize = stoi(prop.getValue("EXECUTOR_POOL_SIZE"));
int enableLogging = stoi(prop.getValue("ENABLE_LOGGING"));
if (enableLogging == 0)
CPPLog::Logger::getInstance()->updateLogLevel(CPPLog::DISABLE_LOG);
rocksdb::DB *db;
rocksdb::Options options;
options.create_if_missing = true;
// open a database with a name which corresponds to a file system directory
rocksdb::Status status = rocksdb::DB::Open(options, dbpath, &db);
if (!status.ok())
{
CPPLog::LOG_ERROR(status.ToString().c_str());
exit(1);
}
CPPLog::LOG_ALWAYS("Rocks started");
Executor *executor = new Executor(executorPoolSize);
RdmaRepEndpointGroup *repgroup = new RdmaRepEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, 1, 100000, db);
RdmaServerEndpointGroup *sgroup = new RdmaServerEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, db);
executor->createThreads(repgroup->_qpRepEndpointMap, sgroup->_qpRepEndpointMap, sgroup->_qpSalEndpointMap);
repgroup->setExecutor(executor);
sgroup->setExecutor(executor);
repgroup->startCmProcessor();
sgroup->bind(serverIP.c_str(), serverPort.c_str(), 2);
std::thread t(connectToRepServer, std::ref(prop), (repgroup));
RdmaServerEndpointGroup *sgroup = new RdmaServerEndpointGroup(5, 5, 5, 50, 50);
sgroup->createExecutor(4);
sgroup->bind("192.168.200.20", "1920", 2);
sgroup->startCmProcessor(false); sgroup->startCmProcessor(false);
std::cout << "rhdhj" << std::endl; std::cout << "rhdhj" << std::endl;
while (1) while (1)
; ;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment