Commit d7793d3c authored by p's avatar p

Satisfies sequential consistency, but without a progress guarantee for threads

parent 16411c49
*.o
.vscode/
TaskThread.cpp
server
*.txt
\ No newline at end of file
......@@ -3,12 +3,118 @@
#include <queue>
#include <mutex>
#include <condition_variable>
#include <set>
#include <string>
class Comparator
{
public:
    // Strict weak ordering over the request keys referenced by two work
    // completions, for use as the std::set comparator.
    //
    // BUGFIX: the previous implementation returned "keys differ", which is
    // symmetric — comp(a,b) and comp(b,a) were both true for any two
    // distinct keys. That violates the Compare requirement of std::set
    // (strict weak ordering) and is undefined behaviour: find()/erase()
    // could miss entries or corrupt the tree. We now order first by key
    // length, then lexicographically by key bytes, so "neither less than
    // the other" means "identical key" — preserving the intended
    // membership semantics (one set entry per distinct key).
    inline bool operator()(const struct ibv_wc *c1, const struct ibv_wc *c2) const
    {
        // wr_id carries the address of the SalRequest buffer posted for
        // the receive; assumed to stay valid while the entry is in the set.
        struct SalRequest *req1 = (struct SalRequest *)c1->wr_id;
        struct SalRequest *req2 = (struct SalRequest *)c2->wr_id;
        if (req1->keySize != req2->keySize)
            return req1->keySize < req2->keySize;
        const char *key1 = (const char *)req1 + SalRequestHeaderSize;
        const char *key2 = (const char *)req2 + SalRequestHeaderSize;
        for (int i = 0; i < req1->keySize; i++)
        {
            if (key1[i] != key2[i])
                return key1[i] < key2[i];
        }
        return false; // equal keys: neither orders before the other
    }
};
class ConcurrentQueue
{
private:
    // queue1 holds freshly arrived completions; queue2 holds RECV requests
    // that were deferred because another request with the same key was
    // already being processed.
    std::queue<struct ibv_wc *> queue1;
    std::queue<struct ibv_wc *> queue2;
    std::mutex queueMutex;
    // Completions whose request key is currently being processed by some
    // worker thread (keyed by the SalRequest key via Comparator).
    // NOTE(review): entries dereference wr_id when compared — assumes the
    // request buffer stays valid until removeFromSet(); confirm callers
    // do not repost/free the buffer earlier.
    std::set<struct ibv_wc *, Comparator> runningRequests;
    std::condition_variable queueCv;

public:
    // Enqueue a newly polled completion and wake one waiting worker.
    void push(struct ibv_wc *const &data)
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        queue1.push(data);
        std::cout<<data<<std::endl;
        lock.unlock();
        queueCv.notify_one();
    }

    // True when no work (fresh or deferred) is pending.
    bool empty()
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        return queue1.empty() && queue2.empty();
    }

    // Despite the name this BLOCKS until an item is available (kept for
    // interface compatibility). Returns NULL when the popped item was a
    // duplicate of a running request and has been deferred to queue2 —
    // callers must treat NULL as "retry".
    struct ibv_wc *try_pop()
    {
        struct ibv_wc *value = NULL;
        std::unique_lock<std::mutex> lock(queueMutex);
        // BUGFIX: wait on EITHER queue. The old predicate only checked
        // queue1, so an item parked in queue2 could never wake a sleeping
        // worker — the "no progress" hazard.
        queueCv.wait(lock, [&]
                     { return !queue1.empty() || !queue2.empty(); });
        // Deferred requests get priority so they are retried promptly.
        if (!queue2.empty())
        {
            value = queue2.front();
            queue2.pop();
        }
        else
        {
            value = queue1.front();
            queue1.pop();
        }
        // Only RECV completions carry a request key; everything else is
        // dispatched immediately without duplicate tracking.
        if (value->opcode != IBV_WC_RECV)
        {
            return value;
        }
        std::cout<<"value "<<value<<std::endl;
        // A request with the same key is already running: defer this one.
        auto it = runningRequests.find(value);
        if (it != runningRequests.end())
        {
            queue2.push(value);
            std::cout<<"found putting in 2"<<std::endl;
            return NULL;
        }
        // BUGFIX: track the request on EVERY dispatch path. Previously only
        // the first request (empty set) was inserted, so later requests ran
        // untracked and duplicates of them could execute concurrently.
        runningRequests.insert(value);
        return value;
    }

    // Called by a worker after it finishes processing `data`, releasing
    // the key so deferred duplicates may run.
    void removeFromSet(struct ibv_wc* data)
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        std::cout<<"removing"<<data<<std::endl;
        runningRequests.erase(data);
        // BUGFIX: wake waiters so a deferred duplicate sitting in queue2
        // is re-examined now that its key is free.
        queueCv.notify_all();
    }

    // Legacy blocking pop from queue1 only (no duplicate-key tracking).
    void
    wait_and_pop(struct ibv_wc *&popped_value)
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        queueCv.wait(lock, [&]
                     { return queue1.size() > 0; });
        popped_value = queue1.front();
        queue1.pop();
    }
};
#endif
/*
template <typename Data>
class ConcurrentQueue
{
private:
std::queue<Data> queue;
std::queue1<Data> queue;
std::queue2<Data> queue;
std::mutex queueMutex;
std::condition_variable queueCv;
......@@ -16,7 +122,7 @@ public:
void push(Data const &data)
{
std::unique_lock<std::mutex> lock(queueMutex);
queue.push(data);
queue1.push(data);
lock.unlock();
queueCv.notify_one();
}
......@@ -30,7 +136,7 @@ public:
bool try_pop(Data &popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
if (queue.empty())
if (queue2.empty())
{
return false;
}
......@@ -49,3 +155,4 @@ public:
}
};
#endif
*/
\ No newline at end of file
......@@ -2,7 +2,7 @@
Executor::Executor(int size, RdmaEndpointGroup *group)
: _size(size), _group(group)
{
_taskQueue = new ConcurrentQueue<struct ibv_wc *>();
_taskQueue = new ConcurrentQueue();
_taskThreads = new std::vector<TaskThread *>();
_taskThreads->reserve(size);
for (int i = 0; i < _size; i++)
......
......@@ -10,7 +10,7 @@ class Executor
{
int _size{0};
std::vector<TaskThread *> *_taskThreads{NULL};
ConcurrentQueue<ibv_wc *> *_taskQueue{NULL};
ConcurrentQueue *_taskQueue{NULL};
RdmaEndpointGroup *_group;
public:
......
......@@ -15,7 +15,8 @@ server1: object
g++ -std=c++17 -o server -ggdb *.o $(LIBS)
server: object
g++ -std=c++17 -o server -ggdb Server.o Executor.o TaskThread.o RdmaServerEndpointGroup.o RdmaReplicationEndpoint.o RdmaSalEndpoint.o RdmaEndpoint.o RdmaCmProcessor.o RdmaCqProcessor.o $(LIBS)
g++ -std=c++17 -o server -ggdb Server.o Executor.o TaskThread.o RdmaServerEndpointGroup.o\
RdmaReplicationEndpoint.o RdmaSalEndpoint.o RdmaEndpoint.o RdmaSalCqProcessor.o RdmaCmProcessor.o RdmaCqProcessor.o $(LIBS)
.PHONY:
clean:
rm -f *.o *.gch server
......@@ -82,14 +82,18 @@ public:
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
data->vendor_err = 1;
_executor->submit(data);
//data->vendor_err = 1;
//_executor->submit(data);
new std::thread(&RdmaRepCqProcessor::processRepEvent, this,data);
}
//_executor->dispatchRepCqEvents(wc_array, ret);
}
}
// Handler run on a detached thread per replication completion.
// Currently a logging placeholder — `data` is not yet consumed.
// NOTE(review): `data` is heap-allocated by the caller and never deleted
// here — leaks until real processing (with cleanup) is implemented.
void processRepEvent(struct ibv_wc* data)
{
    // Fixed typo in the log message ("procesing" -> "processing").
    std::cout<<"processing Replication request"<<std::endl;
}
void close()
{
_stop = true;
......
......@@ -71,8 +71,8 @@ RdmaSalCqProcessor::RdmaSalCqProcessor(Executor *ex, ibv_context *verbs, int com
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
/*
* vendor_err is set to check whether the request came from sal or followers
* data->vendor_err = 0;
*/
data->vendor_err = 0;
_executor->submit(data);
}
// _executor->dispatchSalCqEvents(wc_array, ret);
......
#include "TaskThread.hpp"
#include "MessageFormats.hpp"
TaskThread::TaskThread(int id, int cpu, ConcurrentQueue<struct ibv_wc *> *taskqueue, RdmaEndpointGroup *group)
TaskThread::TaskThread(int id, int cpu, ConcurrentQueue *taskqueue, RdmaEndpointGroup *group)
: _id(id), _group(group)
{
_taskQueue = taskqueue;
......@@ -19,7 +19,7 @@ TaskThread::TaskThread(int id, int cpu, ConcurrentQueue<struct ibv_wc *> *taskqu
}
}
TaskThread::TaskThread(int id, ConcurrentQueue<struct ibv_wc *> *taskqueue, RdmaEndpointGroup *group)
TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *group)
: _id(id), _group(group)
{
_taskQueue = taskqueue;
......@@ -50,9 +50,15 @@ inline void *TaskThread::run(void *object)
while (!thread->_stop)
{
struct ibv_wc *data = NULL;
thread->_taskQueue->wait_and_pop(data);
data = thread->_taskQueue->try_pop();
if (data != NULL)
{
std::cout<<"TaskThread:: got data"<<std::endl;
thread->processEvent(data);
thread->_taskQueue->removeFromSet(data);
delete data;
}
}
return NULL;
}
......@@ -66,56 +72,24 @@ void TaskThread::processEvent(struct ibv_wc *data)
return;
}
/*
* Process Request from followers
*/
if (data->vendor_err == 1)
{
auto it = _group->_qpRepEndpointMap->find(data->qp_num);
if (it == _group->_qpRepEndpointMap->end())
{
std::cout << "RdmaRep : endpoint not registered for qp num\n";
return;
}
// processRepCQEvent(data, it->second);
switch (data->opcode)
{
case IBV_WC_SEND:
it->second->processSendCompletion(data);
break;
case IBV_WC_RECV:
//ep->processRecvCompletion(data);
break;
case IBV_WC_RDMA_WRITE:
std::cout << "rdma write completion\n";
break;
case IBV_WC_RDMA_READ:
std::cout << "rdma read completion\n";
break;
default:
std::cout << "TaskThread default opcode : " << data->opcode << std::endl;
break;
}
}
/*
* Process Request from client
*/
if (data->vendor_err == 0)
{
auto it = _group->_qpSalEndpointMap->find(data->qp_num);
if (it == _group->_qpSalEndpointMap->end())
{
std::cout << data->qp_num << "RdmaSal : endpoint not registered for qp num\n";
return;
}
//processSalCQEvent(data, it->second);
// processSalCQEvent(data, it->second);
switch (data->opcode)
{
case IBV_WC_SEND:
it->second->processSendCompletion(data);
break;
case IBV_WC_RECV:
//it->second->processRecvCompletion(data);
{
// it->second->processRecvCompletion(data);
char *buffer = new char[data->byte_len];
memcpy(buffer, (void *)data->wr_id, data->byte_len);
rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
......@@ -153,7 +127,6 @@ void TaskThread::processEvent(struct ibv_wc *data)
std::cout << "TaskThread default opcode : " << data->opcode << std::endl;
break;
}
}
}
void TaskThread::replicateSalRequest(char *req, uint32_t size)
{
......
......@@ -13,15 +13,15 @@ class TaskThread
{
private:
ConcurrentQueue<struct ibv_wc *> *_taskQueue;
ConcurrentQueue *_taskQueue;
bool _stop{false};
int _id;
pthread_t thread;
RdmaEndpointGroup *_group;
public:
TaskThread(int id, int cpu, ConcurrentQueue<struct ibv_wc *> *, RdmaEndpointGroup *group);
TaskThread(int id, ConcurrentQueue<struct ibv_wc *> *, RdmaEndpointGroup *group);
TaskThread(int id, int cpu, ConcurrentQueue *, RdmaEndpointGroup *group);
TaskThread(int id, ConcurrentQueue *, RdmaEndpointGroup *group);
void replicateSalRequest(char *salRequest,uint32_t size);
static void *run(void *object);
void stop();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment