Commit be7dea1a authored by Paras Garg's avatar Paras Garg

Added common cqevents list to wait for replication

parent 9454fb43
......@@ -5,12 +5,14 @@
#include "RdmaEndpointGroup.hpp"
#include "TaskThread.hpp"
#include "Logger.hpp"
#include "CqEventData.hpp"
class Executor
{
int _size{0};
std::vector<TaskThread *> _taskThreads{NULL};
ConcurrentQueue *_taskQueue{NULL};
std::unordered_map<uint32_t,std::shared_ptr<CQEventData>> cqEvents;
public:
Executor(int size);
......
......@@ -4,7 +4,7 @@
#include <iostream>
#include <queue>
#include <pthread.h>
#include<memory>
#include <memory>
#include "Runnable.hpp"
#include "CqEventData.hpp"
......@@ -13,7 +13,6 @@
#include "ConcurrentQueue.hpp"
#include "RdmaRepEndpoint.hpp"
#include "RdmaSalEndpoint.hpp"
#include "CqEventData.hpp"
class TaskThread
{
......@@ -26,20 +25,22 @@ private:
std::unordered_map<uint32_t, RdmaRepEndpoint *> *_clientRepMap;
std::unordered_map<uint32_t, RdmaRepEndpoint *> *_serverRepMap;
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_salMap;
std::unordered_map<uint32_t,std::shared_ptr<CQEventData>> cqEvents;
std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &_cqEvents;
public:
TaskThread(int id, int cpu, ConcurrentQueue *,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap);
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap,
std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &cqEvents);
void replicateSalRequest(char *salRequest, uint32_t size);
void sendInvalidation(char *salRequest);
static void *run(void *object);
void stop();
void processEvent(RdmaSalEndpoint* ep,struct ibv_wc *data);
void processRepEvent(RdmaRepEndpoint* ep,struct ibv_wc *data);
void processReplicationDone(uint32_t reqid);
void processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data);
void processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data);
void processReplicationDone(uint32_t reqid, MessageType type, bool localWrite);
~TaskThread();
};
......
......@@ -12,7 +12,7 @@ void Executor::createThreads(std::unordered_map<uint32_t, RdmaRepEndpoint *> *cl
{
for (int i = 0; i < _size; i++)
{
TaskThread *thread = new TaskThread(i, i, _taskQueue, clientRepMap,serverRepMap, salMap);
TaskThread *thread = new TaskThread(i, i, _taskQueue, clientRepMap,serverRepMap, salMap,cqEvents);
_taskThreads.push_back(thread);
}
}
......
......@@ -3,8 +3,9 @@
TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap)
: _id(id), _clientRepMap(clientRepMap), _serverRepMap(serverRepMap), _salMap(salMap)
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap,
std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &cqEvents)
: _id(id), _clientRepMap(clientRepMap), _serverRepMap(serverRepMap), _salMap(salMap), _cqEvents(cqEvents)
{
_taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this))
......@@ -105,7 +106,7 @@ inline void *TaskThread::run(void *object)
void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
{
std::cout << "processing sal event"<<data->opcode<<"\n";
std::cout << "processing sal event" << data->opcode << "\n";
/* sal Request*/
switch (data->opcode)
{
......@@ -120,7 +121,7 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
rdma_post_recv(ep->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
ep->_recvMsgSize, ep->_recvMr);
struct MessageHeader *req = (struct MessageHeader *)buffer;
std::cout<<"recv"<<req->type<<"\n";
std::cout << "recv" << req->type << "\n";
std::shared_ptr<CQEventData> cqevent = std::make_shared<CQEventData>(_serverRepMap->size() + _clientRepMap->size(), ep);
uint32_t id = req->id;
/*
......@@ -133,15 +134,22 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
ep->processGet(req);
break;
case MessageType::DELETE:
cqEvents.emplace(id, cqevent);
_cqEvents[id] = cqevent;
replicateSalRequest(buffer, data->byte_len);
sendInvalidation(buffer);
s = ep->_db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
processReplicationDone(req->id, MessageType::FAILURE, true);
break;
case MessageType::PUT:
cqEvents.emplace(id, cqevent);
std::cout << "id " << id << " " << _cqEvents.size() << '\n';
_cqEvents[id] = cqevent;
std::cout << "size " << _cqEvents.size() << "\n";
replicateSalRequest(buffer, data->byte_len);
sendInvalidation(buffer);
s = ep->_db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize},
{(char *)req + MessageHeaderSize + req->keySize, req->valueSize});;
{(char *)req + MessageHeaderSize + req->keySize, req->valueSize});
;
processReplicationDone(req->id, MessageType::FAILURE, true);
break;
default:
std::ostringstream ss;
......@@ -158,11 +166,13 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
CPPLog::LOG_INFO(ss);
break;
}
std::cout << "size "
<< " " << _cqEvents.size() << '\n';
}
void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
{
std::cout << "processing rep event"<<data->opcode<<"\n";
std::cout << "processing rep event" << data->opcode << "\n";
switch (data->opcode)
{
case IBV_WC_SEND:
......@@ -176,7 +186,7 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
rdma_post_recv(ep->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
ep->_recvMsgSize, ep->_recvMr);
struct MessageHeader *req = (struct MessageHeader *)buffer;
std::cout<<"recv"<<req->type<<"\n";
std::cout << "recv" << req->type << "\n";
switch (req->type)
{
case MessageType::GET:
......@@ -189,10 +199,10 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
ep->processPut(req);
break;
case MessageType::SUCCESS:
processReplicationDone(req->id);
processReplicationDone(req->id, MessageType::FAILURE, false);
break;
case MessageType::FAILURE:
processReplicationDone(req->id);
processReplicationDone(req->id, MessageType::FAILURE, false);
break;
default:
std::ostringstream ss;
......@@ -211,20 +221,26 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
}
}
void TaskThread::processReplicationDone(uint32_t id)
void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool localWrite)
{
auto it = cqEvents.find(id);
if(it == cqEvents.end())
auto it = _cqEvents.find(id);
// std::cout<<"size " <<_cqEvents.size()<<"\n";
if (it == _cqEvents.end())
{
CPPLog::LOG_ERROR("Cqevent not found");
std::ostringstream ss;
ss << "Cqevent not found " << id;
CPPLog::LOG_ERROR(ss);
return;
}
std::cout<<"rep done\n";
++(it->second->_repDone);
if(it->second->_repDone < it->second->_repRequired)
// std::cout<<"rep done\n";
if (!localWrite)
++(it->second->_repDone);
if (it->second->_repDone < it->second->_repRequired)
{
// std::cout<<"event 2\n";
return;
RdmaSalEndpoint* ep = it->second->_ep;
}
RdmaSalEndpoint *ep = it->second->_ep;
char *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(ep->_sendBuffersM);
if (ep->_sendBuffers.size() == 0)
......@@ -236,11 +252,8 @@ void TaskThread::processReplicationDone(uint32_t id)
ep->_sendBuffers.pop();
lock.unlock();
MessageHeader *response = (MessageHeader *)sendBuf;
/*This id done to avoid else case*/
//response->type = MessageType::FAILURE;
//response->id = id;
//if (s.ok())
response->type = MessageType::SUCCESS;
response->id = id;
response->type = type;
int ret = rdma_post_send(ep->_cm_id, sendBuf, sendBuf, MessageHeaderSize, ep->_sendMr, 0);
if (ret == -1)
{
......@@ -248,7 +261,6 @@ void TaskThread::processReplicationDone(uint32_t id)
ss << "rdma_post_send error occured errno " << errno << strerror(errno);
CPPLog::LOG_ERROR(ss);
}
}
void TaskThread::replicateSalRequest(char *req, uint32_t size)
......@@ -266,6 +278,9 @@ void TaskThread::replicateSalRequest(char *req, uint32_t size)
serverRepIt->second->sendMessage(req, size);
}
}
void TaskThread::sendInvalidation(char *req)
{
MessageHeader *salReq = (MessageHeader *)req;
char *buffer = new char[MessageHeaderSize + salReq->keySize];
MessageHeader *invRequest = (MessageHeader *)(buffer);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment