Commit 72250328 authored by Paras Garg's avatar Paras Garg

Added properties file

parent 6337beff
No preview for this file type
SRCS := $(shell ls src/) TARGET := fol
SRC_DIR := src SRC_DIR := src
OBJS := $(SRCS:.cpp=.o) OBJ_DIR := .build
BUILD_DIR := .build SRCS := $(wildcard $(SRC_DIR)/*.cpp)
OBJS := $(addprefix $(BUILD_DIR)/, $(OBJS)) OBJS := $(SRCS:$(SRC_DIR)/%.cpp=$(OBJ_DIR)/%.o)
#SRCS := $(shell ls src/)
#OBJS := $(SRCS:.cpp=.o)
#OBJS := $(addprefix $(BUILD_DIR)/, $(OBJS))
CXX = g++ CXX = g++
CXXFLAGS += -O3 -Wall -std=c++17 -I header #compiling flags
CXXFLAGS += -g CXXFLAGS += -g -O3 -Wall -std=c++17 -I header
#libraries
LIBS += -libverbs LIBS += -libverbs
LIBS += -lrdmacm LIBS += -lrdmacm
LIBS += -pthread LIBS += -pthread
LIBS += -lrocksdb LIBS += -lrocksdb
Target := fol
.phony = clean
$(BUILD_DIR)/%.o: $(SRC_DIR)/%.cpp $(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp
$(CXX) -o $@ $(CXXFLAGS) -c $< $(LIBS) $(CXX) -o $@ $(CXXFLAGS) -c $< $(LIBS)
@echo "Compiled "$<" successfully!"
$(Target) : $(OBJS) | $(BUILD_DIR)
#$(BINDIR)/$(TARGET): $(OBJS)
$(TARGET) : $(OBJS)
$(CXX) -o $@ $^ $(LIBS) $(CXX) -o $@ $^ $(LIBS)
@echo "Linked "$<" successfully!"
$(BUILD_DIR) :
mkdir -p $@ .PHONY = clean
\ No newline at end of file clean:
rm -f $(OBJ_DIR)/*
rm -f $(TARGET)
.PHONY: count
count:
find . -type f -name "*.hpp"|xargs wc -l
find . -type f -name "*.cpp"|xargs wc -l
@echo "Lines of code counted!"
\ No newline at end of file
No preview for this file type
///////////////////////////////////////////////////////////////////////////////
// @File Name: Logger.h //
// @Author: Pankaj Choudhary //
// @Version: 0.0.1 //
// @L.M.D: 13th April 2015 //
// @Description: For Logging into file //
// //
// Detail Description: //
// Implemented complete logging mechanism, Supporting multiple logging type //
// like as file based logging, console base logging etc. It also supported //
// for different log type. //
// //
// Thread Safe logging mechanism. Compatible with VC++ (Windows platform) //
// as well as G++ (Linux platform) //
// //
// Supported Log Type: ERROR, ALARM, ALWAYS, INFO, BUFFER, TRACE, DEBUG //
// //
// No control for ERROR, ALRAM and ALWAYS messages. These type of messages //
// should be always captured. //
// //
// BUFFER log type should be use while logging raw buffer or raw messages //
// //
// Having direct interface as well as C++ Singleton inface. can use //
// whatever interface want. //
// //
///////////////////////////////////////////////////////////////////////////////
#ifndef _LOGGER_H_
#define _LOGGER_H_
// C++ Header File(s)
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#ifdef WIN32
// Win Socket Header File(s)
#include <Windows.h>
#include <process.h>
#else
// POSIX Socket Header File(s)
#include <errno.h>
#include <pthread.h>
#endif
namespace CPPLog
{
// Direct Interface for logging into log file or console using MACRO(s)
#define LOG_ERROR(x) Logger::getInstance()->error(x)
#define LOG_ALARM(x) Logger::getInstance()->alarm(x)
#define LOG_ALWAYS(x) Logger::getInstance()->always(x)
#define LOG_INFO(x) Logger::getInstance()->info(x)
#define LOG_BUFFER(x) Logger::getInstance()->buffer(x)
#define LOG_TRACE(x) Logger::getInstance()->trace(x)
#define LOG_DEBUG(x) Logger::getInstance()->debug(x)
// enum for LOG_LEVEL
typedef enum LOG_LEVEL
{
DISABLE_LOG = 1,
LOG_LEVEL_INFO = 2,
LOG_LEVEL_BUFFER = 3,
LOG_LEVEL_TRACE = 4,
LOG_LEVEL_DEBUG = 5,
ENABLE_LOG = 6,
}LogLevel;
// enum for LOG_TYPE
typedef enum LOG_TYPE
{
NO_LOG = 1,
CONSOLE = 2,
FILE_LOG = 3,
}LogType;
class Logger
{
public:
static Logger* getInstance() throw ();
// Interface for Error Log
void error(const char* text) throw();
void error(std::string& text) throw();
void error(std::ostringstream& stream) throw();
// Interface for Alarm Log
void alarm(const char* text) throw();
void alarm(std::string& text) throw();
void alarm(std::ostringstream& stream) throw();
// Interface for Always Log
void always(const char* text) throw();
void always(std::string& text) throw();
void always(std::ostringstream& stream) throw();
// Interface for Buffer Log
void buffer(const char* text) throw();
void buffer(std::string& text) throw();
void buffer(std::ostringstream& stream) throw();
// Interface for Info Log
void info(const char* text) throw();
void info(std::string& text) throw();
void info(std::ostringstream& stream) throw();
// Interface for Trace log
void trace(const char* text) throw();
void trace(std::string& text) throw();
void trace(std::ostringstream& stream) throw();
// Interface for Debug log
void debug(const char* text) throw();
void debug(std::string& text) throw();
void debug(std::ostringstream& stream) throw();
// Error and Alarm log must be always enable
// Hence, there is no interfce to control error and alarm logs
// Interfaces to control log levels
void updateLogLevel(LogLevel logLevel);
void enaleLog(); // Enable all log levels
void disableLog(); // Disable all log levels, except error and alarm
// Interfaces to control log Types
void updateLogType(LogType logType);
void enableConsoleLogging();
void enableFileLogging();
protected:
Logger();
~Logger();
// Wrapper function for lock/unlock
// For Extensible feature, lock and unlock should be in protected
void lock();
void unlock();
std::string getCurrentTime();
private:
void logIntoFile(std::string& data);
void logOnConsole(std::string& data);
Logger(const Logger& obj) {}
void operator=(const Logger& obj) {}
private:
static Logger* m_Instance;
std::ofstream m_File;
#ifdef WIN32
CRITICAL_SECTION m_Mutex;
#else
pthread_mutexattr_t m_Attr;
pthread_mutex_t m_Mutex;
#endif
LogLevel m_LogLevel;
LogType m_LogType;
};
} // End of namespace
#endif // End of _LOGGER_H_
...@@ -7,9 +7,13 @@ enum RequestType ...@@ -7,9 +7,13 @@ enum RequestType
DELETE, DELETE,
INVALIDATE INVALIDATE
}; };
enum ResponseStatus
{
SUCCESS,
FAILURE
};
struct __attribute__ ((__packed__)) SalRequestHeader
struct __attribute__ ((__packed__)) SalRequest
{ {
uint32_t id; uint32_t id;
enum RequestType type; enum RequestType type;
...@@ -17,30 +21,32 @@ struct __attribute__ ((__packed__)) SalRequest ...@@ -17,30 +21,32 @@ struct __attribute__ ((__packed__)) SalRequest
uint32_t valueSize; uint32_t valueSize;
}; };
struct __attribute__ ((__packed__)) SalResponse struct __attribute__ ((__packed__)) SalResponseHeader
{ {
//private:
uint32_t id; uint32_t id;
enum RequestType type; enum ResponseStatus status;
//public: /*
uint32_t size; * Note value will be present only in case of response status is success
*/
uint32_t valueSize;
}; };
struct __attribute__ ((__packed__)) InvRequest struct __attribute__ ((__packed__)) InvRequestHeader
{ {
//private:
uint32_t id; uint32_t id;
enum RequestType type; enum RequestType type;
//public: uint32_t keySize;
uint32_t keySize;
}; };
static uint32_t SUCCESS = 0; struct __attribute__ ((__packed__)) InvResponseHeader
static uint32_t FAILURE = 1; {
uint32_t id;
enum ResponseStatus status;
};
static uint32_t SalRequestHeaderSize = sizeof(SalRequestHeader);
static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader);
static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader);
static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader);
static int32_t SalRequestHeaderSize = sizeof(SalRequest);
static int32_t SalResponseSize = sizeof(SalResponse);
static uint32_t InvRequestHeaderSize = sizeof(InvRequest);
#endif #endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <netdb.h>
#include <iostream>
#include <errno.h>
#include <arpa/inet.h>
#ifndef __RDMACLIENTENDPOINT__
#define __RDMACLIENTENDPOINT__
#include "MessageFormats.hpp"
#include <boost/lockfree/queue.hpp>
class RdmaClientEndpoint
{
static int CONN_STATE_INITIALIZED;
static int CONN_STATE_ADDR_RESOLVED;
static int CONN_STATE_ROUTE_RESOLVED;
static int CONN_STATE_RESOURCES_ALLOCATED;
static int CONN_STATE_CONNECTED;
static int CONN_STATE_PARTIAL_CLOSED;
static int CONN_STATE_CLOSED;
struct rdma_cm_id *_cm_id = NULL;
struct ibv_pd *_protectionDomain;
int _sendQueueSize;
int _recvQueueSize;
int _sendMsgSize;
int _recvMsgSize;
int _state;
int _timeoutMs;
int _maxInLine;
const char *_connData;
void *_sendBuff = NULL;
void *_recvBuff = NULL;
struct ibv_mr *_sendMr = NULL;
struct ibv_mr *_recvMr = NULL;
boost::lockfree::queue<void *> *_sendBuffers;
void completeClose();
void connect();
void registerMemory();
public:
std::atomic<uint64_t> _requestId{12};
RdmaClientEndpoint(struct rdma_cm_id *id, int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, int maxInLine, int timeout);
void connect(const char *ip, const char *port, const char *connData);
bool isConnected();
void processCmEvent(struct rdma_cm_event *event);
void createResources(struct ibv_cq *cq);
void close();
int sendMessage(const char *buffer, int size);
void processSendComp(struct ibv_wc);
void processRecvComp(struct ibv_wc);
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <unordered_map>
#ifndef __RDMACLIENTENDPOINTGROUP__
#define __RDMACLIENTENDPOINTGROUP__
#include "RdmaCqProcessor.hpp"
#include "RdmaClientEndpoint.hpp"
class RdmaClientEndpointGroup
{
RdmaCqProcessor *_cqProcessor = NULL;
//struct rdma_cm_id *_cm_id;
int _sendQueueSize;
int _recvQueueSize;
int _compQueueSize;
int _sendMsgSize;
int _recvMsgSize;
int _timeoutMs;
int _maxInLine;
struct rdma_event_channel *_eventChannel;
std::thread *_cmEventThread;
bool _stop;
public:
RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize,int maxInLine, int timeout);
void processCmEvents();
void processCmEvent(struct rdma_cm_event *event);
void start();
struct ibv_cq *createCq(struct rdma_cm_id *id);
RdmaClientEndpoint *createEndpoint();
void close();
};
#endif
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMACQPROCESSOR__
#define __RDMACQPROCESSOR__
#include "RdmaClientEndpoint.hpp"
class RdmaCqProcessor
{
public:
struct ibv_comp_channel *_compChannel;
struct ibv_cq *_completionQueue;
std::thread *_compQueueThread;
std::unordered_map<uint32_t, RdmaClientEndpoint *> *_qpEndpointMap{NULL};
RdmaCqProcessor(ibv_context *verbs, int compQueueSize);
struct ibv_cq *getCq();
void start();
void processCQEvents();
void dispatchCqEvents(ibv_wc *wc_array, int size);
void close();
void registerEp(uint64_t qpum, RdmaClientEndpoint* ep);
};
#endif
// https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/
// https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html
// https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.html
\ No newline at end of file
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMAREPCQPROCESSOR__
#define __RDMAREPCQPROCESSOR__
#include "Executor.hpp"
class RdmaRepCqProcessor
{
public:
struct ibv_comp_channel *_compChannel{NULL};
struct ibv_cq *_completionQueue{NULL};
std::thread *_compQueueThread{NULL};
bool _stop{false};
Executor *_executor{NULL};
RdmaRepCqProcessor(Executor *ex, ibv_context *verbs, int compQueueSize)
: _executor(ex)
{
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{
std::cout << "CqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *getCq()
{
return _completionQueue;
}
void start()
{
std::cout << "CqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaRepCqProcessor::processCQEvents, this);
}
void processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc *wc_array = new struct ibv_wc[nevent];
while (!_stop)
{
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "CqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "CqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
for (int i = 0; i < ret; i++)
{
struct ibv_wc *data = new struct ibv_wc(wc_array[i]);
//data->vendor_err = 1;
//_executor->submit(data);
new std::thread(&RdmaRepCqProcessor::processRepEvent, this,data);
}
//_executor->dispatchRepCqEvents(wc_array, ret);
}
}
void processRepEvent(struct ibv_wc* data)
{
std::cout<<"procesing Replication request"<<std::endl;
}
void close()
{
_stop = true;
if (_compQueueThread != NULL)
_compQueueThread->join();
}
};
#endif
#ifndef __RDMASERVERENDPOINT__
#define __RDMASERVERENDPOINT__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include "RdmaEndpoint.hpp"
#include "CqEventData.hpp"
#include <rocksdb/db.h>
class RdmaReplicationEndpoint : public RdmaEndpoint
{
rocksdb::DB *_db;
std::atomic<uint64_t> _requestId{12};
public:
RdmaReplicationEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *_db);
void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc* data);
void processRecvCompletion(struct ibv_wc* data);
int sendMessage(const char *buffer, uint32_t size);
void close();
};
#endif
\ No newline at end of file
...@@ -22,9 +22,9 @@ public: ...@@ -22,9 +22,9 @@ public:
void processCqEvent(struct ibv_wc wc); void processCqEvent(struct ibv_wc wc);
void processSendCompletion(struct ibv_wc *data); void processSendCompletion(struct ibv_wc *data);
void processRecvCompletion(struct ibv_wc *data); void processRecvCompletion(struct ibv_wc *data);
void processDelete(struct SalRequest *); void processDelete(struct SalRequestHeader *);
void processGet(struct SalRequest *req); void processGet(struct SalRequestHeader *req);
void processPut(struct SalRequest *req); void processPut(struct SalRequestHeader *req);
int sendMessage(const char *buffer, uint32_t size); int sendMessage(const char *buffer, uint32_t size);
void close(); void close();
}; };
......
...@@ -12,12 +12,10 @@ ...@@ -12,12 +12,10 @@
#ifndef __RDMASERVERENDPOINTGROUP__ #ifndef __RDMASERVERENDPOINTGROUP__
#define __RDMASERVERENDPOINTGROUP__ #define __RDMASERVERENDPOINTGROUP__
#include "RdmaReplicationEndpoint.hpp"
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
#include "CqEventData.hpp" #include "CqEventData.hpp"
#include "Executor.hpp" #include "Executor.hpp"
#include "RdmaSalCqProcessor.hpp" #include "RdmaSalCqProcessor.hpp"
#include "RdmaRepCqProcessor.hpp"
class RdmaServerEndpointGroup class RdmaServerEndpointGroup
{ {
......
#For commenting used # empty line are ignored
#comments after parameters also supported
# use key=value format
#All Parameters will be taken as string
# Fixed Parameters
ENABLE_LOGGING=0
SERVER_IP=192.168.200.20
SERVER_PORT=1921
EXECUTOR_POOL_SIZE=4
\ No newline at end of file
///////////////////////////////////////////////////////////////////////////////
// @File Name: Logger.cpp //
// @Author: Pankaj Choudhary //
// @Version: 0.0.1 //
// @L.M.D: 13th April 2015 //
// @Description: For Logging into file //
// //
// Detail Description: //
// Implemented complete logging mechanism, Supporting multiple logging type //
// like as file based logging, console base logging etc. It also supported //
// for different log type. //
// //
// Thread Safe logging mechanism. Compatible with VC++ (Windows platform) //
// as well as G++ (Linux platform) //
// //
// Supported Log Type: ERROR, ALARM, ALWAYS, INFO, BUFFER, TRACE, DEBUG //
// //
// No control for ERROR, ALRAM and ALWAYS messages. These type of messages //
// should be always captured. //
// //
// BUFFER log type should be use while logging raw buffer or raw messages //
// //
// Having direct interface as well as C++ Singleton inface. can use //
// whatever interface want. //
// //
///////////////////////////////////////////////////////////////////////////////
// C++ Header File(s)
#include <iostream>
#include <cstdlib>
#include <ctime>
// Code Specific Header Files(s)
#include "Logger.hpp"
using namespace std;
using namespace CPPLog;
Logger* Logger::m_Instance = 0;
// Log file name. File name should be change from here only
const string logFileName = "LogFile.log";
Logger::Logger()
{
m_File.open(logFileName.c_str(), ios::out|ios::app);
m_LogLevel = LOG_LEVEL_TRACE;
m_LogType = FILE_LOG;
// Initialize mutex
#ifdef WIN32
InitializeCriticalSection(&m_Mutex);
#else
int ret=0;
ret = pthread_mutexattr_settype(&m_Attr, PTHREAD_MUTEX_ERRORCHECK_NP);
if(ret != 0)
{
printf("Logger::Logger() -- Mutex attribute not initialize!!\n");
exit(0);
}
ret = pthread_mutex_init(&m_Mutex,&m_Attr);
if(ret != 0)
{
printf("Logger::Logger() -- Mutex not initialize!!\n");
exit(0);
}
#endif
}
Logger::~Logger()
{
m_File.close();
#ifdef WIN32
DeleteCriticalSection(&m_Mutex);
#else
pthread_mutexattr_destroy(&m_Attr);
pthread_mutex_destroy(&m_Mutex);
#endif
}
Logger* Logger::getInstance() throw ()
{
if (m_Instance == 0)
{
m_Instance = new Logger();
}
return m_Instance;
}
void Logger::lock()
{
#ifdef WIN32
EnterCriticalSection(&m_Mutex);
#else
pthread_mutex_lock(&m_Mutex);
#endif
}
void Logger::unlock()
{
#ifdef WIN32
LeaveCriticalSection(&m_Mutex);
#else
pthread_mutex_unlock(&m_Mutex);
#endif
}
void Logger::logIntoFile(std::string& data)
{
lock();
m_File << getCurrentTime() << " " << data << endl;
unlock();
}
void Logger::logOnConsole(std::string& data)
{
cout << getCurrentTime() << " " << data << endl;
}
string Logger::getCurrentTime()
{
return "";
string currTime;
//Current date/time based on current time
time_t now = time(0);
// Convert current time to string
currTime.assign(ctime(&now));
// Last charactor of currentTime is "\n", so remove it
string currentTime = currTime.substr(0, currTime.size()-1);
return currentTime;
}
// Interface for Error Log
void Logger::error(const char* text) throw()
{
string data;
data.append("[ERROR]: ");
data.append(text);
// ERROR must be capture
if(m_LogType == FILE_LOG)
{
logIntoFile(data);
}
else if(m_LogType == CONSOLE)
{
logOnConsole(data);
}
}
void Logger::error(std::string& text) throw()
{
error(text.data());
}
void Logger::error(std::ostringstream& stream) throw()
{
string text = stream.str();
error(text.data());
}
// Interface for Alarm Log
void Logger::alarm(const char* text) throw()
{
string data;
data.append("[ALARM]: ");
data.append(text);
// ALARM must be capture
if(m_LogType == FILE_LOG)
{
logIntoFile(data);
}
else if(m_LogType == CONSOLE)
{
logOnConsole(data);
}
}
void Logger::alarm(std::string& text) throw()
{
alarm(text.data());
}
void Logger::alarm(std::ostringstream& stream) throw()
{
string text = stream.str();
alarm(text.data());
}
// Interface for Always Log
void Logger::always(const char* text) throw()
{
string data;
data.append("[ALWAYS]: ");
data.append(text);
// No check for ALWAYS logs
if(m_LogType == FILE_LOG)
{
logIntoFile(data);
}
else if(m_LogType == CONSOLE)
{
logOnConsole(data);
}
}
void Logger::always(std::string& text) throw()
{
always(text.data());
}
void Logger::always(std::ostringstream& stream) throw()
{
string text = stream.str();
always(text.data());
}
// Interface for Buffer Log
void Logger::buffer(const char* text) throw()
{
// Buffer is the special case. So don't add log level
// and timestamp in the buffer message. Just log the raw bytes.
if((m_LogType == FILE_LOG) && (m_LogLevel >= LOG_LEVEL_BUFFER))
{
lock();
m_File << text << endl;
unlock();
}
else if((m_LogType == CONSOLE) && (m_LogLevel >= LOG_LEVEL_BUFFER))
{
cout << text << endl;
}
}
void Logger::buffer(std::string& text) throw()
{
buffer(text.data());
}
void Logger::buffer(std::ostringstream& stream) throw()
{
string text = stream.str();
buffer(text.data());
}
// Interface for Info Log
void Logger::info(const char* text) throw()
{
string data;
data.append("[INFO]: ");
data.append(text);
if((m_LogType == FILE_LOG) && (m_LogLevel >= LOG_LEVEL_INFO))
{
logIntoFile(data);
}
else if((m_LogType == CONSOLE) && (m_LogLevel >= LOG_LEVEL_INFO))
{
logOnConsole(data);
}
}
void Logger::info(std::string& text) throw()
{
info(text.data());
}
void Logger::info(std::ostringstream& stream) throw()
{
string text = stream.str();
info(text.data());
}
// Interface for Trace Log
void Logger::trace(const char* text) throw()
{
string data;
data.append("[TRACE]: ");
data.append(text);
if((m_LogType == FILE_LOG) && (m_LogLevel >= LOG_LEVEL_TRACE))
{
logIntoFile(data);
}
else if((m_LogType == CONSOLE) && (m_LogLevel >= LOG_LEVEL_TRACE))
{
logOnConsole(data);
}
}
void Logger::trace(std::string& text) throw()
{
trace(text.data());
}
void Logger::trace(std::ostringstream& stream) throw()
{
string text = stream.str();
trace(text.data());
}
// Interface for Debug Log
void Logger::debug(const char* text) throw()
{
string data;
data.append("[DEBUG]: ");
data.append(text);
if((m_LogType == FILE_LOG) && (m_LogLevel >= LOG_LEVEL_DEBUG))
{
logIntoFile(data);
}
else if((m_LogType == CONSOLE) && (m_LogLevel >= LOG_LEVEL_DEBUG))
{
logOnConsole(data);
}
}
void Logger::debug(std::string& text) throw()
{
debug(text.data());
}
void Logger::debug(std::ostringstream& stream) throw()
{
string text = stream.str();
debug(text.data());
}
// Interfaces to control log levels
void Logger::updateLogLevel(LogLevel logLevel)
{
m_LogLevel = logLevel;
}
// Enable all log levels
void Logger::enaleLog()
{
m_LogLevel = ENABLE_LOG;
}
// Disable all log levels, except error and alarm
void Logger:: disableLog()
{
m_LogLevel = DISABLE_LOG;
}
// Interfaces to control log Types
void Logger::updateLogType(LogType logType)
{
m_LogType = logType;
}
void Logger::enableConsoleLogging()
{
m_LogType = CONSOLE;
}
void Logger::enableFileLogging()
{
m_LogType = FILE_LOG ;
}
#include <string>
#include<iostream>
#include<fstream>
#include<map>
class Properties{
private:
std::map<std::string,std::string> _props;
const std::string _WHITESPACE = " \n\r\t\f\v";
std::string ltrim(const std::string& s)
{
size_t start = s.find_first_not_of(_WHITESPACE);
return (start == std::string::npos) ? "" : s.substr(start);
}
std::string rtrim(const std::string& s)
{
size_t end = s.find_last_not_of(_WHITESPACE);
return (end == std::string::npos) ? "" : s.substr(0, end + 1);
}
std::string trim(const std::string& s)
{
return rtrim(ltrim(s));
}
public:
Properties(std::string filename){
//std::cout<<"Reading Properties From file named prop.config ...........\n";
std::ifstream file (filename);
if(!file.is_open()){
std::cout<<"Confiq file opening failed\n";
exit(0);
}
std::string line;
std::string key,value;
int delimPos;
while(getline(file,line)){
delimPos=line.find('#');
line=trim(line);
if(!line.empty()){
line=line.substr(0,delimPos);
delimPos=line.find('=');
_props.insert(make_pair(trim(line.substr(0,delimPos)),trim(line.substr(delimPos+1))));
}
}
}
std::string getValue(std::string key){
auto it=_props.find(key);
if(it==_props.end()){
return "";
}
return it->second;
}
};
\ No newline at end of file
#include "RdmaClientEndpoint.hpp"
int RdmaClientEndpoint::CONN_STATE_INITIALIZED = 0;
int RdmaClientEndpoint::CONN_STATE_ADDR_RESOLVED = 1;
int RdmaClientEndpoint::CONN_STATE_ROUTE_RESOLVED = 2;
int RdmaClientEndpoint::CONN_STATE_RESOURCES_ALLOCATED = 3;
int RdmaClientEndpoint::CONN_STATE_CONNECTED = 4;
int RdmaClientEndpoint::CONN_STATE_PARTIAL_CLOSED = 5;
int RdmaClientEndpoint::CONN_STATE_CLOSED = 6;
RdmaClientEndpoint::RdmaClientEndpoint(struct rdma_cm_id *id,int sendQueueSize, int recvQueueSize,
int sendMsgSize, int recvMsgSize, int maxInLine, int timeout)
: _cm_id(id), _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine), _timeoutMs(timeout)
{
_state = CONN_STATE_INITIALIZED;
_sendBuffers = new boost::lockfree::queue<void*>(_sendQueueSize);
}
void RdmaClientEndpoint::connect(const char *ip, const char *port, const char *connData)
{
_connData = connData;
if (_state != CONN_STATE_INITIALIZED)
{
std::cout << "RdmaClientEndpoint : connect state not initialized" << std::endl;
return;
}
int ret;
std::cout << "RdmaClientEndpoint : step2 getaddrinfo" << std::endl;
struct addrinfo *addr;
ret = getaddrinfo(ip, port, NULL, &addr);
if (ret)
{
std::cout << "RdmaClientEndpoint : get_addr_info failed" << std::endl;
}
std::cout << "RdmaClientEndpoint : step2 resolve addr" << std::endl;
ret = rdma_resolve_addr(_cm_id, NULL, addr->ai_addr, _timeoutMs);
if (ret)
{
std::cout << "unable to resolve addr" << std::endl;
return;
}
std::cout << "RdmaClientEndpoint : step2 resolve addr resolved" << std::endl;
_state = CONN_STATE_ADDR_RESOLVED;
}
bool RdmaClientEndpoint::isConnected()
{
return _state == CONN_STATE_CONNECTED;
}
void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
{
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
{
std::cout << "RdmaClientEndpoint : step3 resolve_route" << std::endl;
rdma_resolve_route(_cm_id, _timeoutMs);
}
else if (event->event == RDMA_CM_EVENT_ROUTE_RESOLVED && event->id != NULL)
{
registerMemory();
std::cout << "RdmaClientEndpoint : step5 connect" << std::endl;
connect();
}
else if (event->id != NULL && event->event == RDMA_CM_EVENT_ESTABLISHED)
{
std::cout << "RdmaClientEndpoint : step6 Connected" << std::endl;
_state = CONN_STATE_CONNECTED;
}
else if (event->id != NULL && event->event == RDMA_CM_EVENT_DISCONNECTED)
{
std::cout << "RdmaClientEndpoint : step7 Closed" << std::endl;
completeClose();
}
else
{
std::cout << "RdmaClientEndpoint : Not able to procces CM EVent" << rdma_event_str(event->event) << event->id << " " << event->listen_id << std::endl;
}
}
void RdmaClientEndpoint::close()
{
if (_state != CONN_STATE_CONNECTED)
{
std::cout << "RdmaClientEndpoint : close invalid state" << std::endl;
return;
}
_state = CONN_STATE_PARTIAL_CLOSED;
int ret = rdma_disconnect(_cm_id);
if (ret)
{
std::cout << "RdmaClientEndpoint : rdma_disconnect failed" << std::endl;
}
}
void RdmaClientEndpoint::completeClose()
{
if (_state != CONN_STATE_PARTIAL_CLOSED)
{
std::cout << "RdmaClientEndpoint : completeClose invalid state" << std::endl;
return;
}
_state = CONN_STATE_CLOSED;
delete _sendBuffers;
free(_sendBuff);
free(_recvBuff);
rdma_dereg_mr(_sendMr);
rdma_dereg_mr(_recvMr);
rdma_destroy_qp(_cm_id);
rdma_destroy_id(_cm_id);
}
void RdmaClientEndpoint::connect()
{
if (_connData != NULL)
{
struct rdma_conn_param conn_param;
memset(&conn_param, 0, sizeof(conn_param));
conn_param.responder_resources = 1;
conn_param.initiator_depth = 1;
conn_param.retry_count = 7;
conn_param.rnr_retry_count = 7;
conn_param.private_data = _connData;
conn_param.private_data_len = strlen(_connData);
rdma_connect(_cm_id, &conn_param);
}
else
{
rdma_connect(_cm_id, NULL);
}
}
void RdmaClientEndpoint::registerMemory()
{
if (_state != CONN_STATE_ROUTE_RESOLVED)
{
std::cout << "RdmaClientEndpoint : createResource address not resolved" << std::endl;
return;
}
_sendBuff = malloc(_sendMsgSize * _sendQueueSize);
if (_sendBuff == NULL)
{
std::cout << "RdmaClientEndpoint : sendBuff malloc failed" << std::endl;
return;
}
_sendMr = rdma_reg_msgs(_cm_id, _sendBuff, _sendMsgSize * _sendQueueSize);
if (_sendMr == NULL)
{
std::cout << "RdmaClientEndpoint : sendMr reg failed" << std::endl;
return;
}
_recvBuff = malloc(_recvMsgSize * _recvQueueSize);
if (_recvBuff == NULL)
{
std::cout << "RdmaClientEndpoint : recvBuff malloc failed" << std::endl;
return;
}
_recvMr = rdma_reg_msgs(_cm_id, _recvBuff, _recvMsgSize * _recvQueueSize);
if (_recvMr == NULL)
{
std::cout << "RdmaClientEndpoint : recvMr reg failed" << std::endl;
return;
}
char *buffer = (char *)_recvBuff;
for (int i = 0; i < _recvQueueSize; i++)
{
void* const location = buffer + i * _recvMsgSize;
rdma_post_recv(_cm_id, reinterpret_cast<void *>(location), reinterpret_cast<void *>(location),
_recvMsgSize, _recvMr);
}
buffer = (char *)_sendBuff;
for (int i = 0; i < _sendQueueSize; i++)
{
void* const location = buffer + i * _sendMsgSize;
_sendBuffers->push(location);
}
_state = CONN_STATE_RESOURCES_ALLOCATED;
}
void RdmaClientEndpoint::createResources(struct ibv_cq *cq)
{
if (_state != CONN_STATE_ADDR_RESOLVED)
{
std::cout << "RdmaClientEndpoint : createResource address not resolved" << std::endl;
return;
}
_protectionDomain = ibv_alloc_pd(_cm_id->verbs);
if (_protectionDomain == NULL)
{
std::cout << "RdmaClientEndpoint : ibv_alloc_pd failed " << std::endl;
return;
}
struct ibv_cq *completionQueue = cq;
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
//This is used to set endpoint address with qp
qp_init_attr.qp_context = (void *)this;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr.sq_sig_all = 1;
// completion queue can be shared or you can use distinct completion queues.
qp_init_attr.send_cq = completionQueue;
qp_init_attr.recv_cq = completionQueue;
qp_init_attr.qp_type = IBV_QPT_RC;
// increase if you want to keep more send work requests in the SQ.
qp_init_attr.cap.max_send_wr = _sendQueueSize;
// increase if you want to keep more receive work requests in the RQ.
qp_init_attr.cap.max_recv_wr = _recvQueueSize;
// increase if you allow send work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_send_sge = 1;
// increase if you allow receive work requests to have multiple scatter gather entry (SGE).
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.cap.max_inline_data = _maxInLine;
//_queuePair = ibv_create_qp(_protectionDomain, &qp_init_attr);
int ret = rdma_create_qp(_cm_id, _protectionDomain, &qp_init_attr);
if (ret)
{
std::cout << "RdmaClientEndpoint : ibv_create_cq failed\n";
}
if (_cm_id->pd == NULL)
{
std::cout << "RdmaClientEndpoint : pd not set" << std::endl;
_cm_id->pd = _protectionDomain;
}
_state = CONN_STATE_ROUTE_RESOLVED;
}
int RdmaClientEndpoint::sendMessage(const char *buffer, int size)
{
if (size > _sendMsgSize)
return -1;
void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr)
return -1;
memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
}
void RdmaClientEndpoint::processSendComp(struct ibv_wc wc)
{
_sendBuffers->push((void *)wc.wr_id);
}
void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
{
struct SalResponseHeader* response = (struct SalResponseHeader*)wc.wr_id;
std::cout<<"Recieve response for id "<<response->id<<" size "<<response->size <<" type "<<response->type<<"\n";
rdma_post_recv(_cm_id, (void *)wc.wr_id, (void *)wc.wr_id, _recvMsgSize, _recvMr);
}
\ No newline at end of file
#include "RdmaClientEndpointGroup.hpp"
RdmaClientEndpointGroup::RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
int recvMsgSize,int maxInLine, int timeout)
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _maxInLine(maxInLine),_timeoutMs(timeout)
{
std::cout << "RdmaClientEndpointGroup : Step 1 creating event channel" << std::endl;
_eventChannel = rdma_create_event_channel();
_stop = false;
if (_eventChannel == NULL)
{
std::cout << "RdmaClientEndpointGroup : error creating event channel";
}
}
void RdmaClientEndpointGroup::start()
{
_cmEventThread = new std::thread(&RdmaClientEndpointGroup::processCmEvents, this);
pthread_setname_np(_cmEventThread->native_handle(),"ClientCMProcessor");
}
void RdmaClientEndpointGroup::processCmEvents()
{
int ret;
struct rdma_cm_event *event;
std::cout << "RdmaClientEndpointGroup : starting cm processing thread" << std::endl;
while (!_stop)
{
ret = rdma_get_cm_event(_eventChannel, &event);
if (ret)
{
std::cout << "RdmaClientEndpointGroup : rdma_get_cm_event failed" << std::endl;
continue;
}
processCmEvent(event);
ret = rdma_ack_cm_event(event);
if (ret)
{
std::cout << "RdmaClientEndpointGroup : rdma_ack_cm_event failed";
}
}
}
void RdmaClientEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{
std::cout << "RdmaClientEndpointGroup : event" << rdma_event_str(event->event) << std::endl;
if (event->id != NULL && event->id->context != NULL)
{
if (event->event == RDMA_CM_EVENT_ADDR_RESOLVED && event->id != NULL)
{
((RdmaClientEndpoint *)event->id->context)->createResources(createCq(event->id));
}
((RdmaClientEndpoint *)event->id->context)->processCmEvent(event);
if(event->event == RDMA_CM_EVENT_ADDR_RESOLVED)
{
_cqProcessor->registerEp(event->id->qp->qp_num,((RdmaClientEndpoint *)event->id->context));
}
}
else
{
std::cout << "RdmaClientEndpointGroup : Not able to procces CM EVent";
std::cout << rdma_event_str(event->event) << event->id << " ";
std::cout << event->listen_id << std::endl;
}
}
RdmaClientEndpoint *RdmaClientEndpointGroup::createEndpoint()
{
struct rdma_cm_id *id = NULL;
int ret = rdma_create_id(_eventChannel, &id, NULL, RDMA_PS_TCP);
std::cout<<"id "<<id<<std::endl;
if (ret == -1)
std::cout << "CMProcesor : rdma_create_id failed" << std::endl;
RdmaClientEndpoint *endpoint = new RdmaClientEndpoint(id,
_sendQueueSize, _recvQueueSize,
_sendMsgSize, _recvMsgSize,_maxInLine, _timeoutMs);
id->context = (void *)endpoint;
return endpoint;
}
struct ibv_cq *RdmaClientEndpointGroup::createCq(struct rdma_cm_id *id)
{
if (_cqProcessor == NULL)
{
std::cout << "RdmaClientEndpointGroup : Creating CQ processor" << std::endl;
_cqProcessor = new RdmaCqProcessor(id->verbs, _compQueueSize);
_cqProcessor->start();
}
return _cqProcessor->getCq();
}
void RdmaClientEndpointGroup::close()
{
_stop = true;
_cmEventThread->join();
rdma_destroy_event_channel(_eventChannel);
}
\ No newline at end of file
#include "RdmaCqProcessor.hpp"
RdmaCqProcessor::RdmaCqProcessor(ibv_context *verbs, int compQueueSize)
{
//_qpEndpointMap = new std::unordered_map<>();
_qpEndpointMap = new std::unordered_map<uint32_t, RdmaClientEndpoint *>();
_compChannel = ibv_create_comp_channel(verbs);
if (_compChannel == NULL)
{
std::cout << "CqProcessr : ibv_create_comp_channel failed\n";
return;
}
_completionQueue = ibv_create_cq(verbs, compQueueSize, NULL, _compChannel, 0);
if (_completionQueue == NULL)
{
std::cout << "CqProcessr : ibv_create_cq failed" << std::endl;
return;
}
int ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
}
}
struct ibv_cq *RdmaCqProcessor::getCq()
{
return _completionQueue;
}
void RdmaCqProcessor::registerEp(uint64_t qp,RdmaClientEndpoint* ep)
{
_qpEndpointMap->emplace(qp,ep);
}
void RdmaCqProcessor::start()
{
std::cout << "CqProcessr : starting process CQ events" << std::endl;
_compQueueThread = new std::thread(&RdmaCqProcessor::processCQEvents, this);
pthread_setname_np(_compQueueThread->native_handle(),"compQueueThread");
}
void RdmaCqProcessor::processCQEvents()
{
int ret = 0;
struct ibv_cq *cq;
void *context;
const int nevent = 10;
struct ibv_wc wc_array[nevent];
while (1)
{
ret = ibv_get_cq_event(_compChannel, &cq, &context);
if (ret == -1)
{
std::cout << "CqProcessr : ibv_get_cq_event failed\n";
close();
}
ibv_ack_cq_events(cq, 1);
ret = ibv_req_notify_cq(_completionQueue, 0);
if (ret)
{
std::cout << "CqProcessr : ibv_req_notify_cq failed\n";
close();
}
ret = ibv_poll_cq(cq, nevent, wc_array);
if (ret < 0)
{
std::cout << "CqProcessr : ibv_poll_cq failed\n";
close();
}
if (ret == 0)
continue;
dispatchCqEvents(wc_array, ret);
}
}
inline void RdmaCqProcessor::dispatchCqEvents(ibv_wc wc[], int size)
{
for (int i = 0; i < size; i++)
{
if (wc[i].status != IBV_WC_SUCCESS)
{
std::cout << "RdmaCqProcessor : failed work completion : " << ibv_wc_status_str(wc[i].status) << " on qp " << wc[i].qp_num << std::endl;
return;
}
auto it = _qpEndpointMap->find(wc[i].qp_num);
if (it == _qpEndpointMap->end())
{
std::cout << "RdmaCqProcessor : endpoint not registered for qp num" << std::endl;
return;
}
switch (wc[i].opcode)
{
case IBV_WC_SEND:
it->second->processSendComp(wc[i]);
break;
case IBV_WC_RECV:
it->second->processRecvComp(wc[i]);
break;
case IBV_WC_RDMA_WRITE:
std::cout << "rdma write completion\n";
break;
case IBV_WC_RDMA_READ:
std::cout << "rdma read completion\n";
break;
default:
std::cout << "RdmaCqProcessor : invalid opcode" << std::endl;
break;
}
}
}
void RdmaCqProcessor::close()
{
}
\ No newline at end of file
#include "RdmaRepCqProcessor.hpp"
\ No newline at end of file
#include "RdmaReplicationEndpoint.hpp"
RdmaReplicationEndpoint::RdmaReplicationEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize)
,_db(db)
{
}
void RdmaReplicationEndpoint::processSendCompletion(struct ibv_wc *data)
{
std::cout << "send completion\n";
_sendBuffers->push((void *)data->wr_id);
}
void RdmaReplicationEndpoint::processRecvCompletion(struct ibv_wc *data)
{
std::cout << "recv completion\n";
std::cout << "recieve" << (char *)(data->wr_id) << "\n";
char* request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id,data->byte_len);
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
}
int RdmaReplicationEndpoint::sendMessage(const char *buffer, uint32_t size)
{
if (size > _sendMsgSize)
return -1;
void* sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr)
return -1;
memcpy(sendBuffer, buffer, size);
return rdma_post_send(_cm_id, sendBuffer, sendBuffer, size, _sendMr, 0);
}
#include "RdmaSalEndpoint.hpp" #include "RdmaSalEndpoint.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize, RdmaSalEndpoint::RdmaSalEndpoint(struct rdma_cm_id *id, struct ibv_cq *completionQueue, int sendQueueSize,
int recvQueueSize, int sendMsgSize, int recvMsgSize,rocksdb::DB *db) int recvQueueSize, int sendMsgSize, int recvMsgSize, rocksdb::DB *db)
: RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize) : RdmaEndpoint(id, completionQueue, sendQueueSize, recvQueueSize, sendMsgSize, recvMsgSize), _db(db)
,_db(db)
{ {
} }
void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data) void RdmaSalEndpoint::processSendCompletion(struct ibv_wc *data)
{ {
std::cout << "send completion\n"; /*means data has been send to other side we can use this buffer*/
_sendBuffers->push((void *)data->wr_id); _sendBuffers->push((void *)data->wr_id);
} }
...@@ -18,7 +16,7 @@ int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size) ...@@ -18,7 +16,7 @@ int RdmaSalEndpoint::sendMessage(const char *buffer, uint32_t size)
{ {
if (size > _sendMsgSize) if (size > _sendMsgSize)
return -1; return -1;
void* sendBuffer = nullptr; void *sendBuffer = nullptr;
_sendBuffers->pop(sendBuffer); _sendBuffers->pop(sendBuffer);
if (sendBuffer == nullptr) if (sendBuffer == nullptr)
return -1; return -1;
...@@ -30,11 +28,8 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data) ...@@ -30,11 +28,8 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
{ {
char *request = new char[data->byte_len]; char *request = new char[data->byte_len];
memcpy(request, (void *)data->wr_id, data->byte_len); memcpy(request, (void *)data->wr_id, data->byte_len);
struct SalRequest *req = (struct SalRequest *)request; struct SalRequestHeader *req = (struct SalRequestHeader *)request;
rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr); rdma_post_recv(_cm_id, (void *)data->wr_id, (void *)data->wr_id, _recvMsgSize, _recvMr);
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize << " " << req->type << "size" << data->byte_len << "\n";
if (req->type == RequestType::DELETE) if (req->type == RequestType::DELETE)
processDelete(req); processDelete(req);
if (req->type == RequestType::GET) if (req->type == RequestType::GET)
...@@ -44,37 +39,27 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data) ...@@ -44,37 +39,27 @@ void RdmaSalEndpoint::processRecvCompletion(struct ibv_wc *data)
delete[] request; delete[] request;
} }
void RdmaSalEndpoint::processDelete(struct SalRequest *req) void RdmaSalEndpoint::processDelete(struct SalRequestHeader *req)
{ {
std::cout<<"0\n";
rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}); rocksdb::Status s = _db->Delete(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize});
std::cout<<"1\n";
void *sendBuf = nullptr; void *sendBuf = nullptr;
_sendBuffers->pop(sendBuf); _sendBuffers->pop(sendBuf);
if (sendBuf == nullptr) if (sendBuf == nullptr)
{ {
return; return;
} }
char* sendBuffer = (char*)sendBuf; SalResponseHeader *response = (SalResponseHeader *)sendBuf;
std::cout<<"2 "<<req->id<<"\n"; /*This id done to avoid else case*/
memcpy(sendBuffer, &(req->id), sizeof(uint32_t)); response->status = ResponseStatus::FAILURE;
response->id = req->id;
if (s.ok()) if (s.ok())
{ {
std::cout<<"33\n"; response->status = ResponseStatus::SUCCESS;
memcpy(sendBuffer+4, &SUCCESS, sizeof(int));
std::cout<<"44\n";
} }
else rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0);
{
std::cout<<"331\n";
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
std::cout<<"441\n";
}
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
std::cout<<"3\n";
} }
void RdmaSalEndpoint::processGet(struct SalRequest *req) void RdmaSalEndpoint::processGet(struct SalRequestHeader *req)
{ {
std::string value; std::string value;
rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, &value); rocksdb::Status s = _db->Get(rocksdb::ReadOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, &value);
...@@ -84,22 +69,20 @@ void RdmaSalEndpoint::processGet(struct SalRequest *req) ...@@ -84,22 +69,20 @@ void RdmaSalEndpoint::processGet(struct SalRequest *req)
{ {
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf;
char* sendBuffer = (char*)sendBuf; /*This id done to avoid else case*/
memcpy(sendBuffer, &(req->id), sizeof(uint32_t)); response->status = ResponseStatus::FAILURE;
response->id = req->id;
if (s.ok()) if (s.ok())
{ {
memcpy(sendBuffer+4, &SUCCESS, sizeof(int)); response->status = ResponseStatus::SUCCESS;
memcpy(sendBuffer+8, (void *)value.size(), sizeof(value.size())); response->valueSize = value.size();
memcpy(sendBuffer+12, value.c_str(), value.size()); memcpy(response + SalResponseHeaderSize, value.c_str(), value.size());
} }
else rdma_post_send(_cm_id, sendBuf, sendBuf, SalRequestHeaderSize + value.size(), _sendMr, 0);
{
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
}
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
} }
void RdmaSalEndpoint::processPut(struct SalRequest *req)
void RdmaSalEndpoint::processPut(struct SalRequestHeader *req)
{ {
rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize}, rocksdb::Status s = _db->Put(rocksdb::WriteOptions(), {(char *)req + SalRequestHeaderSize, req->keySize},
{(char *)req + SalRequestHeaderSize + req->keySize, req->valueSize}); {(char *)req + SalRequestHeaderSize + req->keySize, req->valueSize});
...@@ -109,13 +92,11 @@ void RdmaSalEndpoint::processPut(struct SalRequest *req) ...@@ -109,13 +92,11 @@ void RdmaSalEndpoint::processPut(struct SalRequest *req)
{ {
return; return;
} }
SalResponseHeader *response = (SalResponseHeader *)sendBuf;
char* sendBuffer = (char*)sendBuf; /*This id done to avoid else case*/
memcpy(sendBuffer, &(req->id), sizeof(uint32_t)); response->status = ResponseStatus::FAILURE;
response->id = req->id;
if (s.ok()) if (s.ok())
memcpy(sendBuffer+4, &SUCCESS, sizeof(int)); response->status = ResponseStatus::FAILURE;
else rdma_post_send(_cm_id, sendBuf, sendBuf, SalResponseHeaderSize, _sendMr, 0);
memcpy(sendBuffer+4, &FAILURE, sizeof(int));
rdma_post_send(_cm_id, sendBuffer, sendBuffer, _sendMsgSize, _sendMr, 0);
} }
...@@ -25,7 +25,7 @@ RdmaServerEndpointGroup::RdmaServerEndpointGroup(int sendQueueSize, int recvQueu ...@@ -25,7 +25,7 @@ RdmaServerEndpointGroup::RdmaServerEndpointGroup(int sendQueueSize, int recvQueu
rocksdb::Options options; rocksdb::Options options;
options.create_if_missing = true; options.create_if_missing = true;
// open a database with a name which corresponds to a file system directory // open a database with a name which corresponds to a file system directory
rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &_db); rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb1", &_db);
std::cout << "Rocks started" << std::endl; std::cout << "Rocks started" << std::endl;
if (!status.ok()) if (!status.ok())
{ {
......
...@@ -86,7 +86,7 @@ void TaskThread::processEvent(struct ibv_wc *data) ...@@ -86,7 +86,7 @@ void TaskThread::processEvent(struct ibv_wc *data)
memcpy(buffer, (void *)data->wr_id, data->byte_len); memcpy(buffer, (void *)data->wr_id, data->byte_len);
rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id, rdma_post_recv(it->second->_cm_id, (void *)data->wr_id, (void *)data->wr_id,
it->second->_recvMsgSize, it->second->_recvMr); it->second->_recvMsgSize, it->second->_recvMr);
struct SalRequest *req = (struct SalRequest *)buffer; struct SalRequestHeader *req = (struct SalRequestHeader *)buffer;
std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize; std::cout << "recieve id : " << req->id << " " << (char *)req + SalRequestHeaderSize;
std::cout << " " << req->type << "size" << data->byte_len << "\n"; std::cout << " " << req->type << "size" << data->byte_len << "\n";
switch (req->type) switch (req->type)
......
#include <iostream> #include <iostream>
#include "RdmaServerEndpointGroup.hpp" #include "RdmaServerEndpointGroup.hpp"
#include "RdmaClientEndpointGroup.hpp"
#include "RdmaEndpoint.hpp"
#include "Executor.hpp" #include "Executor.hpp"
#include "rocksdb/db.h" #include "rocksdb/db.h"
...@@ -8,10 +10,17 @@ ...@@ -8,10 +10,17 @@
int main() int main()
{ {
RdmaServerEndpointGroup *group = new RdmaServerEndpointGroup(5, 5, 5, 50, 50); RdmaClientEndpointGroup *clgroup = new RdmaClientEndpointGroup(5, 5, 5, 50, 50, 0, 1000);
group->createExecutor(4); clgroup->start();
group->bind("192.168.200.20", "1921", 2); RdmaClientEndpoint *clientEp = clgroup->createEndpoint();
group->startCmProcessor(false); clientEp->connect("192.168.200.20", "1921", "fol");
while (!clientEp->isConnected());
std::cout << "client : connected" << std::endl;
RdmaServerEndpointGroup *sgroup = new RdmaServerEndpointGroup(5, 5, 5, 50, 50);
sgroup->createExecutor(4);
sgroup->bind("192.168.200.20", "1920", 2);
sgroup->startCmProcessor(false);
std::cout << "rhdhj" << std::endl; std::cout << "rhdhj" << std::endl;
while (1) while (1)
; ;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment