Commit 0da2fc70 authored by Paras Garg's avatar Paras Garg

Added common cqevents list to wait for replication

parent be7dea1a
...@@ -11,7 +11,7 @@ class CQEventData ...@@ -11,7 +11,7 @@ class CQEventData
CQEventData(int repRequired,RdmaSalEndpoint* ep) CQEventData(int repRequired,RdmaSalEndpoint* ep)
: _repRequired(repRequired),_ep(ep) : _repRequired(repRequired),_ep(ep)
{ {
std::cout<<"created\n"; std::cout<<"created "<<_repRequired<<"\n";
} }
~CQEventData() ~CQEventData()
{ {
......
...@@ -13,6 +13,7 @@ class Executor ...@@ -13,6 +13,7 @@ class Executor
std::vector<TaskThread *> _taskThreads{NULL}; std::vector<TaskThread *> _taskThreads{NULL};
ConcurrentQueue *_taskQueue{NULL}; ConcurrentQueue *_taskQueue{NULL};
std::unordered_map<uint32_t,std::shared_ptr<CQEventData>> cqEvents; std::unordered_map<uint32_t,std::shared_ptr<CQEventData>> cqEvents;
std::mutex _cqEventsMutex;
public: public:
Executor(int size); Executor(int size);
......
...@@ -26,19 +26,21 @@ private: ...@@ -26,19 +26,21 @@ private:
std::unordered_map<uint32_t, RdmaRepEndpoint *> *_serverRepMap; std::unordered_map<uint32_t, RdmaRepEndpoint *> *_serverRepMap;
std::unordered_map<uint32_t, RdmaSalEndpoint *> *_salMap; std::unordered_map<uint32_t, RdmaSalEndpoint *> *_salMap;
std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &_cqEvents; std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &_cqEvents;
std::mutex &_cqEventsMutex;
public: public:
TaskThread(int id, int cpu, ConcurrentQueue *, TaskThread(int id, int cpu, ConcurrentQueue *,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap, std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap, std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap, std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap,
std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &cqEvents); std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &cqEvents,
std::mutex &_cqEventsMutex);
void replicateSalRequest(char *salRequest, uint32_t size); void replicateSalRequest(char *salRequest, uint32_t size);
void sendInvalidation(char *salRequest); void sendInvalidation(char *salRequest);
static void *run(void *object); static void *run(void *object);
void stop(); void stop();
void processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data); void processSalEvent(RdmaSalEndpoint *ep, struct ibv_wc *data);
void processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data); void processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data);
void processReplicationDone(uint32_t reqid, MessageType type, bool localWrite); void processReplicationDone(uint32_t reqid, MessageType type, bool localWrite);
~TaskThread(); ~TaskThread();
......
...@@ -12,7 +12,7 @@ void Executor::createThreads(std::unordered_map<uint32_t, RdmaRepEndpoint *> *cl ...@@ -12,7 +12,7 @@ void Executor::createThreads(std::unordered_map<uint32_t, RdmaRepEndpoint *> *cl
{ {
for (int i = 0; i < _size; i++) for (int i = 0; i < _size; i++)
{ {
TaskThread *thread = new TaskThread(i, i, _taskQueue, clientRepMap,serverRepMap, salMap,cqEvents); TaskThread *thread = new TaskThread(i, i, _taskQueue, clientRepMap,serverRepMap, salMap,cqEvents,_cqEventsMutex);
_taskThreads.push_back(thread); _taskThreads.push_back(thread);
} }
} }
......
...@@ -4,8 +4,9 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, ...@@ -4,8 +4,9 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap, std::unordered_map<uint32_t, RdmaRepEndpoint *> *clientRepMap,
std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap, std::unordered_map<uint32_t, RdmaRepEndpoint *> *serverRepMap,
std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap, std::unordered_map<uint32_t, RdmaSalEndpoint *> *salMap,
std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &cqEvents) std::unordered_map<uint32_t, std::shared_ptr<CQEventData>> &cqEvents,
: _id(id), _clientRepMap(clientRepMap), _serverRepMap(serverRepMap), _salMap(salMap), _cqEvents(cqEvents) std::mutex &cqEventsMutex)
: _id(id), _clientRepMap(clientRepMap), _serverRepMap(serverRepMap), _salMap(salMap), _cqEvents(cqEvents), _cqEventsMutex(cqEventsMutex)
{ {
_taskQueue = taskqueue; _taskQueue = taskqueue;
if (pthread_create(&thread, NULL, &TaskThread::run, this)) if (pthread_create(&thread, NULL, &TaskThread::run, this))
...@@ -94,7 +95,7 @@ inline void *TaskThread::run(void *object) ...@@ -94,7 +95,7 @@ inline void *TaskThread::run(void *object)
continue; continue;
} }
else if (salEp != nullptr) else if (salEp != nullptr)
thread->processEvent(salEp, data); thread->processSalEvent(salEp, data);
else if (repEp != nullptr) else if (repEp != nullptr)
thread->processRepEvent(repEp, data); thread->processRepEvent(repEp, data);
thread->_taskQueue->removeFromSet(data); thread->_taskQueue->removeFromSet(data);
...@@ -104,7 +105,7 @@ inline void *TaskThread::run(void *object) ...@@ -104,7 +105,7 @@ inline void *TaskThread::run(void *object)
return NULL; return NULL;
} }
void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data) void TaskThread::processSalEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
{ {
std::cout << "processing sal event" << data->opcode << "\n"; std::cout << "processing sal event" << data->opcode << "\n";
/* sal Request*/ /* sal Request*/
...@@ -128,34 +129,42 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data) ...@@ -128,34 +129,42 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
* We need to wait for replication and invalidation to be done for delete and put request,hence we are putting data in map * We need to wait for replication and invalidation to be done for delete and put request,hence we are putting data in map
*/ */
rocksdb::Status s; rocksdb::Status s;
switch (req->type) if (req->type == MessageType::GET)
{ {
case MessageType::GET:
ep->processGet(req); ep->processGet(req);
break; }
case MessageType::DELETE: else if (req->type == MessageType::DELETE)
{
std::unique_lock<std::mutex> lock(_cqEventsMutex);
_cqEvents[id] = cqevent; _cqEvents[id] = cqevent;
lock.unlock();
replicateSalRequest(buffer, data->byte_len); replicateSalRequest(buffer, data->byte_len);
sendInvalidation(buffer); sendInvalidation(buffer);
s = ep->_db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize}); s = ep->_db->Delete(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize});
processReplicationDone(req->id, MessageType::FAILURE, true); if (s.ok())
break; processReplicationDone(req->id, MessageType::SUCCESS, true);
case MessageType::PUT: else
std::cout << "id " << id << " " << _cqEvents.size() << '\n'; processReplicationDone(req->id, MessageType::FAILURE, true);
}
else if (req->type == MessageType::PUT)
{
std::unique_lock<std::mutex> lock(_cqEventsMutex);
_cqEvents[id] = cqevent; _cqEvents[id] = cqevent;
std::cout << "size " << _cqEvents.size() << "\n"; lock.unlock();
replicateSalRequest(buffer, data->byte_len); replicateSalRequest(buffer, data->byte_len);
sendInvalidation(buffer); sendInvalidation(buffer);
s = ep->_db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize}, s = ep->_db->Put(rocksdb::WriteOptions(), {(char *)req + MessageHeaderSize, req->keySize},
{(char *)req + MessageHeaderSize + req->keySize, req->valueSize}); {(char *)req + MessageHeaderSize + req->keySize, req->valueSize});
; if (s.ok())
processReplicationDone(req->id, MessageType::FAILURE, true); processReplicationDone(req->id, MessageType::SUCCESS, true);
break; else
default: processReplicationDone(req->id, MessageType::FAILURE, true);
}
else
{
std::ostringstream ss; std::ostringstream ss;
ss << "SalRequest invalid req type" << data->opcode; ss << "SalRequest invalid req type" << data->opcode;
CPPLog::LOG_ERROR(ss); CPPLog::LOG_ERROR(ss);
break;
} }
delete[] buffer; delete[] buffer;
} }
...@@ -166,8 +175,6 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data) ...@@ -166,8 +175,6 @@ void TaskThread::processEvent(RdmaSalEndpoint *ep, struct ibv_wc *data)
CPPLog::LOG_INFO(ss); CPPLog::LOG_INFO(ss);
break; break;
} }
std::cout << "size "
<< " " << _cqEvents.size() << '\n';
} }
void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data) void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
...@@ -223,8 +230,9 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data) ...@@ -223,8 +230,9 @@ void TaskThread::processRepEvent(RdmaRepEndpoint *ep, struct ibv_wc *data)
void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool localWrite) void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool localWrite)
{ {
std::unique_lock<std::mutex> cqEventlock(_cqEventsMutex);
auto it = _cqEvents.find(id); auto it = _cqEvents.find(id);
// std::cout<<"size " <<_cqEvents.size()<<"\n"; std::cout << "size " << _cqEvents.size() << "\n";
if (it == _cqEvents.end()) if (it == _cqEvents.end())
{ {
std::ostringstream ss; std::ostringstream ss;
...@@ -233,13 +241,18 @@ void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool loca ...@@ -233,13 +241,18 @@ void TaskThread::processReplicationDone(uint32_t id, MessageType type, bool loca
return; return;
} }
// std::cout<<"rep done\n"; // std::cout<<"rep done\n";
// For local write we do not increase repdone count
if (!localWrite) if (!localWrite)
++(it->second->_repDone); ++(it->second->_repDone);
/* if there is no rep event then this condition will be false(0<0) and we can send response back to client*/
if (it->second->_repDone < it->second->_repRequired) if (it->second->_repDone < it->second->_repRequired)
{ {
// std::cout<<"event 2\n"; // std::cout<<"event 2\n";
return; return;
} }
_cqEvents.erase(it);
cqEventlock.unlock();
RdmaSalEndpoint *ep = it->second->_ep; RdmaSalEndpoint *ep = it->second->_ep;
char *sendBuf = nullptr; char *sendBuf = nullptr;
std::unique_lock<std::mutex> lock(ep->_sendBuffersM); std::unique_lock<std::mutex> lock(ep->_sendBuffersM);
...@@ -277,7 +290,6 @@ void TaskThread::replicateSalRequest(char *req, uint32_t size) ...@@ -277,7 +290,6 @@ void TaskThread::replicateSalRequest(char *req, uint32_t size)
{ {
serverRepIt->second->sendMessage(req, size); serverRepIt->second->sendMessage(req, size);
} }
} }
void TaskThread::sendInvalidation(char *req) void TaskThread::sendInvalidation(char *req)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment