Commit e3d7e950 authored by Paras Garg's avatar Paras Garg

Configured rocksdb

parent 0da2fc70
...@@ -49,6 +49,7 @@ class RdmaServerEndpointGroup ...@@ -49,6 +49,7 @@ class RdmaServerEndpointGroup
int _sendMsgSize{0}; int _sendMsgSize{0};
int _recvMsgSize{0}; int _recvMsgSize{0};
rocksdb::DB *_db; rocksdb::DB *_db;
rocksdb::Options _options;
mutable std::shared_mutex _salMutex; mutable std::shared_mutex _salMutex;
mutable std::shared_mutex _repMutex; mutable std::shared_mutex _repMutex;
...@@ -60,7 +61,7 @@ public: ...@@ -60,7 +61,7 @@ public:
std::unordered_map<uint32_t, RdmaRepEndpoint*> *_qpRepEndpointMap; std::unordered_map<uint32_t, RdmaRepEndpoint*> *_qpRepEndpointMap;
RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize,
int sendMsgSize, int recvMsgSize, rocksdb::DB *db); int sendMsgSize, int recvMsgSize, rocksdb::DB *db,rocksdb::Options &options);
// void setExecutor(Executor *executor); // void setExecutor(Executor *executor);
void bind(const char *ip, const char *port, int backlog); void bind(const char *ip, const char *port, int backlog);
......
https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/
// https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/ // https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/
// https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html // https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html
// https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3. // https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.
......
...@@ -6,9 +6,14 @@ int RdmaServerEndpointGroup::CONN_STATE_CONNECTED = 4; ...@@ -6,9 +6,14 @@ int RdmaServerEndpointGroup::CONN_STATE_CONNECTED = 4;
int RdmaServerEndpointGroup::CONN_STATE_CLOSED = 5; int RdmaServerEndpointGroup::CONN_STATE_CLOSED = 5;
RdmaServerEndpointGroup::RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, RdmaServerEndpointGroup::RdmaServerEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize,
int sendMsgSize, int recvMsgSize, rocksdb::DB *db) int sendMsgSize, int recvMsgSize, rocksdb::DB *db, rocksdb::Options &options)
: _sendQueueSize(sendQueueSize), _recvQueueSize(recvQueueSize), _compQueueSize(compQueueSize), : _sendQueueSize(sendQueueSize),
_sendMsgSize(sendMsgSize), _recvMsgSize(recvMsgSize), _db(db) _recvQueueSize(recvQueueSize),
_compQueueSize(compQueueSize),
_sendMsgSize(sendMsgSize),
_recvMsgSize(recvMsgSize),
_db(db),
_options(options)
{ {
CPPLog::LOG_INFO("SalEndpointGroup : Step 1 creating event channel"); CPPLog::LOG_INFO("SalEndpointGroup : Step 1 creating event channel");
_eventChannel = rdma_create_event_channel(); _eventChannel = rdma_create_event_channel();
...@@ -150,8 +155,8 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -150,8 +155,8 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
createEpCmEvent(event); createEpCmEvent(event);
} }
/* /*
* Event came for server on listen cm_id * Event came for server on listen cm_id
*/ */
else if (event->id != NULL && _cm_id == event->id) else if (event->id != NULL && _cm_id == event->id)
{ {
if (event->event == RDMA_CM_EVENT_DISCONNECTED) if (event->event == RDMA_CM_EVENT_DISCONNECTED)
...@@ -199,14 +204,15 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event) ...@@ -199,14 +204,15 @@ void RdmaServerEndpointGroup::processCmEvent(struct rdma_cm_event *event)
{ {
// std::unique_lock lock(_salMutex); // std::unique_lock lock(_salMutex);
/* /*
* Since we are processing event on group after endpoint had processed it . * Since we are processing event on group after endpoint had processed it .
* it is safe to assume that endpoint has been closed already we just need to delete the endpoint * it is safe to assume that endpoint has been closed already we just need to delete the endpoint
*/ */
auto it = _qpSalEndpointMap->find(qp); auto it = _qpSalEndpointMap->find(qp);
if (it != _qpSalEndpointMap->end()) if (it != _qpSalEndpointMap->end())
{ {
_qpSalEndpointMap->erase(qp); _qpSalEndpointMap->erase(qp);
delete ((RdmaSalEndpoint *)event->id->context); delete ((RdmaSalEndpoint *)event->id->context);
std::cout<<_options.statistics->ToString();
} }
auto it2 = _qpRepEndpointMap->find(qp); auto it2 = _qpRepEndpointMap->find(qp);
if (it2 != _qpRepEndpointMap->end()) if (it2 != _qpRepEndpointMap->end())
......
...@@ -5,23 +5,25 @@ ...@@ -5,23 +5,25 @@
#include "Executor.hpp" #include "Executor.hpp"
#include "Properties.hpp" #include "Properties.hpp"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/table.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
int connectToRepServer(Properties &prop, RdmaRepEndpointGroup* repGroup) int connectToRepServer(Properties &prop, RdmaRepEndpointGroup *repGroup)
{ {
int followers = stoi(prop.getValue("FOLLOWERS")); int followers = stoi(prop.getValue("FOLLOWERS"));
std::cout<<"followers "<<followers<<"\n"; std::cout << "followers " << followers << "\n";
RdmaClientRepEndpoint* clientEPs[followers]; RdmaClientRepEndpoint *clientEPs[followers];
for(int i = 0 ;i< followers ;i++) for (int i = 0; i < followers; i++)
{ {
clientEPs[i] = repGroup->createEndpoint(); clientEPs[i] = repGroup->createEndpoint();
std::string ip = prop.getValue("FOLLOWER"+std::to_string(i+1)+"_IP"); std::string ip = prop.getValue("FOLLOWER" + std::to_string(i + 1) + "_IP");
std::string port = prop.getValue("FOLLOWER"+std::to_string(i+1)+"_PORT"); std::string port = prop.getValue("FOLLOWER" + std::to_string(i + 1) + "_PORT");
std::cout<<"Connecting to follower "<<ip<<":"<<port<<"\n"; std::cout << "Connecting to follower " << ip << ":" << port << "\n";
clientEPs[i]->connect(ip.c_str(), port.c_str(), "fol"); clientEPs[i]->connect(ip.c_str(), port.c_str(), "fol");
} }
return 0; return 0;
} }
int main() int main()
{ {
...@@ -37,12 +39,35 @@ int main() ...@@ -37,12 +39,35 @@ int main()
std::string serverPort = prop.getValue("SERVER_PORT"); std::string serverPort = prop.getValue("SERVER_PORT");
int executorPoolSize = stoi(prop.getValue("EXECUTOR_POOL_SIZE")); int executorPoolSize = stoi(prop.getValue("EXECUTOR_POOL_SIZE"));
int enableLogging = stoi(prop.getValue("ENABLE_LOGGING")); int enableLogging = stoi(prop.getValue("ENABLE_LOGGING"));
int maxFlushBackGroundJobs = stoi(prop.getValue("ROCKS_MAX_FLUSH_BACKGROUND_JOBS"));
int maxCompactionBackGroundJobs = stoi(prop.getValue("ROCKS_MAX_COMPACTION_BACKGROUND_JOBS"));
int writeBufferSize = stoi(prop.getValue("ROCKS_WRITE_BUFFER_SIZE"));
int lruCacheSize = stoi(prop.getValue("ROCKS_CACHE_SIZE"));
if (enableLogging == 0) if (enableLogging == 0)
CPPLog::Logger::getInstance()->updateLogLevel(CPPLog::DISABLE_LOG); CPPLog::Logger::getInstance()->updateLogLevel(CPPLog::DISABLE_LOG);
rocksdb::DB *db; rocksdb::DB *db;
rocksdb::Options options; rocksdb::Options options;
options.statistics = rocksdb::CreateDBStatistics();
if (lruCacheSize != -1)
{
std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(8 << 20);
ROCKSDB_NAMESPACE::BlockBasedTableOptions table_options;
table_options.block_cache = cache;
auto table_factory = rocksdb::NewBlockBasedTableFactory(table_options);
options.table_factory.reset(table_factory);
}
if (writeBufferSize != -1)
options.write_buffer_size = writeBufferSize << 20;
options.create_if_missing = true; options.create_if_missing = true;
// setting flush threads
if (maxFlushBackGroundJobs != -1)
options.env->SetBackgroundThreads(maxFlushBackGroundJobs, rocksdb::Env::Priority::HIGH);
// setting compaction threads
if (maxCompactionBackGroundJobs != -1)
options.env->SetBackgroundThreads(maxCompactionBackGroundJobs, rocksdb::Env::Priority::LOW);
// open a database with a name which corresponds to a file system directory // open a database with a name which corresponds to a file system directory
rocksdb::Status status = rocksdb::DB::Open(options, dbpath, &db); rocksdb::Status status = rocksdb::DB::Open(options, dbpath, &db);
if (!status.ok()) if (!status.ok())
...@@ -51,11 +76,11 @@ int main() ...@@ -51,11 +76,11 @@ int main()
exit(1); exit(1);
} }
CPPLog::LOG_ALWAYS("Rocks started\n\n"); CPPLog::LOG_ALWAYS("Rocks started\n\n");
std::cout << "Backgroundtthreads" << options.env->GetBackgroundThreads();
Executor *executor = new Executor(executorPoolSize); Executor *executor = new Executor(executorPoolSize);
RdmaRepEndpointGroup *repgroup = new RdmaRepEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, 1, 100000, db); RdmaRepEndpointGroup *repgroup = new RdmaRepEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, 1, 100000, db);
RdmaServerEndpointGroup *sgroup = new RdmaServerEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, db); RdmaServerEndpointGroup *sgroup = new RdmaServerEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, db, options);
executor->createThreads(repgroup->_qpRepEndpointMap, sgroup->_qpRepEndpointMap, sgroup->_qpSalEndpointMap); executor->createThreads(repgroup->_qpRepEndpointMap, sgroup->_qpRepEndpointMap, sgroup->_qpSalEndpointMap);
repgroup->setExecutor(executor); repgroup->setExecutor(executor);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment