Commit 36aa2047 authored by Paras Garg's avatar Paras Garg

Added intial client code

parent 33075e8b
.build
.build/*
client
.vscode/*
\ No newline at end of file
SRCS := $(shell ls src/)
SRC_DIR := src
OBJS := $(SRCS:.cpp=.o)
BUILD_DIR := .build
HEADERS := $(shell find -name '*.hpp')
OBJS := $(addprefix $(BUILD_DIR)/, $(OBJS))
CXX = g++
CXXFLAGS += -g -O3 -Wall -I header
LIBS += -libverbs
LIBS += -lrdmacm
LIBS += -pthread
LIBS += -lrocksdb
Target := client
.phony = clean
$(BUILD_DIR)/%.o: $(SRC_DIR)/%.cpp
$(CXX) -o $@ $(CXXFLAGS) -c $< $(LIBS)
$(Target) : $(OBJS) | $(BUILD_DIR)
$(CXX) -o $@ $^ $(LIBS)
$(BUILD_DIR) :
mkdir -p $@
\ No newline at end of file
#include <iostream>
class Buffer
{
public:
void *buffer;
int size;
void *get();
Buffer();
Buffer(void *buffer, int size);
};
\ No newline at end of file
#ifndef __MessageFormats__
#define __MessageFormats__
enum RequestType
{
GET,
PUT,
DELETE,
INVALIDATE
};
struct __attribute__ ((__packed__)) SalRequestHeader
{
uint32_t id;
enum RequestType type;
uint32_t keySize;
uint32_t valueSize;
};
struct __attribute__ ((__packed__)) SalResponseHeader
{
uint32_t id;
enum RequestType type;
uint32_t size;
};
struct __attribute__ ((__packed__)) InvRequestHeader
{
//private:
uint32_t id;
enum RequestType type;
//public:
uint32_t keySize;
};
static uint32_t SUCCESS = 0;
static uint32_t FAILURE = 1;
static int32_t SalRequestHeaderSize = sizeof(SalRequestHeader);
static int32_t SalResponseSize = sizeof(SalResponseHeader);
static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader);
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <netdb.h>
#include <iostream>
#include <errno.h>
#include <arpa/inet.h>
#ifndef __RDMACLIENTENDPOINT__
#define __RDMACLIENTENDPOINT__
#include "Buffer.hpp"
#include "RdmaEndpointGroup.hpp"
#include "MessageFormats.hpp"
#include <boost/lockfree/queue.hpp>
class RdmaClientEndpoint
{
static int CONN_STATE_INITIALIZED;
static int CONN_STATE_ADDR_RESOLVED;
static int CONN_STATE_ROUTE_RESOLVED;
static int CONN_STATE_RESOURCES_ALLOCATED;
static int CONN_STATE_CONNECTED;
static int CONN_STATE_PARTIAL_CLOSED;
static int CONN_STATE_CLOSED;
struct rdma_cm_id *_cm_id = NULL;
struct ibv_pd *_protectionDomain;
RdmaEndpointGroup *_group;
int _sendQueueSize;
int _recvQueueSize;
int _sendMsgSize;
int _recvMsgSize;
int _state;
int _timeoutMs;
int _maxInLine;
const char *_connData;
void *_sendBuff = NULL;
void *_recvBuff = NULL;
struct ibv_mr *_sendMr = NULL;
struct ibv_mr *_recvMr = NULL;
boost::lockfree::queue<void *> *_sendBuffers;
void completeClose();
void connect();
void registerMemory();
void createResources();
public:
std::atomic<uint64_t> _requestId{12};
RdmaClientEndpoint(struct rdma_cm_id *id, RdmaEndpointGroup *group, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, int maxInLine, int timeout);
void connect(const char *ip, const char *port, const char *connData);
bool isConnected();
void processCmEvent(struct rdma_cm_event *event);
void close();
int sendMessage(const char *buffer, int size);
void processSendComp(struct ibv_wc);
void processRecvComp(struct ibv_wc);
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <unordered_map>
#ifndef __RDMACLIENTENDPOINTGROUP__
#define __RDMACLIENTENDPOINTGROUP__
#include "RdmaEndpointGroup.hpp"
#include "RdmaCqProcessor.hpp"
#include "RdmaCmProcessor.hpp"
#include "RdmaClientEndpoint.hpp"
class RdmaClientEndpointGroup : public RdmaEndpointGroup
{
RdmaCmProcessor *_cmProcessor = NULL;
RdmaCqProcessor *_cqProcessor = NULL;
//struct rdma_cm_id *_cm_id;
int _sendQueueSize;
int _recvQueueSize;
int _compQueueSize;
int _sendMsgSize;
int _recvMsgSize;
int _timeoutMs;
int _maxInLine;
public:
RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize,int maxInLine, int timeout);
void processCmEvent(struct rdma_cm_event *event);
struct ibv_cq *createCq(struct rdma_cm_id *id);
RdmaClientEndpoint *createEndpoint();
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <iostream>
#ifndef __RDMACMPROCESSOR__
#define __RDMACMPROCESSOR__
#include "RdmaEndpointGroup.hpp"
#include <unordered_map>
class RdmaCmProcessor
{
struct rdma_event_channel *_eventChannel;
std::thread *_cmEventThread;
RdmaEndpointGroup *_endpointGroup;
bool _stop;
public:
RdmaCmProcessor(RdmaEndpointGroup *group);
struct rdma_event_channel *getEventChannel();
struct rdma_cm_id *createId();
void close();
void start();
void processCmEvent();
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMACQPROCESSOR__
#define __RDMACQPROCESSOR__
#include "RdmaClientEndpoint.hpp"
class RdmaCqProcessor
{
public:
struct ibv_comp_channel *_compChannel;
struct ibv_cq *_completionQueue;
std::thread *_compQueueThread;
std::unordered_map<uint32_t, RdmaClientEndpoint *> *_qpEndpointMap{NULL};
RdmaCqProcessor(ibv_context *verbs, int compQueueSize);
struct ibv_cq *getCq();
void start();
void processCQEvents();
void dispatchCqEvents(ibv_wc *wc_array, int size);
void close();
void registerEp(uint64_t qpum, RdmaClientEndpoint* ep);
};
#endif
// https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/
// https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html
// https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.html
\ No newline at end of file
#ifndef __RDMAENDPOINT__
#define __RDMAENDPOINT__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
class RdmaEndpoint
{
public :
RdmaEndpoint()
{}
virtual void processCqEvent(struct ibv_wc wc) = 0;
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <map>
#ifndef __RDMAENDPOINTGROUP__
#define __RDMAENDPOINTGROUP__
class RdmaEndpointGroup
{
public:
virtual void processCmEvent(struct rdma_cm_event *event) = 0;
virtual struct ibv_cq *createCq(struct rdma_cm_id *) = 0;
};
#endif
\ No newline at end of file
#ifndef __RdmaFuture__
#define __RdmaFuture__
#include <stdint.h>
class Future
{
public:
uint8_t status;
char* value;
};
#endif
\ No newline at end of file
#include <iostream>
class Buffer
{
public:
void *buffer;
int size;
void *get()
{
return buffer;
}
Buffer()
: buffer(NULL), size(0)
{
}
Buffer(void *buffer, int size)
: buffer(buffer), size(size)
{
}
};
\ No newline at end of file
#include <iostream>
#include "RdmaClientEndpointGroup.hpp"
#include "MessageFormats.hpp"
int main()
{
RdmaClientEndpointGroup *group = new RdmaClientEndpointGroup(5, 5, 5, 50, 50, 0, 1000);
RdmaClientEndpoint *clientEp = group->createEndpoint();
clientEp->connect("192.168.200.20", "1921", "sal");
while (!clientEp->isConnected());
std::cout << "client : connected" << std::endl;
char *message = new char[30];
struct SalRequestHeader *request = (struct SalRequestHeader *)message;
request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed);
request->type = RequestType::DELETE;
request->keySize = 14;
memcpy((char *)request + SalRequestHeaderSize, "key1sendkey1s", 13);
((char*)request)[SalRequestHeaderSize+14] = '\0';
std::cout << "send Request for atomic int " << request->id << "\n";
// memcpy(re->value,"aa",2);
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
// std::cout << "send" << clientEp->sendMessage((char*)re, SalRequestHeaderSize + 2) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
int send = 0;
while (1)
{
std::cin >> send;
if (send == 1)
{
std::cout << "send" << clientEp->sendMessage(message, 10) << std::endl;
}
}
}
\ No newline at end of file
#include "RdmaClientEndpoint.hpp"
int RdmaClientEndpoint::CONN_STATE_INITIALIZED = 0;
int RdmaClientEndpoint::CONN_STATE_ADDR_RESOLVED = 1;
int RdmaClientEndpoint::CONN_STATE_ROUTE_RESOLVED = 2;
int RdmaClientEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 3;
int RdmaClientEndpoint::CONN_STATE_CONNECTED = 4;
int RdmaClientEndpoint::CONN_STATE_PARTIAL_CLOSED = 5;
int RdmaClientEndpoint::CONN_STATE_CLOSED = 6;
RdmaClientEndpoint::RdmaClientEndpoint(struct rdma_cm_id *id, RdmaEndpointGroup *group, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, int maxInLine, int timeout)
: _cm_id(id), _group(group), _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout)
{
_state = CONN_STATE_INITIALIZED;
_sendBuffers = new boost::lockfree::queue<void*>(_sendQueueSize);
}
void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *connData)
{
_connData = connData;
if (_state != CONN_STATE_INITIALIZED)
{
std::cout << "RdmaClientEndpoint : connect state not initialized" << std::endl;
return;
}
int ret;
std::cout << "RdmaClientEndpoint : step2 getaddrinfo" << std::endl;
struct addrinfo *addr;
ret = getaddrinfo(ip, port, NULL, &addr);
if (ret)
{
std::cout << "RdmaServerEndpointGroup : get_addr_info failed" << std::endl;
}
std::cout << "RdmaClientEndpoint : step2 resolve addr" << std::endl;
ret = rdma_resolve_addr(_cm_id, NULL, addr->ai_addr, _timeoutMs);
if (ret)
{
std::cout << "unable to resolve addr" << std::endl;
return;
}
std::cout << "RdmaClientEndpoint : step2 resolve addr resolved" << std::endl;
_state = CONN_STATE_ADDR_RESOLVED;
}
bool RdmaClientEndpoint::isConnected()
{
return _state == CONN_STATE_CONNECTED;
}
void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
{
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
{
std::cout << "RdmaClientEndpoint : step3 resolve_route" << std::endl;
createResources();
rdma_resolve_route(_cm_id, _timeoutMs);
}
else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL)
{
registerMemory();
std::cout << "RdmaClientEndpoint : step5 connect" << std::endl;
connect();
}
else if (event->id != NULL && event->event == RDMA_CM_EVENT_ESTABLISHED)
{
std::cout << "RdmaClientEndpoint : step6 Connected" << std::endl;
_state = CONN_STATE_CONNECTED;
}
else if (event->id != NULL && event->event == RDMA_CM_EVENT_DISCONNECTED)
{
std::cout << "RdmaClientEndpoint : step7 Closed" << std::endl;
completeClose();
}
else
{
std::cout << "RdmaClientEndpoint : Not able to procces CM EVent" << rdma_event_str(event->event) << event->id << " " << event->listen_id << std::endl;
}
}
void RdmaClientEndpoint::close()
{
if (_state != CONN_STATE_CONNECTED)
{
std::cout << "RdmaClientEndpoint : close invalid state" << std::endl;
return;
}
_state = CONN_STATE_PARTIAL_CLOSED;
int ret = rdma_disconnect(_cm_id);
if (ret)
{
std::cout << "RdmaClientEndpoint : rdma_disconnect failed" << std::endl;
}
}
void RdmaClientEndpoint::completeClose()
{
if (_state != CONN_STATE_PARTIAL_CLOSED)
{
std::cout << "RdmaClientEndpoint : completeClose invalid state" << std::endl;
return;
}
_state = CONN_STATE_CLOSED;
delete _sendBuffers;
free(_sendBuff);
free(_recvBuff);
rdma_dereg_mr(_sendMr);
rdma_dereg_mr(_recvMr);
rdma_destroy_qp(_cm_id);
rdma_destroy_id(_cm_id);
}
void RdmaClientEndpoint::connect()
{
if (_connData != NULL)
{
struct rdma_conn_param conn_param;
memset(&conn_param, 0, sizeof(conn_param));
conn_param.responder_resources = 1;
conn_param.initiator_depth = 1;
conn_param.retry_count = 7;
conn_param.rnr_retry_count = 7;
conn_param.private_data = _connData;
conn_param.private_data_len = strlen(_connData);
rdma_connect(_cm_id, &conn_param);
}
else
{
rdma_connect(_cm_id, NULL);
}
}
void RdmaClientEndpoint::registerMemory()
{
if (_state != CONN_STATE_ROUTE_RESOLVED)
{
std::cout << "RdmaClientEndpoint : createResource address not resolved" << std::endl;
return;
}
_sendBuff = malloc(_sendMsgSize * _sendQueueSize);
if (_sendBuff == NULL)
{
std::cout << "RdmaClientEndpoint : sendBuff malloc failed" << std::endl;
return;
}
_sendMr = rdma_reg_msgs(_cm_id, _sendBuff, _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL)
{
std::cout << "RdmaClientEndpoint : sendMr reg failed" << std::endl;
return;
}
_recvBuff = malloc(_recvMsgSize * _recvQueueSize);
if (_recvBuff == NULL)
{
std::cout << "RdmaClientEndpoint : recvBuff malloc failed" << std::endl;
return;
}
_recvMr = rdma_reg_msgs(_cm_id, _recvBuff, _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL)
{
std::cout << "RdmaClientEndpoint : recvMr reg failed" << std::endl;
return;
}
char *buffer = (char *)_recvBuff;
for (int i = 0; i < _recvQueueSize; i++)
{
void* const location = buffer + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
}
buffer = (char *)_sendBuff;
for (int i = 0; i < _sendQueueSize; i++)
{
void* const location = buffer + i * _sendMsgSize;
_sendBuffers->push(location);
}
_state = CONN_STATE_RESOURCES_ALLOCATED;
}
void RdmaClientEndpoint::createResources()
{
if (_state != CONN_STATE_ADDR_RESOLVED)
{
std::cout << "RdmaClientEndpoint : createResource address not resolved" << std::endl;
return;
}
_protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL)
{
std::cout << "RdmaClientEndpoint : ibv_alloc_pd failed " << std::endl;
return;
}
struct ibv_cq *completionQueue = _group->createCq(_cm_id);
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
//This is used to set endpoint address with qp
qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1;
// completion queue can be shared or you can use distinct completion queues.
qp_init_attr.send_cq = completionQueue;
qp_init_attr.recv_cq = completionQueue;
qp_init_attr.qp_type = IBV_QPT_RC;
// increase if you want to keep more send work requests in the SQ.
qp_init_attr.cap.max_send_wr = _sendQueueSize;
// increase if you want to keep more receive work requests in the RQ.
qp_init_attr.cap.max_recv_wr = _recvQueueSize;
// increase if you allow send work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_send_sge = 1;
// increase if you allow receive work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.cap.max_inline_data = _maxInLine;
//_queuePair = ibv_create_qp(_protectionDomain, &qp_init_attr);
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret)
{
std::cout << "RdmaClientEndpoint : ibv_create_cq failed\n";
}
if (_cm_id->pd == NULL)
{
std::cout << "RdmaClientEndpoint : pd not set" << std::endl;
_cm_id->pd = _protectionDomain;
}
_state = CONN_STATE_ROUTE_RESOLVED;
}
int RdmaClientEndpoint::sendMessage(const char *buffer, int size)
{
if (size > _sendMsgSize)
return -1;
void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr)
return -1;
memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
}
void RdmaClientEndpoint::processSendComp(struct ibv_wc wc)
{
_sendBuffers->push((void *)wc.wr_id);
}
void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
{
struct SalResponseHeader* response = (struct SalResponseHeader*)wc.wr_id;
std::cout<<"Recieve response for id "<<response->id<<" size "<<response->size <<" type "<<response->type<<"\n";
rdma_post_recv(_cm_id, (void *)wc.wr_id, (void *)wc.wr_id, _recvMsgSize, _recvMr);
}
\ No newline at end of file
#include "RdmaClientEndpointGroup.hpp"
RdmaClientEndpointGroup::RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize,int maxInLine, int timeout)
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine),_timeoutMs(timeout)
{
_cmProcessor = new RdmaCmProcessor(this);
_cmProcessor->start();
}
void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{
std::cout << "RdmaClientEndpointGroup : event" << rdma_event_str(event->event) << std::endl;
if (event->id != NULL && event->id->context != NULL)
{
((RdmaClientEndpoint *)event->id->context)->processCmEvent(event);
if(event->event == RDMA_CM_EVENT_ADDR_RESOLVED)
{
_cqProcessor->registerEp(event->id->qp->qp_num,((RdmaClientEndpoint *)event->id->context));
}
}
else
{
std::cout << "RdmaClientEndpointGroup : Not able to procces CM EVent";
std::cout << rdma_event_str(event->event) << event->id << " ";
std::cout << event->listen_id << std::endl;
}
}
RdmaClientEndpoint *RdmaClientEndpointGroup::createEndpoint()
{
struct rdma_cm_id *id = _cmProcessor->createId();
RdmaClientEndpoint *endpoint = new RdmaClientEndpoint(id, this,
_sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize,_maxInLine, _timeoutMs);
id->context = (void *)endpoint;
return endpoint;
}
struct ibv_cq *RdmaClientEndpointGroup::createCq(struct rdma_cm_id *id)
{
if (_cqProcessor == NULL)
{
std::cout << "RdmaClientEndpointGroup : Creating CQ processor" << std::endl;
_cqProcessor = new RdmaCqProcessor(id->verbs, _compQueueSize);
_cqProcessor->start();
}
return _cqProcessor->getCq();
}
\ No newline at end of file
#include "RdmaCmProcessor.hpp"
#include <iostream>
RdmaCmProcessor::RdmaCmProcessor(RdmaEndpointGroup *group)
: _endpointGroup(group)
{
std::cout << "CMProcessor : Step 1 creating event channel" << std::endl;
_eventChannel = rdma_create_event_channel();
_stop = false;
if (_eventChannel == NULL)
{
std::cout << "CMProcesor : error creating event channel";
}
}
struct rdma_event_channel *RdmaCmProcessor::getEventChannel()
{
return _eventChannel;
}
struct rdma_cm_id *RdmaCmProcessor::createId()
{
struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl;
return id;
}
void RdmaCmProcessor::start()
{
_cmEventThread = new std::thread(&RdmaCmProcessor::processCmEvent, this);
pthread_setname_np(_cmEventThread->native_handle(),"CMProcessor");
}
void RdmaCmProcessor::processCmEvent()
{
int ret;
struct rdma_cm_event *event;
std::cout << "CMProcessor : starting cm processing thread" << std::endl;
while (!_stop)
{
ret = rdma_get_cm_event(_eventChannel, &event);
if (ret)
{
std::cout << "CMProcesor : rdma_get_cm_event failed" << std::endl;
continue;
}
_endpointGroup->processCmEvent(event);
ret = rdma_ack_cm_event(event);
if (ret)
{
std::cout << "CMProcesor : rdma_ack_cm_event failed";
}
}
}
void RdmaCmProcessor::close()
{
_stop = true;
_cmEventThread->join();
rdma_destroy_event_channel(_eventChannel);
}
\ No newline at end of file
#include "RdmaCqProcessor.hpp"
RdmaCqProcessor::RdmaCqProcessor(ibv_context *verbs, int compQueueSize)
{
//_qpEndpointMap = new std::unordered_map<>();
_qpEndpointMap = new std::unordered_map<uint32_t, RdmaClientEndpoint *>();
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{
std::cout << "CqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *RdmaCqProcessor::getCq()
{
return _completionQueue;
}
void RdmaCqProcessor::registerEp(uint64_t qp,RdmaClientEndpoint* ep)
{
_qpEndpointMap->emplace(qp,ep);
}
void RdmaCqProcessor::start()
{
std::cout << "CqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"compQueueThread");
}
void RdmaCqProcessor::processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc wc_array[nevent];
while (1)
{
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "CqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "CqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
dispatchCqEvents(wc_array, ret);
}
}
inline void RdmaCqProcessor::dispatchCqEvents(ibv_wc wc[], int size)
{
for (int i = 0; i < size; i++)
{
if (wc[i].status != IBV_WC_SUCCESS)
{
std::cout << "RdmaCqProcessor : failed work completion : " << ibv_wc_status_str(wc[i].status) << " on qp " << wc[i].qp_num << std::endl;
return;
}
auto it = _qpEndpointMap->find(wc[i].qp_num);
if (it == _qpEndpointMap->end())
{
std::cout << "RdmaCqProcessor : endpoint not registered for qp num" << std::endl;
return;
}
switch (wc[i].opcode)
{
case IBV_WC_SEND:
it->second->processSendComp(wc[i]);
break;
case IBV_WC_RECV:
it->second->processRecvComp(wc[i]);
break;
case IBV_WC_RDMA_WRITE:
std::cout << "rdma write completion\n";
break;
case IBV_WC_RDMA_READ:
std::cout << "rdma read completion\n";
break;
default:
std::cout << "RdmaCqProcessor : invalid opcode" << std::endl;
break;
}
}
}
void RdmaCqProcessor::close()
{
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment