Commit 2e661de3 authored by p's avatar p

Added Intial followers code

parent 33075e8b
SRCS := $(shell ls src/)
SRC_DIR := src
OBJS := $(SRCS:.cpp=.o)
BUILD_DIR := .build
OBJS := $(addprefix $(BUILD_DIR)/, $(OBJS))
CXX = g++
CXXFLAGS += -O3 -Wall -std=c++17 -I header
CXXFLAGS += -g
LIBS += -libverbs
LIBS += -lrdmacm
LIBS += -pthread
LIBS += -lrocksdb
Target := fol
.phony = clean
$(BUILD_DIR)/%.o: $(SRC_DIR)/%.cpp
$(CXX) -o $@ $(CXXFLAGS) -c $< $(LIBS)
$(Target) : $(OBJS) | $(BUILD_DIR)
$(CXX) -o $@ $^ $(LIBS)
$(BUILD_DIR) :
mkdir -p $@
\ No newline at end of file
File added
#ifndef __ConQueue__
#define __ConQueue__
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename Data>
class ConcurrentQueue
{
private:
std::queue<Data> queue;
std::mutex queueMutex;
std::condition_variable queueCv;
public:
void push(Data const &data)
{
std::unique_lock<std::mutex> lock(queueMutex);
queue.push(data);
lock.unlock();
queueCv.notify_one();
}
bool empty()
{
std::unique_lock<std::mutex> lock(queueMutex);
return queue.empty();
}
bool try_pop(Data &popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
if (queue.empty())
{
return false;
}
popped_value = queue.front();
queue.pop();
return true;
}
void wait_and_pop(Data &popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCv.wait(lock, [&]{return queue.size() > 0;});
popped_value = queue.front();
queue.pop();
}
};
#endif
\ No newline at end of file
#ifndef __Executor__
#define __Executor__
#include <vector>
#include <map>
#include "CqEventData.hpp"
#include "ConcurrentQueue.hpp"
#include "TaskThread.hpp"
#include "RdmaSalEndpoint.hpp"
class Executor
{
int _size{0};
std::vector<TaskThread *> *_taskThreads{NULL};
ConcurrentQueue<struct ibv_wc *> *_taskQueue{NULL};
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap{NULL};
public:
Executor(int size,std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap);
void submit(struct ibv_wc *task);
void getTask();
};
//long affinities[]
#endif
\ No newline at end of file
#ifndef __MessageFormats__
#define __MessageFormats__
enum RequestType
{
GET,
PUT,
DELETE,
INVALIDATE
};
struct __attribute__ ((__packed__)) SalRequest
{
uint32_t id;
enum RequestType type;
uint32_t keySize;
uint32_t valueSize;
};
struct __attribute__ ((__packed__)) SalResponse
{
//private:
uint32_t id;
enum RequestType type;
//public:
uint32_t size;
};
struct __attribute__ ((__packed__)) InvRequest
{
//private:
uint32_t id;
enum RequestType type;
//public:
uint32_t keySize;
};
static uint32_t SUCCESS = 0;
static uint32_t FAILURE = 1;
static int32_t SalRequestHeaderSize = sizeof(SalRequest);
static int32_t SalResponseSize = sizeof(SalResponse);
static uint32_t InvRequestHeaderSize = sizeof(InvRequest);
#endif
\ No newline at end of file
#ifndef __RDMAENDPOINT__
#define __RDMAENDPOINT__
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <netdb.h>
#include <arpa/inet.h>
#include <map>
#include <vector>
#include <mutex>
#include <shared_mutex>
#include "CqEventData.hpp"
class RdmaEndpoint
{
public:
static int CONN_STATE_INITIALIZED;
static int CONN_STATE_RESOURCES_ALLOCATED;
static int CONN_STATE_CONNECTED;
static int CONN_STATE_CLOSED;
struct rdma_cm_id *_cm_id{NULL};
struct ibv_cq *_completionQueue{NULL};
struct ibv_pd *_protectionDomain{NULL};
int _sendQueueSize{0};
int _recvQueueSize{0};
int _sendMsgSize{0};
int _recvMsgSize{0};
int _state{0};
char *_sendBuff{NULL};
char *_recvBuff{NULL};
struct ibv_mr *_sendMr{NULL};
struct ibv_mr *_recvMr{NULL};
boost::lockfree::queue<void *> *_sendBuffers{NULL};
RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize);
void createResources();
void processCmEvent(struct rdma_cm_event *event);
void clientClose();
virtual void processSendCompletion(struct ibv_wc *data) = 0;
virtual void processRecvCompletion(struct ibv_wc *data) = 0;
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMAREPCQPROCESSOR__
#define __RDMAREPCQPROCESSOR__
#include "Executor.hpp"
class RdmaRepCqProcessor
{
public:
struct ibv_comp_channel *_compChannel{NULL};
struct ibv_cq *_completionQueue{NULL};
std::thread *_compQueueThread{NULL};
bool _stop{false};
Executor *_executor{NULL};
RdmaRepCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize)
: _executor(ex)
{
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{
std::cout << "CqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *getCq()
{
return _completionQueue;
}
void start()
{
std::cout << "CqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this);
}
void processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc *wc_array = new struct ibv_wc[nevent];
while (!_stop)
{
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "CqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "CqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
//data->vendor_err = 1;
//_executor->submit(data);
new std::thread(&RdmaRepCqProcessor::processRepEvent, this,data);
}
//_executor->dispatchRepCqEvents(wc_array, ret);
}
}
void processRepEvent(struct ibv_wc* data)
{
std::cout<<"procesing Replication request"<<std::endl;
}
void close()
{
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
}
};
#endif
#ifndef __RDMASERVERENDPOINT__
#define __RDMASERVERENDPOINT__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include "RdmaEndpoint.hpp"
#include "CqEventData.hpp"
#include <rocksdb/db.h>
class RdmaReplicationEndpoint : public RdmaEndpoint
{
rocksdb::DB *_db;
std::atomic<uint64_t> _requestId{12};
public:
RdmaReplicationEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db);
void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc* data);
void processRecvCompletion(struct ibv_wc* data);
int sendMessage(const char *buffer, uint32_t size);
void close();
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMASALCQPROCESSOR__
#define __RDMASALCQPROCESSOR__
#include "Executor.hpp"
class RdmaSalCqProcessor
{
public:
struct ibv_comp_channel *_compChannel{NULL};
struct ibv_cq *_completionQueue{NULL};
std::thread *_compQueueThread{NULL};
bool _stop{false};
Executor *_executor{NULL};
RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize);
struct ibv_cq *getCq();
void start();
void processCQEvents();
void close();
};
#endif
#ifndef __RdmaInvEndpoint__
#define __RdmaInvEndpoint__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include "RdmaEndpoint.hpp"
#include "MessageFormats.hpp"
#include <rocksdb/db.h>
class RdmaSalEndpoint : public RdmaEndpoint
{
public:
rocksdb::DB *_db;
RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db);
void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc *data);
void processRecvCompletion(struct ibv_wc *data);
void processDelete(struct SalRequest *);
void processGet(struct SalRequest *req);
void processPut(struct SalRequest *req);
int sendMessage(const char *buffer, uint32_t size);
void close();
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <netdb.h>
#include <arpa/inet.h>
#include <map>
#include <vector>
#include <mutex>
#include <shared_mutex>
#ifndef __RDMASERVERENDPOINTGROUP__
#define __RDMASERVERENDPOINTGROUP__
#include "RdmaReplicationEndpoint.hpp"
#include "RdmaSalEndpoint.hpp"
#include "CqEventData.hpp"
#include "Executor.hpp"
#include "RdmaSalCqProcessor.hpp"
#include "RdmaRepCqProcessor.hpp"
class RdmaServerEndpointGroup
{
/*
* Variables to maintain Group state
*/
static int CONN_STATE_INITIALIZED;
static int CONN_STATE_BINDED;
static int CONN_STATE_CONNECTED;
static int CONN_STATE_CLOSED;
bool _stop{false};
std::thread *_cmEventThread{NULL};
struct rdma_event_channel *_eventChannel{NULL};
Executor *_executor;
struct rdma_cm_id *_cm_id{NULL};
RdmaSalCqProcessor *_salCqProcessor{NULL};
std::vector<RdmaSalEndpoint *> *_salEps{NULL};
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap{NULL};
/*
* variables to maintain queue state
*/
int _sendQueueSize{0};
int _recvQueueSize{0};
int _compQueueSize{0};
int _sendMsgSize{0};
int _recvMsgSize{0};
rocksdb::DB *_db;
mutable std::shared_mutex _salMutex;
void clientClose();
public:
RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, int recvMsgSize);
void bind(const char *ip, const char *port, int backlog);
struct ibv_cq *createSalCq(struct rdma_cm_id *id);
void dispatchSalCqEvents(ibv_wc wc[], int size);
void createExecutor(int threadSize);
/*
* Sending false will run cmProcessor on calling thread used to save creation of new thread
* when main thread can also do same work
*/
void startCmProcessor(bool newThread);
void processCmEvents();
void processCmEvent(struct rdma_cm_event *event);
void createEpCmEvent(struct rdma_cm_event *event);
void close();
};
#endif
\ No newline at end of file
#ifndef __Runnable__
#define __Runnable__
#include <iostream>
class Runnable
{
public:
virtual void run() = 0;
};
#endif
\ No newline at end of file
#ifndef __TaskThread__
#define __TaskThread__
#include "Runnable.hpp"
#include "CqEventData.hpp"
#include "ConcurrentQueue.hpp"
#include <pthread.h>
#include <iostream>
#include <queue>
#include <map>
#include "RdmaSalEndpoint.hpp"
class TaskThread
{
private:
ConcurrentQueue<struct ibv_wc *> *_taskQueue;
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap;
bool _stop{false};
int _id;
pthread_t thread;
public:
TaskThread(int id, int cpu, ConcurrentQueue<struct ibv_wc *> *, std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap);
TaskThread(int id, ConcurrentQueue<struct ibv_wc *> *, std::unordered_map<uint32_t, RdmaSalEndpoint *> *_qpSalEndpointMap);
void replicateSalRequest(char *salRequest, uint32_t size);
static void *run(void *object);
void stop();
void processEvent(struct ibv_wc *data);
~TaskThread();
};
#endif
\ No newline at end of file
// https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/
// https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html
// https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.
//https://docs.microsoft.com/en-us/cpp/cpp/delegating-constructors?view=msvc-170
https://www.toptal.com/c-plus-plus/c-plus-plus-understanding-compilation
/*
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(i, &cpuset);
int rc = pthread_setaffinity_np(threads[i].native_handle(),
sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
std::cerr << "Error calling pthread_setaffinity_np: " << rc << "\n";
}
*/
\ No newline at end of file
#include "Executor.hpp"
Executor::Executor(int size,std::unordered_map<uint32_t, RdmaSalEndpoint *> *qpSalEndpointMap)
: _size(size), _qpSalEndpointMap(qpSalEndpointMap)
{
_taskQueue = new ConcurrentQueue<struct ibv_wc *>();
_taskThreads = new std::vector<TaskThread *>();
_taskThreads->reserve(size);
for (int i = 0; i < _size; i++)
{
TaskThread *thread = new TaskThread(i, _taskQueue, _qpSalEndpointMap);
_taskThreads->push_back(thread);
}
}
void Executor::submit(struct ibv_wc *task)
{
_taskQueue->push(task);
}
void Executor::getTask()
{
}
\ No newline at end of file
#include "RdmaEndpoint.hpp"
int RdmaEndpoint::CONN_STATE_INITIALIZED = 1;
int RdmaEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 2;
int RdmaEndpoint::CONN_STATE_CONNECTED = 3;
int RdmaEndpoint::CONN_STATE_CLOSED = 4;
RdmaEndpoint::RdmaEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize)
: _cm_id(id), _completionQueue(completionQueue), _sendQueueSize(sendQueueSize),
_recvQueueSize(recvQueueSize), _sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize)
{
_state = CONN_STATE_INITIALIZED;
_sendBuffers = new boost::lockfree::queue<void*>(_sendMsgSize);
}
void RdmaEndpoint::createResources()
{
if (_state != CONN_STATE_INITIALIZED)
{
std::cout << "RdmaEndpoint : createResource invalid satte" << std::endl;
}
_protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL)
{
std::cout << "RdmaEndpoint : ibv_alloc_pd failed " << std::endl;
return;
}
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
//This is used to set endpoint address with qp
qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1;
// completion queue can be shared or you can use distinct completion queues.
qp_init_attr.send_cq = _completionQueue;
qp_init_attr.recv_cq = _completionQueue;
qp_init_attr.qp_type = IBV_QPT_RC;
// increase if you want to keep more send work requests in the SQ.
qp_init_attr.cap.max_send_wr = _sendQueueSize;
// increase if you want to keep more receive work requests in the RQ.
qp_init_attr.cap.max_recv_wr = _recvQueueSize;
// increase if you allow send work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_send_sge = 1;
// increase if you allow receive work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_recv_sge = 1;
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret)
{
std::cout << "RdmaEndpoint : ibv_create_cq failed\n";
}
if (_cm_id->pd == NULL)
{
std::cout << "RdmaEndpoint : pd not set" << std::endl;
_cm_id->pd = _protectionDomain;
}
_sendBuff = (char*)malloc(_sendMsgSize * _sendQueueSize);
if (_sendBuff == NULL)
std::cout << "RdmaEndpoint : sendBuff malloc failed" << std::endl;
_recvBuff = (char*)malloc(_recvMsgSize * _recvQueueSize);
_sendMr = rdma_reg_write(_cm_id, reinterpret_cast<void *>(_sendBuff), _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL)
std::cout << "RdmaEndpoint : sendMr reg failed" << std::endl;
if (_recvBuff == NULL)
std::cout << "RdmaEndpoint : recvBuff malloc failed" << std::endl;
_recvMr = rdma_reg_read(_cm_id, reinterpret_cast<void *>(_recvBuff), _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL)
std::cout << "RdmaEndpoint : recvMr reg failed" << std::endl;
for (int i = 0; i < _recvQueueSize; i++)
{
char *const location = _recvBuff + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
}
for (int i = 0; i < _sendQueueSize; i++)
{
void* const location = _sendBuff + i * _sendMsgSize;
_sendBuffers->push(location);
}
_state = CONN_STATE_RESOURCES_ALLOCATED;
}
void RdmaEndpoint::processCmEvent(struct rdma_cm_event *event)
{
std::cout << "RdmaEndpoint : Event " << rdma_event_str(event->event) << std::endl;
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
{
std::cout << "RdmaEndpoint : Connect request";
}
else if (event->event == RDMA_CM_EVENT_ESTABLISHED)
{
if (_state != CONN_STATE_RESOURCES_ALLOCATED)
{
std::cout << "RdmaEndpoint : EstablishedEvent invalid state " << std::endl;
}
std::cout << "RdmaEndpoint : step 6 Connected" << std::endl;
_state = CONN_STATE_CONNECTED;
}
else if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
std::cout << "RdmaEndpoint : step 7 disconnected" << std::endl;
clientClose();
}
}
void RdmaEndpoint::clientClose()
{
if (_state != CONN_STATE_CONNECTED)
{
std::cout << "RdmaEndpoint : clientClose invalid state" << std::endl;
return;
}
std::cout<<"RdmaEndpoint : closing connection qp "<<_cm_id->qp->qp_num<< std::endl;
int ret;
ret = rdma_disconnect(_cm_id);
if (ret)
{
std::cout << "RdmaEndpoint : rdma_disconnect failed" << std::endl;
}
ret = rdma_dereg_mr(_sendMr);
if (ret)
{
std::cout << "RdmaEndpoint : rdma_dereg_mr send failed" << std::endl;
}
free(_sendBuff);
ret = rdma_dereg_mr(_recvMr);
if (ret)
{
std::cout << "RdmaEndpoint : rdma_dereg_mr recv failed" << std::endl;
}
free(_recvBuff);
rdma_destroy_qp(_cm_id);
std::cout<<"des qp"<<std::endl;
// ret = rdma_destroy_id(_cm_id);
std::cout<<"des mr"<<std::endl;
if (ret)
{
std::cout << "RdmaEndpoint : rdma_destroy_id failed" << std::endl;
}
_state = CONN_STATE_CLOSED;
std::cout<<"closed"<<std::endl;
}
#include "RdmaRepCqProcessor.hpp"
\ No newline at end of file
#include "RdmaReplicationEndpoint.hpp"
RdmaReplicationEndpoint::RdmaReplicationEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize)
,_db(db)
{
}
void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data)
{
std::cout << "send completion\n";
_sendBuffers->push((void *)data->wr_id);
}
void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data)
{
std::cout << "recv completion\n";
std::cout << "recieve" << (char *)(data->wr_id) << "\n";
char* request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id,data->byte_len);
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
}
int RdmaReplicationEndpoint::sendMessage(const char *buffer, uint32_t size)
{
if (size > _sendMsgSize)
return -1;
void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr)
return -1;
memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
}
#include "RdmaSalCqProcessor.hpp"
RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize)
: _executor(ex)
{
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{
std::cout << "SalCqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "SalCqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "SalCqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *RdmaSalCqProcessor::getCq()
{
return _completionQueue;
}
void RdmaSalCqProcessor::start()
{
std::cout << "SalCqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaSalCqProcessor::processCQEvents, this);
}
void RdmaSalCqProcessor::processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc wc_array[nevent];
while (!_stop)
{
/*
* get_CQ_event is a blocking call and it wait save some cpu cycles but.
* it might not be that efficient compared to polling
*/
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "SalCqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
/*
* Create a request for next completion cycle
*/
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "SalCqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "SalCqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
/*
* vendor_err is set to check whether the request came from sal or followers
* data->vendor_err = 0;
*/
_executor->submit(data);
}
// _executor->dispatchSalCqEvents(wc_array, ret);
}
}
void RdmaSalCqProcessor::close()
{
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
}
\ No newline at end of file
#include "RdmaSalEndpoint.hpp"
#include "MessageFormats.hpp"
RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize)
,_db(db)
{
}
void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data)
{
std::cout << "send completion\n";
_sendBuffers->push((void *)data->wr_id);
}
int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size)
{
if (size > _sendMsgSize)
return -1;
void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr)
return -1;
memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
}
void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
{
char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len);
struct SalRequest *req = (struct SalRequest *)request;
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize << " " << req->type << "size" << data->byte_len << "\n";
if (req->type == RequestType::DELETE)
processDelete(req);
if (req->type == RequestType::GET)
processGet(req);
if (req->type == RequestType::PUT)
processPut(req);
delete[] request;
}
void RdmaSalEndpoint::processDelete(struct SalRequest *req)
{
std::cout<<"0\n";
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize});
std::cout<<"1\n";
void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf);
if (sendBuf == nullptr)
{
return;
}
char* sendBuffer = (char*)sendBuf;
std::cout<<"2 "<<req->id<<"\n";
memcpy(sendBuffer, &(req->id), sizeof(uint32_t));
if (s.ok())
{
std::cout<<"33\n";
memcpy(sendBuffer+4, &SUCCESS, sizeof(int));
std::cout<<"44\n";
}
else
{
std::cout<<"331\n";
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
std::cout<<"441\n";
}
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
std::cout<<"3\n";
}
void RdmaSalEndpoint::processGet(struct SalRequest *req)
{
std::string value;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, &value);
void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf);
if (sendBuf == nullptr)
{
return;
}
char* sendBuffer = (char*)sendBuf;
memcpy(sendBuffer, &(req->id), sizeof(uint32_t));
if (s.ok())
{
memcpy(sendBuffer+4, &SUCCESS, sizeof(int));
memcpy(sendBuffer+8, (void *)value.size(), sizeof(value.size()));
memcpy(sendBuffer+12, value.c_str(), value.size());
}
else
{
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
}
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
}
void RdmaSalEndpoint::processPut(struct SalRequest *req)
{
rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize},
{(char *)req + SalRequestHeaderSize + req->keySize, req->valueSize});
void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf);
if (sendBuf != nullptr)
{
return;
}
char* sendBuffer = (char*)sendBuf;
memcpy(sendBuffer, &(req->id), sizeof(uint32_t));
if (s.ok())
memcpy(sendBuffer+4, &SUCCESS, sizeof(int));
else
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
}
#include "RdmaServerEndpointGroup.hpp"
int RdmaServerEndpointGroup::CONN_STATE_INITIALIZED = 0;
int RdmaServerEndpointGroup::CONN_STATE_BINDED = 3;
int RdmaServerEndpointGroup::CONN_STATE_CONNECTED = 4;
int RdmaServerEndpointGroup::CONN_STATE_CLOSED = 5;
RdmaServerEndpointGroup::RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, int recvMsgSize)
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize)
{
std::cout << "CMProcessor : Step 1 creating event channel" << std::endl;
_eventChannel = rdma_create_event_channel();
if (_eventChannel == NULL)
{
std::cout << "CMProcesor : error creating event channel";
}
_cm_id = NULL;
int ret = rdma_create_id(_eventChannel, &_cm_id, NULL, RDMA_PS_TCP);
if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl;
_qpSalEndpointMap = new std::unordered_map<uint32_t, RdmaSalEndpoint *>();
_salEps = new std::vector<RdmaSalEndpoint *>();
rocksdb::Options options;
options.create_if_missing = true;
// open a database with a name which corresponds to a file system directory
rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &_db);
std::cout << "Rocks started" << std::endl;
if (!status.ok())
{
std::cout << status.ToString() << std::endl;
exit(1);
}
}
void RdmaServerEndpointGroup::createExecutor(int threadSize)
{
_executor = new Executor(threadSize,_qpSalEndpointMap);
}
void RdmaServerEndpointGroup::startCmProcessor(bool newThread)
{
if (newThread)
_cmEventThread = new std::thread(&RdmaServerEndpointGroup::processCmEvents, this);
else
processCmEvents();
}
void RdmaServerEndpointGroup::processCmEvents()
{
int ret;
struct rdma_cm_event *event;
std::cout << "CMProcessor : starting cm processing thread" << std::endl;
while (!_stop)
{
ret = rdma_get_cm_event(_eventChannel, &event);
if (ret)
{
std::cout << "CMProcesor : rdma_get_cm_event failed" << std::endl;
continue;
}
processCmEvent(event);
ret = rdma_ack_cm_event(event);
if (ret)
{
std::cout << "CMProcesor : rdma_ack_cm_event failed";
}
}
}
void RdmaServerEndpointGroup::bind(const char *ip, const char *port, int backlog)
{
int ret;
std::cout << "RdmaServerEndpointGroup : Step 2 bind_addr" << std::endl;
struct addrinfo *addr;
ret = getaddrinfo(ip, port, NULL, &addr);
if (ret)
{
std::cout << "RdmaServerEndpointGroup : get_addr_info failed" << std::endl;
}
ret = rdma_bind_addr(_cm_id, addr->ai_addr);
if (ret)
{
std::cout << "RdmaServerEndpointGroup : rdma_bind_addr failed" << std::endl;
return;
}
std::cout << "RdmaServerEndpointGroup : rdma_bind_addr successful\n";
ret = rdma_listen(_cm_id, backlog);
if (ret)
{
std::cout << "RdmaServerEndpointGroup : rdma_listen failed" << std::endl;
return;
}
}
struct ibv_cq *RdmaServerEndpointGroup::createSalCq(struct rdma_cm_id *id)
{
if (_salCqProcessor == NULL)
{
std::cout << "RdmaServerEndpointGroup : step 5 create salcq processor" << std::endl;
_salCqProcessor = new RdmaSalCqProcessor(_executor, _cm_id->verbs, _compQueueSize);
_salCqProcessor->start();
}
return _salCqProcessor->getCq();
}
void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event)
{
std::cout << "RdmaServerEndpointGroup : step 4 Got Connect Request Server Endpoint" << std::endl;
/*
* create and add to vectors for replication and invalidation processing
*/
//const char *connData = reinterpret_cast<const char *>(event->param.conn.private_data);
//std::cout << "sal" << std::endl;
RdmaSalEndpoint *endpoint = new RdmaSalEndpoint(event->id, createSalCq(event->id), _sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize, _db);
event->id->context = (void *)endpoint;
endpoint->createResources();
std::unique_lock lock(_salMutex);
_salEps->push_back(endpoint);
std::cout << "qp num" << event->id->qp->qp_num << " " << _qpSalEndpointMap->size();
_qpSalEndpointMap->emplace(event->id->qp->qp_num, endpoint);
std::cout << _qpSalEndpointMap->size() << std::endl;
rdma_accept(event->id, NULL);
}
void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{
std::cout << "RdmaServerEndpointGroup : event " << rdma_event_str(event->event) << " id " << event->id << " " << std::endl;
/*
* Connect request came on listener ie endpointgroup
* in this listen_id contains cm_id of listener and id contains the new id of connection request
*/
if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST && event->listen_id != NULL)
{
createEpCmEvent(event);
}
/*
* Event came for listener ie endpointgroup
*/
else if (event->id != NULL && _cm_id == event->id)
{
if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
std::cout << "RdmaServerEndpointGroup : Disconnect Server Endpoint" << std::endl;
close();
}
else
{
std::cout << "RdmaServerEndpointGroup : unknown Event for listener" << rdma_event_str(event->event) << std::endl;
}
}
/*
* Event came for a endpoint
*/
else if (event->id != NULL && event->id->context != NULL)
{
RdmaEndpoint *ep = ((RdmaEndpoint *)event->id->context);
ep->processCmEvent(event);
if (event->event == RDMA_CM_EVENT_DISCONNECTED)
{
std::unique_lock lock(_salMutex);
auto it = std::find(_salEps->begin(), _salEps->end(), (RdmaSalEndpoint *)ep);
if (it != _salEps->end())
{
_salEps->erase(it);
delete ((RdmaSalEndpoint *)event->id->context);
}
}
/*if (event->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{
std::cout<<"des"<<std::endl;
rdma_destroy_id(event->id);
std::cout<<"des"<<std::endl;
}
*/
}
else
{
std::cout << "RdmaServerEndpointGroup : Not able to procces CM EVent" << rdma_event_str(event->event) << event->id << " " << event->listen_id << std::endl;
}
}
void RdmaServerEndpointGroup::clientClose()
{
_stop = true;
if (_cmEventThread != NULL)
_cmEventThread->join();
rdma_destroy_event_channel(_eventChannel);
_salCqProcessor->close();
delete _salCqProcessor;
/* for(int i = 0 ;i < _endPoints->size();i++)
{
_endPoints->at(i)->close();
}
delete _endPoints;
*/
rdma_disconnect(_cm_id);
rdma_destroy_id(_cm_id);
}
void RdmaServerEndpointGroup::close()
{
_stop = true;
if (_cmEventThread != NULL)
_cmEventThread->join();
rdma_destroy_event_channel(_eventChannel);
_salCqProcessor->close();
delete _salCqProcessor;
}
#include "TaskThread.hpp"
#include "MessageFormats.hpp"
TaskThread::TaskThread(int id, int cpu, ConcurrentQueue<struct ibv_wc *> *taskqueue, std::unordered_map<uint32_t, RdmaSalEndpoint *> *qpSalEndpointMap)
: _id(id), _qpSalEndpointMap(qpSalEndpointMap)
{
_taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this))
{
std::cout << "pthread create has been failed while creating taskthread " << std::endl;
exit(0);
}
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0)
{
std::cerr << "Error calling pthread_setaffinity_np: "
<< "\n";
}
}
TaskThread::TaskThread(int id, ConcurrentQueue<struct ibv_wc *> *taskqueue, std::unordered_map<uint32_t, RdmaSalEndpoint *> *qpSalEndpointMap)
: _id(id), _qpSalEndpointMap(qpSalEndpointMap)
{
_taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this))
{
std::cout << "pthread create has been failed while creating taskthread " << std::endl;
exit(0);
}
}
TaskThread::~TaskThread()
{
std::cout << "Task Destructed" << std::endl;
stop();
}
void TaskThread::stop()
{
_stop = true;
if (pthread_join(thread, NULL) == 0)
{
std::cout << "pthread join failed" << std::endl;
}
}
inline void *TaskThread::run(void *object)
{
TaskThread *thread = reinterpret_cast<TaskThread *>(object);
std::cout << "running task thread" << thread->_id << std::endl;
while (!thread->_stop)
{
struct ibv_wc *data = NULL;
thread->_taskQueue->wait_and_pop(data);
thread->processEvent(data);
}
return NULL;
}
void TaskThread::processEvent(struct ibv_wc *data)
{
if (data == NULL || data->status != IBV_WC_SUCCESS)
{
std::cout << "TaskThread : failed work completion : ";
std::cout << ibv_wc_status_str(data->status) << " on qp " << data->qp_num << std::endl;
return;
}
/*
* Process Request from client
*/
auto it = _qpSalEndpointMap->find(data->qp_num);
if (it == _qpSalEndpointMap->end())
{
std::cout << data->qp_num << "RdmaSal : endpoint not registered for qp num\n";
return;
}
switch (data->opcode)
{
case IBV_WC_SEND:
it->second->processSendCompletion(data);
break;
case IBV_WC_RECV:
{
// it->second->processRecvCompletion(data);
char *buffer = new char[data->byte_len];
memcpy(buffer, (void *)data->wr_id, data->byte_len);
rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
it->second->_recvMsgSize, it->second->_recvMr);
struct SalRequest *req = (struct SalRequest *)buffer;
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize;
std::cout << " " << req->type << "size" << data->byte_len << "\n";
switch (req->type)
{
case RequestType::GET:
it->second->processGet(req);
break;
case RequestType::DELETE:
std::cout<<"TaskThread:: incorrect delete request from client to follower"<<std::endl;
break;
case RequestType::PUT:
std::cout<<"TaskThread:: incorrect put request from client to follower"<<std::endl;
break;
default:
std::cout << "SalRequest invalid req type";
break;
}
delete[] buffer;
}
break;
case IBV_WC_RDMA_WRITE:
std::cout << "rdma write completion\n";
break;
case IBV_WC_RDMA_READ:
std::cout << "rdma read completion\n";
break;
default:
std::cout << "TaskThread default opcode : " << data->opcode << std::endl;
break;
}
}
\ No newline at end of file
#include <iostream>
#include "RdmaServerEndpointGroup.hpp"
#include "Executor.hpp"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
int main()
{
RdmaServerEndpointGroup *group = new RdmaServerEndpointGroup(5, 5, 5, 50, 50);
group->createExecutor(4);
group->bind("192.168.200.20", "1921", 2);
group->startCmProcessor(false);
std::cout << "rhdhj" << std::endl;
while (1)
;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment