Commit fc9ee38b authored by Paras Garg's avatar Paras Garg

Added properties file

parent 24b439e7
...@@ -3,4 +3,4 @@ ...@@ -3,4 +3,4 @@
.build/* .build/*
server server
*.txt *.txt
.log *.log
\ No newline at end of file \ No newline at end of file
...@@ -23,18 +23,14 @@ $(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp ...@@ -23,18 +23,14 @@ $(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp
@echo "Compiled "$<" successfully!" @echo "Compiled "$<" successfully!"
#$(BINDIR)/$(TARGET): $(OBJS) #$(BINDIR)/$(TARGET): $(OBJS)
$(TARGET) : $(OBJS) | $(OBJ_DIR) $(TARGET) : $(OBJS)
$(CXX) -o $@ $^ $(LIBS) $(CXX) -o $@ $^ $(LIBS)
@echo "Linked "$<" successfully!" @echo "Linked "$<" successfully!"
$(OBJ_DIR) :
mkdir -p $@
.PHONY = clean .PHONY = clean
clean: clean:
rm -r .build/* rm -f $(OBJ_DIR)/*
rm server rm -f $(TARGET)
.PHONY: count .PHONY: count
count: count:
......
...@@ -12,8 +12,8 @@ public: ...@@ -12,8 +12,8 @@ public:
inline bool operator()(const struct ibv_wc* c1,const struct ibv_wc* c2) inline bool operator()(const struct ibv_wc* c1,const struct ibv_wc* c2)
const const
{ {
struct SalRequest *req1 = (struct SalRequest *)c1->wr_id; struct SalRequestHeader *req1 = (struct SalRequestHeader *)c1->wr_id;
struct SalRequest *req2 = (struct SalRequest *)c2->wr_id; struct SalRequestHeader *req2 = (struct SalRequestHeader *)c2->wr_id;
if(req1->keySize != req2->keySize) if(req1->keySize != req2->keySize)
return true; return true;
char* key1 = (char*)req1+SalRequestHeaderSize; char* key1 = (char*)req1+SalRequestHeaderSize;
...@@ -72,7 +72,7 @@ public: ...@@ -72,7 +72,7 @@ public:
{ {
return value; return value;
} }
std::cout<<"value "<<value<<std::endl; // std::cout<<"value "<<value<<std::endl;
if (runningRequests.empty()) if (runningRequests.empty())
{ {
runningRequests.insert(value); runningRequests.insert(value);
...@@ -84,7 +84,7 @@ public: ...@@ -84,7 +84,7 @@ public:
if (it != runningRequests.end()) if (it != runningRequests.end())
{ {
queue2.push(value); queue2.push(value);
std::cout<<"found putting in 2"<<std::endl; //std::cout<<"found putting in 2"<<std::endl;
return NULL; return NULL;
} }
return value; return value;
...@@ -92,7 +92,7 @@ public: ...@@ -92,7 +92,7 @@ public:
void removeFromSet(struct ibv_wc* data) void removeFromSet(struct ibv_wc* data)
{ {
std::unique_lock<std::mutex> lock(queueMutex); std::unique_lock<std::mutex> lock(queueMutex);
std::cout<<"removing"<<data<<std::endl; // std::cout<<"removing"<<data<<std::endl;
runningRequests.erase(data); runningRequests.erase(data);
} }
void void
......
...@@ -44,7 +44,7 @@ ...@@ -44,7 +44,7 @@
#include <pthread.h> #include <pthread.h>
#endif #endif
namespace CPlusPlusLogging namespace CPPLog
{ {
// Direct Interface for logging into log file or console using MACRO(s) // Direct Interface for logging into log file or console using MACRO(s)
#define LOG_ERROR(x) Logger::getInstance()->error(x) #define LOG_ERROR(x) Logger::getInstance()->error(x)
......
...@@ -4,12 +4,16 @@ enum RequestType ...@@ -4,12 +4,16 @@ enum RequestType
{ {
GET, GET,
PUT, PUT,
DELETE, DELETE
};
enum ResponseStatus
{
SUCCESS,
FAILURE,
INVALIDATE INVALIDATE
}; };
struct __attribute__ ((__packed__)) SalRequestHeader
struct __attribute__ ((__packed__)) SalRequest
{ {
uint32_t id; uint32_t id;
enum RequestType type; enum RequestType type;
...@@ -17,30 +21,32 @@ struct __attribute__ ((__packed__)) SalRequest ...@@ -17,30 +21,32 @@ struct __attribute__ ((__packed__)) SalRequest
uint32_t valueSize; uint32_t valueSize;
}; };
struct __attribute__ ((__packed__)) SalResponse struct __attribute__ ((__packed__)) SalResponseHeader
{ {
//private:
uint32_t id; uint32_t id;
enum RequestType type; enum ResponseStatus status;
//public: /*
uint32_t size; * Note value will be present only in case of response status is success
*/
uint32_t valueSize;
}; };
struct __attribute__ ((__packed__)) InvRequest struct __attribute__ ((__packed__)) InvRequestHeader
{ {
//private:
uint32_t id; uint32_t id;
enum RequestType type; enum ResponseStatus type;
//public: uint32_t keySize;
uint32_t keySize;
}; };
static uint32_t SUCCESS = 0; struct __attribute__ ((__packed__)) InvResponseHeader
static uint32_t FAILURE = 1; {
uint32_t id;
enum ResponseStatus status;
};
static uint32_t SalRequestHeaderSize = sizeof(SalRequestHeader);
static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader);
static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader);
static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader);
static int32_t SalRequestHeaderSize = sizeof(SalRequest);
static int32_t SalResponseSize = sizeof(SalResponse);
static uint32_t InvRequestHeaderSize = sizeof(InvRequest);
#endif #endif
\ No newline at end of file
...@@ -48,6 +48,7 @@ public: ...@@ -48,6 +48,7 @@ public:
{ {
std::cout << "CqProcessr : starting process CQ events" << std::endl; std::cout << "CqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this); _compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"RepCQ");
} }
void processCQEvents() void processCQEvents()
{ {
...@@ -84,6 +85,9 @@ public: ...@@ -84,6 +85,9 @@ public:
struct ibv_wc *data = new struct ibv_wc(wc_array[i]); struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
//data->vendor_err = 1; //data->vendor_err = 1;
//_executor->submit(data); //_executor->submit(data);
/*
* created new thread because currently we are not getting data from followers
*/
new std::thread(&RdmaRepCqProcessor::processRepEvent, this,data); new std::thread(&RdmaRepCqProcessor::processRepEvent, this,data);
} }
......
...@@ -22,9 +22,9 @@ public: ...@@ -22,9 +22,9 @@ public:
void processCqEvent(struct ibv_wc wc); void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc *data); void processSendCompletion(struct ibv_wc *data);
void processRecvCompletion(struct ibv_wc *data); void processRecvCompletion(struct ibv_wc *data);
void processDelete(struct SalRequest *); void processDelete(struct SalRequestHeader *);
void processGet(struct SalRequest *req); void processGet(struct SalRequestHeader *req);
void processPut(struct SalRequest *req); void processPut(struct SalRequestHeader *req);
int sendMessage(const char *buffer, uint32_t size); int sendMessage(const char *buffer, uint32_t size);
void close(); void close();
}; };
......
#For commenting used # empty line are ignored
#comments after parameters also supported
# use key=value format
#All Parameters will be taken as string
# Fixed Parameters
ENABLE_LOGGING=0
SERVER_IP=192.168.200.20
SERVER_PORT=1921
EXECUTOR_POOL_SIZE=4
\ No newline at end of file
# To do list
### High Priority:
- [] consensus
- [] failure detection and recovery
- [] explore multicast
- [x] Added logging api
- [ ] Add future in client
- [ ] add multi server in client
- [ ] add yscb
- [ ] add dynamic hashing in client code
### Low Priority
- [ ] Handle RDMA_CM_EVENT_TIMEWAIT_EXIT when closed connection
- [ ] Handle Work Request Flushed Error on qp
References: References:
> https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/ <br> > https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/ <br>
> https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html <br> > https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html <br>
...@@ -7,6 +22,13 @@ References: ...@@ -7,6 +22,13 @@ References:
> https://www.codeproject.com/Tips/987850/Logging-in-Cplusplus <br> > https://www.codeproject.com/Tips/987850/Logging-in-Cplusplus <br>
> https://www.mygreatlearning.com/blog/readme-file/ <br> > https://www.mygreatlearning.com/blog/readme-file/ <br>
git update-index --assume-unchanged FILE_NAME
and if you want to track the changes again use this command:
git update-index --no-assume-unchanged FILE_NAME
Example to set cpu affinity:“` Example to set cpu affinity:“`
``` ```
cpu_set_t cpuset; cpu_set_t cpuset;
......
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "Logger.hpp" #include "Logger.hpp"
using namespace std; using namespace std;
using namespace CPlusPlusLogging; using namespace CPPLog;
Logger* Logger::m_Instance = 0; Logger* Logger::m_Instance = 0;
......
#include <string>
#include<iostream>
#include<fstream>
#include<map>
class Properties{
private:
std::map<std::string,std::string> _props;
const std::string _WHITESPACE = " \n\r\t\f\v";
std::string ltrim(const std::string& s)
{
size_t start = s.find_first_not_of(_WHITESPACE);
return (start == std::string::npos) ? "" : s.substr(start);
}
std::string rtrim(const std::string& s)
{
size_t end = s.find_last_not_of(_WHITESPACE);
return (end == std::string::npos) ? "" : s.substr(0, end + 1);
}
std::string trim(const std::string& s)
{
return rtrim(ltrim(s));
}
public:
Properties(std::string filename){
//std::cout<<"Reading Properties From file named prop.config ...........\n";
std::ifstream file (filename);
if(!file.is_open()){
std::cout<<"Confiq file opening failed\n";
exit(0);
}
std::string line;
std::string key,value;
int delimPos;
while(getline(file,line)){
delimPos=line.find('#');
line=trim(line);
if(!line.empty()){
line=line.substr(0,delimPos);
delimPos=line.find('=');
_props.insert(make_pair(trim(line.substr(0,delimPos)),trim(line.substr(delimPos+1))));
}
}
}
std::string getValue(std::string key){
auto it=_props.find(key);
if(it==_props.end()){
return "";
}
return it->second;
}
};
/*
int main(){
Properties prop;
std::cout<<prop.getValue("NO_OF_CLIENT");
// std::cout<<DEPARTURE_EVENT;
return 11;
}
*/
\ No newline at end of file
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize, RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *db) int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize) : RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize), _db(db)
,_db(db)
{ {
} }
void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data) void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data)
{ {
std::cout << "send completion\n"; /*means data has been send to other side we can use this buffer*/
_sendBuffers->push((void *)data->wr_id); _sendBuffers->push((void *)data->wr_id);
} }
...@@ -18,7 +16,7 @@ int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size) ...@@ -18,7 +16,7 @@ int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size)
{ {
if (size > _sendMsgSize) if (size > _sendMsgSize)
return -1; return -1;
void* sendBuffer = nullptr; void *sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer); _sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr) if (sendBuffer == nullptr)
return -1; return -1;
...@@ -30,11 +28,8 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data) ...@@ -30,11 +28,8 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
{ {
char *request = new char[data->byte_len]; char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len); memcpy(request, (void *)data->wr_id, data->byte_len);
struct SalRequest *req = (struct SalRequest *)request; struct SalRequestHeader *req = (struct SalRequestHeader *)request;
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr); rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize << " " << req->type << "size" << data->byte_len << "\n";
if (req->type == RequestType::DELETE) if (req->type == RequestType::DELETE)
processDelete(req); processDelete(req);
if (req->type == RequestType::GET) if (req->type == RequestType::GET)
...@@ -44,37 +39,27 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data) ...@@ -44,37 +39,27 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
delete[] request; delete[] request;
} }
void RdmaSalEndpoint::processDelete(struct SalRequest *req) void RdmaSalEndpoint::processDelete(struct SalRequestHeader *req)
{ {
std::cout<<"0\n";
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}); rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize});
std::cout<<"1\n";
void *sendBuf = nullptr; void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); _sendBuffers->pop(sendBuf);
if (sendBuf == nullptr) if (sendBuf == nullptr)
{ {
return; return;
} }
char* sendBuffer = (char*)sendBuf; SalResponseHeader *response = (SalResponseHeader *)sendBuf;
std::cout<<"2 "<<req->id<<"\n"; /*This id done to avoid else case*/
memcpy(sendBuffer, &(req->id), sizeof(uint32_t)); response->status = ResponseStatus::FAILURE;
response->id = req->id;
if (s.ok()) if (s.ok())
{ {
std::cout<<"33\n"; response->status = ResponseStatus::SUCCESS;
memcpy(sendBuffer+4, &SUCCESS, sizeof(int));
std::cout<<"44\n";
} }
else rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0);
{
std::cout<<"331\n";
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
std::cout<<"441\n";
}
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
std::cout<<"3\n";
} }
void RdmaSalEndpoint::processGet(struct SalRequest *req) void RdmaSalEndpoint::processGet(struct SalRequestHeader *req)
{ {
std::string value; std::string value;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, &value); rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, &value);
...@@ -84,38 +69,35 @@ void RdmaSalEndpoint::processGet(struct SalRequest *req) ...@@ -84,38 +69,35 @@ void RdmaSalEndpoint::processGet(struct SalRequest *req)
{ {
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf;
char* sendBuffer = (char*)sendBuf; /*This id done to avoid else case*/
memcpy(sendBuffer, &(req->id), sizeof(uint32_t)); response->status = ResponseStatus::FAILURE;
response->id = req->id;
if (s.ok()) if (s.ok())
{ {
memcpy(sendBuffer+4, &SUCCESS, sizeof(int)); response->status = ResponseStatus::SUCCESS;
memcpy(sendBuffer+8, (void *)value.size(), sizeof(value.size())); response->valueSize = value.size();
memcpy(sendBuffer+12, value.c_str(), value.size()); memcpy(response + SalResponseHeaderSize, value.c_str(), value.size());
} }
else rdma_post_send(_cm_id, sendBuf, sendBuf, SalRequestHeaderSize + value.size(), _sendMr, 0);
{
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
}
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
} }
void RdmaSalEndpoint::processPut(struct SalRequest *req)
void RdmaSalEndpoint::processPut(struct SalRequestHeader *req)
{ {
rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize},
{(char *)req + SalRequestHeaderSize + req->keySize, req->valueSize}); {(char *)req + SalRequestHeaderSize + req->keySize, req->valueSize});
void *sendBuf = nullptr; void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); _sendBuffers->pop(sendBuf);
if (sendBuf != nullptr) if (sendBuf == nullptr)
{ {
std::cout<<"No send Buffer"<<std::endl;
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf;
char* sendBuffer = (char*)sendBuf; /*This id done to avoid else case*/
memcpy(sendBuffer, &(req->id), sizeof(uint32_t)); response->status = ResponseStatus::FAILURE;
response->id = req->id;
if (s.ok()) if (s.ok())
memcpy(sendBuffer+4, &SUCCESS, sizeof(int)); response->status = ResponseStatus::FAILURE;
else rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0);
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
} }
...@@ -98,9 +98,9 @@ void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event) ...@@ -98,9 +98,9 @@ void RdmaServerEndpointGroup::createEpCmEvent(struct rdma_cm_event *event)
endpoint->createResources(); endpoint->createResources();
std::unique_lock lock(_salMutex); std::unique_lock lock(_salMutex);
_salEps->push_back(endpoint); _salEps->push_back(endpoint);
std::cout<<"qp num"<<event->id->qp->qp_num<<" "<<_qpSalEndpointMap->size(); // std::cout<<"qp num"<<event->id->qp->qp_num<<" "<<_qpSalEndpointMap->size();
_qpSalEndpointMap->emplace(event->id->qp->qp_num, endpoint); _qpSalEndpointMap->emplace(event->id->qp->qp_num, endpoint);
std::cout<<_qpSalEndpointMap->size()<<std::endl; // std::cout<<_qpSalEndpointMap->size()<<std::endl;
} }
else else
{ {
......
...@@ -3,23 +3,22 @@ ...@@ -3,23 +3,22 @@
#include "Executor.hpp" #include "Executor.hpp"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "Properties.cpp"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "Logger.hpp" #include "Logger.hpp"
using namespace CPlusPlusLogging;
int main() int main()
{ {
// Logger* pLogger = NULL; // Create the object pointer for Logger Class CPPLog::LOG_INFO("Starting Server Main Thread");
// pLogger = Logger::getInstance(); Properties prop("prop.config");
// pLogger->info(""); RdmaServerEndpointGroup *group = new RdmaServerEndpointGroup(5, 5, 5, 100, 100);
LOG_INFO(":dsdsd"); Executor *ex = new Executor(stoi(prop.getValue("EXECUTOR_POOL_SIZE")), group);
RdmaServerEndpointGroup *group = new RdmaServerEndpointGroup(5, 5, 5, 50, 50);
Executor *ex = new Executor(4, group);
group->setExecutor(ex); group->setExecutor(ex);
group->bind("192.168.200.20", "1921", 2); group->bind(prop.getValue("SERVER_IP").c_str(), prop.getValue("SERVER_PORT").c_str(), 2);
group->startCmProcessor(false); group->startCmProcessor(false);
std::cout << "rhdhj" << std::endl; std::cout << "rhdhj" << std::endl;
// Just to make main thread wait else program will exit
while (1) while (1)
; ;
} }
\ No newline at end of file
...@@ -28,6 +28,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr ...@@ -28,6 +28,7 @@ TaskThread::TaskThread(int id, ConcurrentQueue *taskqueue, RdmaEndpointGroup *gr
std::cout << "pthread create has been failed while creating taskthread " << std::endl; std::cout << "pthread create has been failed while creating taskthread " << std::endl;
exit(0); exit(0);
} }
pthread_setname_np(thread,"TaskThread");
} }
TaskThread::~TaskThread() TaskThread::~TaskThread()
...@@ -35,6 +36,7 @@ TaskThread::~TaskThread() ...@@ -35,6 +36,7 @@ TaskThread::~TaskThread()
std::cout << "Task Destructed" << std::endl; std::cout << "Task Destructed" << std::endl;
stop(); stop();
} }
void TaskThread::stop() void TaskThread::stop()
{ {
_stop = true; _stop = true;
...@@ -57,7 +59,6 @@ inline void *TaskThread::run(void *object) ...@@ -57,7 +59,6 @@ inline void *TaskThread::run(void *object)
thread->processEvent(data); thread->processEvent(data);
thread->_taskQueue->removeFromSet(data); thread->_taskQueue->removeFromSet(data);
delete data; delete data;
} }
} }
return NULL; return NULL;
...@@ -71,10 +72,6 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -71,10 +72,6 @@ void TaskThread::processEvent(struct ibv_wc *data)
std::cout << ibv_wc_status_str(data->status) << " on qp " << data->qp_num << std::endl; std::cout << ibv_wc_status_str(data->status) << " on qp " << data->qp_num << std::endl;
return; return;
} }
/*
* Process Request from client
*/
auto it = _group->_qpSalEndpointMap->find(data->qp_num); auto it = _group->_qpSalEndpointMap->find(data->qp_num);
if (it == _group->_qpSalEndpointMap->end()) if (it == _group->_qpSalEndpointMap->end())
{ {
...@@ -94,7 +91,7 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -94,7 +91,7 @@ void TaskThread::processEvent(struct ibv_wc *data)
memcpy(buffer, (void *)data->wr_id, data->byte_len); memcpy(buffer, (void *)data->wr_id, data->byte_len);
rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id, rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
it->second->_recvMsgSize, it->second->_recvMr); it->second->_recvMsgSize, it->second->_recvMr);
struct SalRequest *req = (struct SalRequest *)buffer; struct SalRequestHeader *req = (struct SalRequestHeader *)buffer;
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize; std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize;
std::cout << " " << req->type << "size" << data->byte_len << "\n"; std::cout << " " << req->type << "size" << data->byte_len << "\n";
switch (req->type) switch (req->type)
...@@ -104,7 +101,7 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -104,7 +101,7 @@ void TaskThread::processEvent(struct ibv_wc *data)
break; break;
case RequestType::DELETE: case RequestType::DELETE:
replicateSalRequest(buffer, data->byte_len); replicateSalRequest(buffer, data->byte_len);
it->second->processGet(req); it->second->processDelete(req);
break; break;
case RequestType::PUT: case RequestType::PUT:
replicateSalRequest(buffer, data->byte_len); replicateSalRequest(buffer, data->byte_len);
...@@ -130,17 +127,19 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -130,17 +127,19 @@ void TaskThread::processEvent(struct ibv_wc *data)
} }
void TaskThread::replicateSalRequest(char *req, uint32_t size) void TaskThread::replicateSalRequest(char *req, uint32_t size)
{ {
//send to follower to Replicate Request
for (auto i = _group->_qpRepEndpointMap->begin(); i != _group->_qpRepEndpointMap->end(); i++) for (auto i = _group->_qpRepEndpointMap->begin(); i != _group->_qpRepEndpointMap->end(); i++)
{ {
i->second->sendMessage(req, size); i->second->sendMessage(req, size);
} }
SalRequest *salReq = (SalRequest *)req; SalRequestHeader *salReq = (SalRequestHeader *)req;
char *buffer = new char[InvRequestHeaderSize + salReq->keySize]; char *buffer = new char[InvRequestHeaderSize + salReq->keySize];
InvRequest *invRequest = (InvRequest *)(buffer); InvRequestHeader *invRequest = (InvRequestHeader *)(buffer);
invRequest->type = RequestType::INVALIDATE; invRequest->type = ResponseStatus::INVALIDATE;
invRequest->id = salReq->id; invRequest->id = salReq->id;
invRequest->keySize = salReq->keySize; invRequest->keySize = salReq->keySize;
memcpy(buffer + 12, salReq + SalRequestHeaderSize, salReq->keySize); memcpy(buffer + InvRequestHeaderSize, salReq + SalRequestHeaderSize, salReq->keySize);
//Send Invalidation to sal's
for (auto i = _group->_qpSalEndpointMap->begin(); i != _group->_qpSalEndpointMap->end(); i++) for (auto i = _group->_qpSalEndpointMap->begin(); i != _group->_qpSalEndpointMap->end(); i++)
{ {
i->second->sendMessage(buffer, InvRequestHeaderSize + salReq->keySize); i->second->sendMessage(buffer, InvRequestHeaderSize + salReq->keySize);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment