Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
H
hpdos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
SYNERG
hpdos
Commits
6a2d8a83
Commit
6a2d8a83
authored
Apr 17, 2022
by
Paras Garg
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Added local changes to git
parent
cfb4eeec
Changes
27
Hide whitespace changes
Inline
Side-by-side
Showing
27 changed files
with
1539 additions
and
0 deletions
+1539
-0
code/cppserver/.gitignore
code/cppserver/.gitignore
+5
-0
code/cppserver/Makefile
code/cppserver/Makefile
+26
-0
code/cppserver/header/ConcurrentQueue.hpp
code/cppserver/header/ConcurrentQueue.hpp
+158
-0
code/cppserver/header/CqEventData.hpp
code/cppserver/header/CqEventData.hpp
+0
-0
code/cppserver/header/Executor.hpp
code/cppserver/header/Executor.hpp
+22
-0
code/cppserver/header/MessageFormats.hpp
code/cppserver/header/MessageFormats.hpp
+46
-0
code/cppserver/header/RdmaCmProcessor.hpp
code/cppserver/header/RdmaCmProcessor.hpp
+27
-0
code/cppserver/header/RdmaEndpoint.hpp
code/cppserver/header/RdmaEndpoint.hpp
+50
-0
code/cppserver/header/RdmaEndpointGroup.hpp
code/cppserver/header/RdmaEndpointGroup.hpp
+26
-0
code/cppserver/header/RdmaRepCqProcessor.hpp
code/cppserver/header/RdmaRepCqProcessor.hpp
+105
-0
code/cppserver/header/RdmaReplicationEndpoint.hpp
code/cppserver/header/RdmaReplicationEndpoint.hpp
+30
-0
code/cppserver/header/RdmaSalCqProcessor.hpp
code/cppserver/header/RdmaSalCqProcessor.hpp
+29
-0
code/cppserver/header/RdmaSalEndpoint.hpp
code/cppserver/header/RdmaSalEndpoint.hpp
+31
-0
code/cppserver/header/RdmaServerEndpointGroup.hpp
code/cppserver/header/RdmaServerEndpointGroup.hpp
+72
-0
code/cppserver/header/Runnable.hpp
code/cppserver/header/Runnable.hpp
+9
-0
code/cppserver/header/TaskThread.hpp
code/cppserver/header/TaskThread.hpp
+32
-0
code/cppserver/readme.md
code/cppserver/readme.md
+16
-0
code/cppserver/src/Executor.cpp
code/cppserver/src/Executor.cpp
+20
-0
code/cppserver/src/RdmaCmProcessor.cpp
code/cppserver/src/RdmaCmProcessor.cpp
+61
-0
code/cppserver/src/RdmaEndpoint.cpp
code/cppserver/src/RdmaEndpoint.cpp
+155
-0
code/cppserver/src/RdmaRepCqProcessor.cpp
code/cppserver/src/RdmaRepCqProcessor.cpp
+1
-0
code/cppserver/src/RdmaReplicationEndpoint.cpp
code/cppserver/src/RdmaReplicationEndpoint.cpp
+34
-0
code/cppserver/src/RdmaSalCqProcessor.cpp
code/cppserver/src/RdmaSalCqProcessor.cpp
+87
-0
code/cppserver/src/RdmaSalEndpoint.cpp
code/cppserver/src/RdmaSalEndpoint.cpp
+121
-0
code/cppserver/src/RdmaServerEndpointGroup.cpp
code/cppserver/src/RdmaServerEndpointGroup.cpp
+209
-0
code/cppserver/src/Server.cpp
code/cppserver/src/Server.cpp
+19
-0
code/cppserver/src/TaskThread.cpp
code/cppserver/src/TaskThread.cpp
+148
-0
No files found.
code/cppserver/.gitignore
0 → 100644
View file @
6a2d8a83
*.o
.vscode/
.build/*
server
*.txt
\ No newline at end of file
code/cppserver/Makefile
0 → 100644
View file @
6a2d8a83
SRCS
:=
$(
shell
ls
src/
)
SRC_DIR
:=
src
OBJS
:=
$(SRCS:.cpp=.o)
BUILD_DIR
:=
.build
OBJS
:=
$(
addprefix
$(BUILD_DIR)
/,
$(OBJS)
)
CXX
=
g++
CXXFLAGS
+=
-O3
-Wall
-std
=
c++17
-I
header
CXXFLAGS
+=
-g
LIBS
+=
-libverbs
LIBS
+=
-lrdmacm
LIBS
+=
-pthread
LIBS
+=
-lrocksdb
Target
:=
server
.phony
=
clean
$(BUILD_DIR)/%.o
:
$(SRC_DIR)/%.cpp
$(CXX)
-o
$@
$(CXXFLAGS)
-c
$<
$(LIBS)
$(Target)
:
$(OBJS) | $(BUILD_DIR)
$(CXX)
-o
$@
$^
$(LIBS)
$(BUILD_DIR)
:
mkdir
-p
$@
\ No newline at end of file
code/cppserver/header/ConcurrentQueue.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __ConQueue__
#define __ConQueue__
#include <queue>
#include <mutex>
#include <condition_variable>
#include <set>
#include <string>
class
Comparator
{
public:
inline
bool
operator
()(
const
struct
ibv_wc
*
c1
,
const
struct
ibv_wc
*
c2
)
const
{
struct
SalRequest
*
req1
=
(
struct
SalRequest
*
)
c1
->
wr_id
;
struct
SalRequest
*
req2
=
(
struct
SalRequest
*
)
c2
->
wr_id
;
if
(
req1
->
keySize
!=
req2
->
keySize
)
return
true
;
char
*
key1
=
(
char
*
)
req1
+
SalRequestHeaderSize
;
char
*
key2
=
(
char
*
)
req2
+
SalRequestHeaderSize
;
for
(
int
i
=
0
;
i
<
req1
->
keySize
;
i
++
)
{
if
(
key1
[
i
]
!=
key2
[
i
])
return
true
;
}
return
false
;
}
};
class
ConcurrentQueue
{
private:
std
::
queue
<
struct
ibv_wc
*>
queue1
;
std
::
queue
<
struct
ibv_wc
*>
queue2
;
std
::
mutex
queueMutex
;
std
::
set
<
struct
ibv_wc
*
,
Comparator
>
runningRequests
;
std
::
condition_variable
queueCv
;
public:
void
push
(
struct
ibv_wc
*
const
&
data
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
queueMutex
);
queue1
.
push
(
data
);
std
::
cout
<<
data
<<
std
::
endl
;
lock
.
unlock
();
queueCv
.
notify_one
();
}
bool
empty
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
queueMutex
);
return
queue1
.
empty
()
&&
queue2
.
empty
();
}
struct
ibv_wc
*
try_pop
()
{
struct
ibv_wc
*
value
=
NULL
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
queueMutex
);
if
(
queue2
.
empty
())
{
queueCv
.
wait
(
lock
,
[
&
]
{
return
queue1
.
size
()
>
0
;
});
value
=
queue1
.
front
();
queue1
.
pop
();
}
else
{
value
=
queue2
.
front
();
queue2
.
pop
();
}
if
(
value
->
opcode
!=
IBV_WC_RECV
)
{
return
value
;
}
std
::
cout
<<
"value "
<<
value
<<
std
::
endl
;
if
(
runningRequests
.
empty
())
{
runningRequests
.
insert
(
value
);
return
value
;
}
auto
it
=
runningRequests
.
find
(
value
);
if
(
it
!=
runningRequests
.
end
())
{
queue2
.
push
(
value
);
std
::
cout
<<
"found putting in 2"
<<
std
::
endl
;
return
NULL
;
}
return
value
;
}
void
removeFromSet
(
struct
ibv_wc
*
data
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
queueMutex
);
std
::
cout
<<
"removing"
<<
data
<<
std
::
endl
;
runningRequests
.
erase
(
data
);
}
void
wait_and_pop
(
struct
ibv_wc
*&
popped_value
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
queueMutex
);
queueCv
.
wait
(
lock
,
[
&
]
{
return
queue1
.
size
()
>
0
;
});
popped_value
=
queue1
.
front
();
queue1
.
pop
();
}
};
#endif
/*
template <typename Data>
class ConcurrentQueue
{
private:
std::queue1<Data> queue;
std::queue2<Data> queue;
std::mutex queueMutex;
std::condition_variable queueCv;
public:
void push(Data const &data)
{
std::unique_lock<std::mutex> lock(queueMutex);
queue1.push(data);
lock.unlock();
queueCv.notify_one();
}
bool empty()
{
std::unique_lock<std::mutex> lock(queueMutex);
return queue.empty();
}
bool try_pop(Data &popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
if (queue2.empty())
{
return false;
}
popped_value = queue.front();
queue.pop();
return true;
}
void wait_and_pop(Data &popped_value)
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCv.wait(lock, [&]{return queue.size() > 0;});
popped_value = queue.front();
queue.pop();
}
};
#endif
*/
\ No newline at end of file
code/cppserver/header/CqEventData.hpp
0 → 100644
View file @
6a2d8a83
code/cppserver/header/Executor.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __Executor__
#define __Executor__
#include <vector>
#include "CqEventData.hpp"
#include "RdmaEndpointGroup.hpp"
#include "ConcurrentQueue.hpp"
#include "TaskThread.hpp"
class
Executor
{
int
_size
{
0
};
std
::
vector
<
TaskThread
*>
*
_taskThreads
{
NULL
};
ConcurrentQueue
*
_taskQueue
{
NULL
};
RdmaEndpointGroup
*
_group
;
public:
Executor
(
int
size
,
RdmaEndpointGroup
*
group
);
void
submit
(
struct
ibv_wc
*
task
);
void
getTask
();
};
//long affinities[]
#endif
\ No newline at end of file
code/cppserver/header/MessageFormats.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __MessageFormats__
#define __MessageFormats__
enum
RequestType
{
GET
,
PUT
,
DELETE
,
INVALIDATE
};
struct
__attribute__
((
__packed__
))
SalRequest
{
uint32_t
id
;
enum
RequestType
type
;
uint32_t
keySize
;
uint32_t
valueSize
;
};
struct
__attribute__
((
__packed__
))
SalResponse
{
//private:
uint32_t
id
;
enum
RequestType
type
;
//public:
uint32_t
size
;
};
struct
__attribute__
((
__packed__
))
InvRequest
{
//private:
uint32_t
id
;
enum
RequestType
type
;
//public:
uint32_t
keySize
;
};
static
uint32_t
SUCCESS
=
0
;
static
uint32_t
FAILURE
=
1
;
static
int32_t
SalRequestHeaderSize
=
sizeof
(
SalRequest
);
static
int32_t
SalResponseSize
=
sizeof
(
SalResponse
);
static
uint32_t
InvRequestHeaderSize
=
sizeof
(
InvRequest
);
#endif
\ No newline at end of file
code/cppserver/header/RdmaCmProcessor.hpp
0 → 100644
View file @
6a2d8a83
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <iostream>
#ifndef __RDMACMPROCESSOR__
#define __RDMACMPROCESSOR__
#include "RdmaEndpointGroup.hpp"
class
RdmaCmProcessor
{
struct
rdma_event_channel
*
_eventChannel
{
NULL
};
std
::
thread
*
_cmEventThread
{
NULL
};
RdmaEndpointGroup
*
_endpointGroup
{
NULL
};
bool
_stop
{
false
};
public:
RdmaCmProcessor
(
RdmaEndpointGroup
*
group
);
struct
rdma_cm_id
*
createId
();
void
processCmEvent
();
void
start
(
bool
newThread
);
void
close
();
};
#endif
\ No newline at end of file
code/cppserver/header/RdmaEndpoint.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __RDMAENDPOINT__
#define __RDMAENDPOINT__
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <netdb.h>
#include <arpa/inet.h>
#include <map>
#include <vector>
#include <mutex>
#include <shared_mutex>
#include "CqEventData.hpp"
class
RdmaEndpoint
{
public:
static
int
CONN_STATE_INITIALIZED
;
static
int
CONN_STATE_RESOURCES_ALLOCATED
;
static
int
CONN_STATE_CONNECTED
;
static
int
CONN_STATE_CLOSED
;
struct
rdma_cm_id
*
_cm_id
{
NULL
};
struct
ibv_cq
*
_completionQueue
{
NULL
};
struct
ibv_pd
*
_protectionDomain
{
NULL
};
int
_sendQueueSize
{
0
};
int
_recvQueueSize
{
0
};
int
_sendMsgSize
{
0
};
int
_recvMsgSize
{
0
};
int
_state
{
0
};
char
*
_sendBuff
{
NULL
};
char
*
_recvBuff
{
NULL
};
struct
ibv_mr
*
_sendMr
{
NULL
};
struct
ibv_mr
*
_recvMr
{
NULL
};
boost
::
lockfree
::
queue
<
void
*>
*
_sendBuffers
{
NULL
};
RdmaEndpoint
(
struct
rdma_cm_id
*
id
,
struct
ibv_cq
*
completionQueue
,
int
sendQueueSize
,
int
recvQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
);
void
createResources
();
void
processCmEvent
(
struct
rdma_cm_event
*
event
);
void
clientClose
();
virtual
void
processSendCompletion
(
struct
ibv_wc
*
data
)
=
0
;
virtual
void
processRecvCompletion
(
struct
ibv_wc
*
data
)
=
0
;
};
#endif
\ No newline at end of file
code/cppserver/header/RdmaEndpointGroup.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __RDMA_ENDPOINT_GROUP__
#define __RDMA_ENDPOINT_GROUP__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <netdb.h>
#include <arpa/inet.h>
#include <map>
#include <vector>
#include <mutex>
#include <shared_mutex>
#include "RdmaSalEndpoint.hpp"
#include "RdmaReplicationEndpoint.hpp"
class
RdmaEndpointGroup
{
public:
std
::
vector
<
RdmaSalEndpoint
*>
*
_salEps
{
NULL
};
std
::
vector
<
RdmaReplicationEndpoint
*>
*
_repEps
{
NULL
};
std
::
unordered_map
<
uint32_t
,
RdmaReplicationEndpoint
*>
*
_qpRepEndpointMap
{
NULL
};
std
::
unordered_map
<
uint32_t
,
RdmaSalEndpoint
*>
*
_qpSalEndpointMap
{
NULL
};
virtual
void
processCmEvent
(
struct
rdma_cm_event
*
event
)
=
0
;
};
#endif
\ No newline at end of file
code/cppserver/header/RdmaRepCqProcessor.hpp
0 → 100644
View file @
6a2d8a83
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMAREPCQPROCESSOR__
#define __RDMAREPCQPROCESSOR__
#include "Executor.hpp"
class
RdmaRepCqProcessor
{
public:
struct
ibv_comp_channel
*
_compChannel
{
NULL
};
struct
ibv_cq
*
_completionQueue
{
NULL
};
std
::
thread
*
_compQueueThread
{
NULL
};
bool
_stop
{
false
};
Executor
*
_executor
{
NULL
};
RdmaRepCqProcessor
(
Executor
*
ex
,
ibv_context
*
verbs
,
int
compQueueSize
)
:
_executor
(
ex
)
{
_compChannel
=
ibv_create_comp_channel
(
verbs
);
if
(
_compChannel
==
NULL
)
{
std
::
cout
<<
"CqProcessr : ibv_create_comp_channel failed
\n
"
;
return
;
}
_completionQueue
=
ibv_create_cq
(
verbs
,
compQueueSize
,
NULL
,
_compChannel
,
0
);
if
(
_completionQueue
==
NULL
)
{
std
::
cout
<<
"CqProcessr : ibv_create_cq failed"
<<
std
::
endl
;
return
;
}
int
ret
=
ibv_req_notify_cq
(
_completionQueue
,
0
);
if
(
ret
)
{
std
::
cout
<<
"CqProcessr : ibv_req_notify_cq failed
\n
"
;
}
}
struct
ibv_cq
*
getCq
()
{
return
_completionQueue
;
}
void
start
()
{
std
::
cout
<<
"CqProcessr : starting process CQ events"
<<
std
::
endl
;
_compQueueThread
=
new
std
::
thread
(
&
RdmaRepCqProcessor
::
processCQEvents
,
this
);
}
void
processCQEvents
()
{
int
ret
=
0
;
struct
ibv_cq
*
cq
;
void
*
context
;
const
int
nevent
=
10
;
struct
ibv_wc
*
wc_array
=
new
struct
ibv_wc
[
nevent
];
while
(
!
_stop
)
{
ret
=
ibv_get_cq_event
(
_compChannel
,
&
cq
,
&
context
);
if
(
ret
==
-
1
)
{
std
::
cout
<<
"CqProcessr : ibv_get_cq_event failed
\n
"
;
close
();
}
ibv_ack_cq_events
(
cq
,
1
);
ret
=
ibv_req_notify_cq
(
_completionQueue
,
0
);
if
(
ret
)
{
std
::
cout
<<
"CqProcessr : ibv_req_notify_cq failed
\n
"
;
close
();
}
ret
=
ibv_poll_cq
(
cq
,
nevent
,
wc_array
);
if
(
ret
<
0
)
{
std
::
cout
<<
"CqProcessr : ibv_poll_cq failed
\n
"
;
close
();
}
if
(
ret
==
0
)
continue
;
for
(
int
i
=
0
;
i
<
ret
;
i
++
)
{
struct
ibv_wc
*
data
=
new
struct
ibv_wc
(
wc_array
[
i
]);
//data->vendor_err = 1;
//_executor->submit(data);
new
std
::
thread
(
&
RdmaRepCqProcessor
::
processRepEvent
,
this
,
data
);
}
//_executor->dispatchRepCqEvents(wc_array, ret);
}
}
void
processRepEvent
(
struct
ibv_wc
*
data
)
{
std
::
cout
<<
"procesing Replication request"
<<
std
::
endl
;
}
void
close
()
{
_stop
=
true
;
if
(
_compQueueThread
!=
NULL
)
_compQueueThread
->
join
();
}
};
#endif
code/cppserver/header/RdmaReplicationEndpoint.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __RDMASERVERENDPOINT__
#define __RDMASERVERENDPOINT__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include "RdmaEndpoint.hpp"
#include "CqEventData.hpp"
#include <rocksdb/db.h>
class
RdmaReplicationEndpoint
:
public
RdmaEndpoint
{
rocksdb
::
DB
*
_db
;
std
::
atomic
<
uint64_t
>
_requestId
{
12
};
public:
RdmaReplicationEndpoint
(
struct
rdma_cm_id
*
id
,
struct
ibv_cq
*
completionQueue
,
int
sendQueueSize
,
int
recvQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
,
rocksdb
::
DB
*
_db
);
void
processCqEvent
(
struct
ibv_wc
wc
);
void
processSendCompletion
(
struct
ibv_wc
*
data
);
void
processRecvCompletion
(
struct
ibv_wc
*
data
);
int
sendMessage
(
const
char
*
buffer
,
uint32_t
size
);
void
close
();
};
#endif
\ No newline at end of file
code/cppserver/header/RdmaSalCqProcessor.hpp
0 → 100644
View file @
6a2d8a83
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <iostream>
#include <thread>
#include <unordered_map>
#ifndef __RDMASALCQPROCESSOR__
#define __RDMASALCQPROCESSOR__
#include "Executor.hpp"
class
RdmaSalCqProcessor
{
public:
struct
ibv_comp_channel
*
_compChannel
{
NULL
};
struct
ibv_cq
*
_completionQueue
{
NULL
};
std
::
thread
*
_compQueueThread
{
NULL
};
bool
_stop
{
false
};
Executor
*
_executor
{
NULL
};
RdmaSalCqProcessor
(
Executor
*
ex
,
ibv_context
*
verbs
,
int
compQueueSize
);
struct
ibv_cq
*
getCq
();
void
start
();
void
processCQEvents
();
void
close
();
};
#endif
code/cppserver/header/RdmaSalEndpoint.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __RdmaInvEndpoint__
#define __RdmaInvEndpoint__
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <errno.h>
#include <iostream>
#include <boost/lockfree/queue.hpp>
#include "RdmaEndpoint.hpp"
#include "MessageFormats.hpp"
#include <rocksdb/db.h>
class
RdmaSalEndpoint
:
public
RdmaEndpoint
{
public:
rocksdb
::
DB
*
_db
;
RdmaSalEndpoint
(
struct
rdma_cm_id
*
id
,
struct
ibv_cq
*
completionQueue
,
int
sendQueueSize
,
int
recvQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
,
rocksdb
::
DB
*
_db
);
void
processCqEvent
(
struct
ibv_wc
wc
);
void
processSendCompletion
(
struct
ibv_wc
*
data
);
void
processRecvCompletion
(
struct
ibv_wc
*
data
);
void
processDelete
(
struct
SalRequest
*
);
void
processGet
(
struct
SalRequest
*
req
);
void
processPut
(
struct
SalRequest
*
req
);
int
sendMessage
(
const
char
*
buffer
,
uint32_t
size
);
void
close
();
};
#endif
\ No newline at end of file
code/cppserver/header/RdmaServerEndpointGroup.hpp
0 → 100644
View file @
6a2d8a83
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <stdint.h>
#include <thread>
#include <netdb.h>
#include <arpa/inet.h>
#include <map>
#include <vector>
#include <mutex>
#include <shared_mutex>
#ifndef __RDMASERVERENDPOINTGROUP__
#define __RDMASERVERENDPOINTGROUP__
#include "RdmaReplicationEndpoint.hpp"
#include "RdmaSalEndpoint.hpp"
#include "RdmaEndpointGroup.hpp"
#include "CqEventData.hpp"
#include "Executor.hpp"
#include "RdmaCmProcessor.hpp"
#include "RdmaSalCqProcessor.hpp"
#include "RdmaRepCqProcessor.hpp"
class
RdmaServerEndpointGroup
:
public
RdmaEndpointGroup
{
/*
* Variables to maintain Group state
*/
static
int
CONN_STATE_INITIALIZED
;
static
int
CONN_STATE_BINDED
;
static
int
CONN_STATE_CONNECTED
;
static
int
CONN_STATE_CLOSED
;
struct
rdma_cm_id
*
_cm_id
{
NULL
};
RdmaCmProcessor
*
_cmProcessor
{
NULL
};
RdmaSalCqProcessor
*
_salCqProcessor
{
NULL
};
RdmaRepCqProcessor
*
_repCqProcessor
{
NULL
};
Executor
*
_executor
;
/*
* variables to maintain queue state
*/
int
_sendQueueSize
{
0
};
int
_recvQueueSize
{
0
};
int
_compQueueSize
{
0
};
int
_sendMsgSize
{
0
};
int
_recvMsgSize
{
0
};
rocksdb
::
DB
*
_db
;
mutable
std
::
shared_mutex
_salMutex
;
mutable
std
::
shared_mutex
_repMutex
;
void
clientClose
();
public:
RdmaServerEndpointGroup
(
int
sendQueueSize
,
int
recvQueueSize
,
int
compQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
);
// void setExecutor(Executor *executor);
void
bind
(
const
char
*
ip
,
const
char
*
port
,
int
backlog
);
struct
ibv_cq
*
createSalCq
(
struct
rdma_cm_id
*
id
);
struct
ibv_cq
*
createRepCq
(
struct
rdma_cm_id
*
id
);
void
dispatchRepCqEvents
(
ibv_wc
wc
[],
int
size
);
void
dispatchSalCqEvents
(
ibv_wc
wc
[],
int
size
);
void
setExecutor
(
Executor
*
ex
);
void
startCmProcessor
(
bool
newThread
);
void
processCmEvent
(
struct
rdma_cm_event
*
event
);
void
createEpCmEvent
(
struct
rdma_cm_event
*
event
);
void
close
();
};
#endif
\ No newline at end of file
code/cppserver/header/Runnable.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __Runnable__
#define __Runnable__
#include <iostream>
class
Runnable
{
public:
virtual
void
run
()
=
0
;
};
#endif
\ No newline at end of file
code/cppserver/header/TaskThread.hpp
0 → 100644
View file @
6a2d8a83
#ifndef __TaskThread__
#define __TaskThread__
#include "Runnable.hpp"
#include "CqEventData.hpp"
#include "RdmaEndpointGroup.hpp"
#include "ConcurrentQueue.hpp"
#include <pthread.h>
#include <iostream>
#include <queue>
class
TaskThread
{
private:
ConcurrentQueue
*
_taskQueue
;
bool
_stop
{
false
};
int
_id
;
pthread_t
thread
;
RdmaEndpointGroup
*
_group
;
public:
TaskThread
(
int
id
,
int
cpu
,
ConcurrentQueue
*
,
RdmaEndpointGroup
*
group
);
TaskThread
(
int
id
,
ConcurrentQueue
*
,
RdmaEndpointGroup
*
group
);
void
replicateSalRequest
(
char
*
salRequest
,
uint32_t
size
);
static
void
*
run
(
void
*
object
);
void
stop
();
void
processEvent
(
struct
ibv_wc
*
data
);
~
TaskThread
();
};
#endif
\ No newline at end of file
code/cppserver/readme.md
0 → 100644
View file @
6a2d8a83
// https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/
// https://man7.org/linux/man-pages/man3/ibv_poll_cq.3.html
// https://man7.org/linux/man-pages/man3/ibv_get_cq_event.3.
//https://docs.microsoft.com/en-us/cpp/cpp/delegating-constructors?view=msvc-170
https://www.toptal.com/c-plus-plus/c-plus-plus-understanding-compilation
/
*
cpu_set_t cpuset;
CPU_ZERO(
&cpuset);
CPU_SET(i,
&cpuset);
int rc = pthread_setaffinity_np(threads[i].native_handle(),
sizeof(cpu_set_t),
&cpuset);
if (rc != 0) {
std::cerr << "Error calling pthread_setaffinity_np: " << rc << "
\n
";
}
*
/
\ No newline at end of file
code/cppserver/src/Executor.cpp
0 → 100644
View file @
6a2d8a83
#include "Executor.hpp"
Executor
::
Executor
(
int
size
,
RdmaEndpointGroup
*
group
)
:
_size
(
size
),
_group
(
group
)
{
_taskQueue
=
new
ConcurrentQueue
();
_taskThreads
=
new
std
::
vector
<
TaskThread
*>
();
_taskThreads
->
reserve
(
size
);
for
(
int
i
=
0
;
i
<
_size
;
i
++
)
{
TaskThread
*
thread
=
new
TaskThread
(
i
,
_taskQueue
,
_group
);
_taskThreads
->
push_back
(
thread
);
}
}
void
Executor
::
submit
(
struct
ibv_wc
*
task
)
{
_taskQueue
->
push
(
task
);
}
void
Executor
::
getTask
()
{
}
\ No newline at end of file
code/cppserver/src/RdmaCmProcessor.cpp
0 → 100644
View file @
6a2d8a83
#include "RdmaCmProcessor.hpp"
#include <iostream>
RdmaCmProcessor
::
RdmaCmProcessor
(
RdmaEndpointGroup
*
group
)
:
_endpointGroup
(
group
)
{
std
::
cout
<<
"CMProcessor : Step 1 creating event channel"
<<
std
::
endl
;
_eventChannel
=
rdma_create_event_channel
();
if
(
_eventChannel
==
NULL
)
{
std
::
cout
<<
"CMProcesor : error creating event channel"
;
}
}
struct
rdma_cm_id
*
RdmaCmProcessor
::
createId
()
{
struct
rdma_cm_id
*
id
=
NULL
;
int
ret
=
rdma_create_id
(
_eventChannel
,
&
id
,
NULL
,
RDMA_PS_TCP
);
if
(
ret
==
-
1
)
std
::
cout
<<
"CMProcesor : rdma_create_id failed"
<<
std
::
endl
;
return
id
;
}
void
RdmaCmProcessor
::
processCmEvent
()
{
int
ret
;
struct
rdma_cm_event
*
event
;
std
::
cout
<<
"CMProcessor : starting cm processing thread"
<<
std
::
endl
;
while
(
!
_stop
)
{
ret
=
rdma_get_cm_event
(
_eventChannel
,
&
event
);
if
(
ret
)
{
std
::
cout
<<
"CMProcesor : rdma_get_cm_event failed"
<<
std
::
endl
;
continue
;
}
_endpointGroup
->
processCmEvent
(
event
);
ret
=
rdma_ack_cm_event
(
event
);
if
(
ret
)
{
std
::
cout
<<
"CMProcesor : rdma_ack_cm_event failed"
;
}
}
}
void
RdmaCmProcessor
::
start
(
bool
newThread
)
{
if
(
newThread
)
_cmEventThread
=
new
std
::
thread
(
&
RdmaCmProcessor
::
processCmEvent
,
this
);
else
processCmEvent
();
}
void
RdmaCmProcessor
::
close
()
{
_stop
=
true
;
if
(
_cmEventThread
!=
NULL
)
_cmEventThread
->
join
();
rdma_destroy_event_channel
(
_eventChannel
);
}
code/cppserver/src/RdmaEndpoint.cpp
0 → 100644
View file @
6a2d8a83
#include "RdmaEndpoint.hpp"
int
RdmaEndpoint
::
CONN_STATE_INITIALIZED
=
1
;
int
RdmaEndpoint
::
CONN_STATE_RESOURCES_ALLOCATED
=
2
;
int
RdmaEndpoint
::
CONN_STATE_CONNECTED
=
3
;
int
RdmaEndpoint
::
CONN_STATE_CLOSED
=
4
;
RdmaEndpoint
::
RdmaEndpoint
(
struct
rdma_cm_id
*
id
,
struct
ibv_cq
*
completionQueue
,
int
sendQueueSize
,
int
recvQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
)
:
_cm_id
(
id
),
_completionQueue
(
completionQueue
),
_sendQueueSize
(
sendQueueSize
),
_recvQueueSize
(
recvQueueSize
),
_sendMsgSize
(
sendMsgSize
),
_recvMsgSize
(
recvMsgSize
)
{
_state
=
CONN_STATE_INITIALIZED
;
_sendBuffers
=
new
boost
::
lockfree
::
queue
<
void
*>
(
_sendMsgSize
);
}
void
RdmaEndpoint
::
createResources
()
{
if
(
_state
!=
CONN_STATE_INITIALIZED
)
{
std
::
cout
<<
"RdmaEndpoint : createResource invalid satte"
<<
std
::
endl
;
}
_protectionDomain
=
ibv_alloc_pd
(
_cm_id
->
verbs
);
if
(
_protectionDomain
==
NULL
)
{
std
::
cout
<<
"RdmaEndpoint : ibv_alloc_pd failed "
<<
std
::
endl
;
return
;
}
struct
ibv_qp_init_attr
qp_init_attr
;
memset
(
&
qp_init_attr
,
0
,
sizeof
(
qp_init_attr
));
//This is used to set endpoint address with qp
qp_init_attr
.
qp_context
=
(
void
*
)
this
;
// if not set 0, all work requests submitted to SQ will always generate a Work Completion
qp_init_attr
.
sq_sig_all
=
1
;
// completion queue can be shared or you can use distinct completion queues.
qp_init_attr
.
send_cq
=
_completionQueue
;
qp_init_attr
.
recv_cq
=
_completionQueue
;
qp_init_attr
.
qp_type
=
IBV_QPT_RC
;
// increase if you want to keep more send work requests in the SQ.
qp_init_attr
.
cap
.
max_send_wr
=
_sendQueueSize
;
// increase if you want to keep more receive work requests in the RQ.
qp_init_attr
.
cap
.
max_recv_wr
=
_recvQueueSize
;
// increase if you allow send work requests to have multiple scatter gather entry (SGE).
qp_init_attr
.
cap
.
max_send_sge
=
1
;
// increase if you allow receive work requests to have multiple scatter gather entry (SGE).
qp_init_attr
.
cap
.
max_recv_sge
=
1
;
int
ret
=
rdma_create_qp
(
_cm_id
,
_protectionDomain
,
&
qp_init_attr
);
if
(
ret
)
{
std
::
cout
<<
"RdmaEndpoint : ibv_create_cq failed
\n
"
;
}
if
(
_cm_id
->
pd
==
NULL
)
{
std
::
cout
<<
"RdmaEndpoint : pd not set"
<<
std
::
endl
;
_cm_id
->
pd
=
_protectionDomain
;
}
_sendBuff
=
(
char
*
)
malloc
(
_sendMsgSize
*
_sendQueueSize
);
if
(
_sendBuff
==
NULL
)
std
::
cout
<<
"RdmaEndpoint : sendBuff malloc failed"
<<
std
::
endl
;
_recvBuff
=
(
char
*
)
malloc
(
_recvMsgSize
*
_recvQueueSize
);
_sendMr
=
rdma_reg_write
(
_cm_id
,
reinterpret_cast
<
void
*>
(
_sendBuff
),
_sendMsgSize
*
_sendQueueSize
);
if
(
_sendMr
==
NULL
)
std
::
cout
<<
"RdmaEndpoint : sendMr reg failed"
<<
std
::
endl
;
if
(
_recvBuff
==
NULL
)
std
::
cout
<<
"RdmaEndpoint : recvBuff malloc failed"
<<
std
::
endl
;
_recvMr
=
rdma_reg_read
(
_cm_id
,
reinterpret_cast
<
void
*>
(
_recvBuff
),
_recvMsgSize
*
_recvQueueSize
);
if
(
_recvMr
==
NULL
)
std
::
cout
<<
"RdmaEndpoint : recvMr reg failed"
<<
std
::
endl
;
for
(
int
i
=
0
;
i
<
_recvQueueSize
;
i
++
)
{
char
*
const
location
=
_recvBuff
+
i
*
_recvMsgSize
;
rdma_post_recv
(
_cm_id
,
reinterpret_cast
<
void
*>
(
location
),
reinterpret_cast
<
void
*>
(
location
),
_recvMsgSize
,
_recvMr
);
}
for
(
int
i
=
0
;
i
<
_sendQueueSize
;
i
++
)
{
void
*
const
location
=
_sendBuff
+
i
*
_sendMsgSize
;
_sendBuffers
->
push
(
location
);
}
_state
=
CONN_STATE_RESOURCES_ALLOCATED
;
}
void
RdmaEndpoint
::
processCmEvent
(
struct
rdma_cm_event
*
event
)
{
std
::
cout
<<
"RdmaEndpoint : Event "
<<
rdma_event_str
(
event
->
event
)
<<
std
::
endl
;
if
(
event
->
event
==
RDMA_CM_EVENT_CONNECT_REQUEST
)
{
std
::
cout
<<
"RdmaEndpoint : Connect request"
;
}
else
if
(
event
->
event
==
RDMA_CM_EVENT_ESTABLISHED
)
{
if
(
_state
!=
CONN_STATE_RESOURCES_ALLOCATED
)
{
std
::
cout
<<
"RdmaEndpoint : EstablishedEvent invalid state "
<<
std
::
endl
;
}
std
::
cout
<<
"RdmaEndpoint : step 6 Connected"
<<
std
::
endl
;
_state
=
CONN_STATE_CONNECTED
;
}
else
if
(
event
->
event
==
RDMA_CM_EVENT_DISCONNECTED
)
{
std
::
cout
<<
"RdmaEndpoint : step 7 disconnected"
<<
std
::
endl
;
clientClose
();
}
}
void
RdmaEndpoint
::
clientClose
()
{
if
(
_state
!=
CONN_STATE_CONNECTED
)
{
std
::
cout
<<
"RdmaEndpoint : clientClose invalid state"
<<
std
::
endl
;
return
;
}
std
::
cout
<<
"RdmaEndpoint : closing connection qp "
<<
_cm_id
->
qp
->
qp_num
<<
std
::
endl
;
int
ret
;
ret
=
rdma_disconnect
(
_cm_id
);
if
(
ret
)
{
std
::
cout
<<
"RdmaEndpoint : rdma_disconnect failed"
<<
std
::
endl
;
}
ret
=
rdma_dereg_mr
(
_sendMr
);
if
(
ret
)
{
std
::
cout
<<
"RdmaEndpoint : rdma_dereg_mr send failed"
<<
std
::
endl
;
}
free
(
_sendBuff
);
ret
=
rdma_dereg_mr
(
_recvMr
);
if
(
ret
)
{
std
::
cout
<<
"RdmaEndpoint : rdma_dereg_mr recv failed"
<<
std
::
endl
;
}
free
(
_recvBuff
);
rdma_destroy_qp
(
_cm_id
);
std
::
cout
<<
"des qp"
<<
std
::
endl
;
// ret = rdma_destroy_id(_cm_id);
std
::
cout
<<
"des mr"
<<
std
::
endl
;
if
(
ret
)
{
std
::
cout
<<
"RdmaEndpoint : rdma_destroy_id failed"
<<
std
::
endl
;
}
_state
=
CONN_STATE_CLOSED
;
std
::
cout
<<
"closed"
<<
std
::
endl
;
}
code/cppserver/src/RdmaRepCqProcessor.cpp
0 → 100644
View file @
6a2d8a83
#include "RdmaRepCqProcessor.hpp"
\ No newline at end of file
code/cppserver/src/RdmaReplicationEndpoint.cpp
0 → 100644
View file @
6a2d8a83
#include "RdmaReplicationEndpoint.hpp"
RdmaReplicationEndpoint
::
RdmaReplicationEndpoint
(
struct
rdma_cm_id
*
id
,
struct
ibv_cq
*
completionQueue
,
int
sendQueueSize
,
int
recvQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
,
rocksdb
::
DB
*
db
)
:
RdmaEndpoint
(
id
,
completionQueue
,
sendQueueSize
,
recvQueueSize
,
sendMsgSize
,
recvMsgSize
)
,
_db
(
db
)
{
}
void
RdmaReplicationEndpoint
::
processSendCompletion
(
struct
ibv_wc
*
data
)
{
std
::
cout
<<
"send completion
\n
"
;
_sendBuffers
->
push
((
void
*
)
data
->
wr_id
);
}
void
RdmaReplicationEndpoint
::
processRecvCompletion
(
struct
ibv_wc
*
data
)
{
std
::
cout
<<
"recv completion
\n
"
;
std
::
cout
<<
"recieve"
<<
(
char
*
)(
data
->
wr_id
)
<<
"
\n
"
;
char
*
request
=
new
char
[
data
->
byte_len
];
memcpy
(
request
,
(
void
*
)
data
->
wr_id
,
data
->
byte_len
);
rdma_post_recv
(
_cm_id
,
(
void
*
)
data
->
wr_id
,
(
void
*
)
data
->
wr_id
,
_recvMsgSize
,
_recvMr
);
}
int
RdmaReplicationEndpoint
::
sendMessage
(
const
char
*
buffer
,
uint32_t
size
)
{
if
(
size
>
_sendMsgSize
)
return
-
1
;
void
*
sendBuffer
=
nullptr
;
_sendBuffers
->
pop
(
sendBuffer
);
if
(
sendBuffer
==
nullptr
)
return
-
1
;
memcpy
(
sendBuffer
,
buffer
,
size
);
return
rdma_post_send
(
_cm_id
,
sendBuffer
,
sendBuffer
,
size
,
_sendMr
,
0
);
}
code/cppserver/src/RdmaSalCqProcessor.cpp
0 → 100644
View file @
6a2d8a83
#include "RdmaSalCqProcessor.hpp"
RdmaSalCqProcessor
::
RdmaSalCqProcessor
(
Executor
*
ex
,
ibv_context
*
verbs
,
int
compQueueSize
)
:
_executor
(
ex
)
{
_compChannel
=
ibv_create_comp_channel
(
verbs
);
if
(
_compChannel
==
NULL
)
{
std
::
cout
<<
"SalCqProcessr : ibv_create_comp_channel failed
\n
"
;
return
;
}
_completionQueue
=
ibv_create_cq
(
verbs
,
compQueueSize
,
NULL
,
_compChannel
,
0
);
if
(
_completionQueue
==
NULL
)
{
std
::
cout
<<
"SalCqProcessr : ibv_create_cq failed"
<<
std
::
endl
;
return
;
}
int
ret
=
ibv_req_notify_cq
(
_completionQueue
,
0
);
if
(
ret
)
{
std
::
cout
<<
"SalCqProcessr : ibv_req_notify_cq failed
\n
"
;
}
}
struct
ibv_cq
*
RdmaSalCqProcessor
::
getCq
()
{
return
_completionQueue
;
}
void
RdmaSalCqProcessor
::
start
()
{
std
::
cout
<<
"SalCqProcessr : starting process CQ events"
<<
std
::
endl
;
_compQueueThread
=
new
std
::
thread
(
&
RdmaSalCqProcessor
::
processCQEvents
,
this
);
}
void
RdmaSalCqProcessor
::
processCQEvents
()
{
int
ret
=
0
;
struct
ibv_cq
*
cq
;
void
*
context
;
const
int
nevent
=
10
;
struct
ibv_wc
wc_array
[
nevent
];
while
(
!
_stop
)
{
/*
* get_CQ_event is a blocking call and it wait save some cpu cycles but.
* it might not be that efficient compared to polling
*/
ret
=
ibv_get_cq_event
(
_compChannel
,
&
cq
,
&
context
);
if
(
ret
==
-
1
)
{
std
::
cout
<<
"SalCqProcessr : ibv_get_cq_event failed
\n
"
;
close
();
}
ibv_ack_cq_events
(
cq
,
1
);
/*
* Create a request for next completion cycle
*/
ret
=
ibv_req_notify_cq
(
_completionQueue
,
0
);
if
(
ret
)
{
std
::
cout
<<
"SalCqProcessr : ibv_req_notify_cq failed
\n
"
;
close
();
}
ret
=
ibv_poll_cq
(
cq
,
nevent
,
wc_array
);
if
(
ret
<
0
)
{
std
::
cout
<<
"SalCqProcessr : ibv_poll_cq failed
\n
"
;
close
();
}
if
(
ret
==
0
)
continue
;
for
(
int
i
=
0
;
i
<
ret
;
i
++
)
{
struct
ibv_wc
*
data
=
new
struct
ibv_wc
(
wc_array
[
i
]);
/*
* vendor_err is set to check whether the request came from sal or followers
* data->vendor_err = 0;
*/
_executor
->
submit
(
data
);
}
// _executor->dispatchSalCqEvents(wc_array, ret);
}
}
void
RdmaSalCqProcessor
::
close
()
{
_stop
=
true
;
if
(
_compQueueThread
!=
NULL
)
_compQueueThread
->
join
();
}
\ No newline at end of file
code/cppserver/src/RdmaSalEndpoint.cpp
0 → 100644
View file @
6a2d8a83
#include "RdmaSalEndpoint.hpp"
#include "MessageFormats.hpp"
RdmaSalEndpoint
::
RdmaSalEndpoint
(
struct
rdma_cm_id
*
id
,
struct
ibv_cq
*
completionQueue
,
int
sendQueueSize
,
int
recvQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
,
rocksdb
::
DB
*
db
)
:
RdmaEndpoint
(
id
,
completionQueue
,
sendQueueSize
,
recvQueueSize
,
sendMsgSize
,
recvMsgSize
)
,
_db
(
db
)
{
}
void
RdmaSalEndpoint
::
processSendCompletion
(
struct
ibv_wc
*
data
)
{
std
::
cout
<<
"send completion
\n
"
;
_sendBuffers
->
push
((
void
*
)
data
->
wr_id
);
}
int
RdmaSalEndpoint
::
sendMessage
(
const
char
*
buffer
,
uint32_t
size
)
{
if
(
size
>
_sendMsgSize
)
return
-
1
;
void
*
sendBuffer
=
nullptr
;
_sendBuffers
->
pop
(
sendBuffer
);
if
(
sendBuffer
==
nullptr
)
return
-
1
;
memcpy
(
sendBuffer
,
buffer
,
size
);
return
rdma_post_send
(
_cm_id
,
sendBuffer
,
sendBuffer
,
size
,
_sendMr
,
0
);
}
void
RdmaSalEndpoint
::
processRecvCompletion
(
struct
ibv_wc
*
data
)
{
char
*
request
=
new
char
[
data
->
byte_len
];
memcpy
(
request
,
(
void
*
)
data
->
wr_id
,
data
->
byte_len
);
struct
SalRequest
*
req
=
(
struct
SalRequest
*
)
request
;
rdma_post_recv
(
_cm_id
,
(
void
*
)
data
->
wr_id
,
(
void
*
)
data
->
wr_id
,
_recvMsgSize
,
_recvMr
);
std
::
cout
<<
"recieve id : "
<<
req
->
id
<<
" "
<<
(
char
*
)
req
+
SalRequestHeaderSize
<<
" "
<<
req
->
type
<<
"size"
<<
data
->
byte_len
<<
"
\n
"
;
if
(
req
->
type
==
RequestType
::
DELETE
)
processDelete
(
req
);
if
(
req
->
type
==
RequestType
::
GET
)
processGet
(
req
);
if
(
req
->
type
==
RequestType
::
PUT
)
processPut
(
req
);
delete
[]
request
;
}
void
RdmaSalEndpoint
::
processDelete
(
struct
SalRequest
*
req
)
{
std
::
cout
<<
"0
\n
"
;
rocksdb
::
Status
s
=
_db
->
Delete
(
rocksdb
::
WriteOptions
(),
{(
char
*
)
req
+
SalRequestHeaderSize
,
req
->
keySize
});
std
::
cout
<<
"1
\n
"
;
void
*
sendBuf
=
nullptr
;
_sendBuffers
->
pop
(
sendBuf
);
if
(
sendBuf
==
nullptr
)
{
return
;
}
char
*
sendBuffer
=
(
char
*
)
sendBuf
;
std
::
cout
<<
"2 "
<<
req
->
id
<<
"
\n
"
;
memcpy
(
sendBuffer
,
&
(
req
->
id
),
sizeof
(
uint32_t
));
if
(
s
.
ok
())
{
std
::
cout
<<
"33
\n
"
;
memcpy
(
sendBuffer
+
4
,
&
SUCCESS
,
sizeof
(
int
));
std
::
cout
<<
"44
\n
"
;
}
else
{
std
::
cout
<<
"331
\n
"
;
memcpy
(
sendBuffer
+
4
,
&
FAILURE
,
sizeof
(
int
));
std
::
cout
<<
"441
\n
"
;
}
rdma_post_send
(
_cm_id
,
sendBuffer
,
sendBuffer
,
_sendMsgSize
,
_sendMr
,
0
);
std
::
cout
<<
"3
\n
"
;
}
void
RdmaSalEndpoint
::
processGet
(
struct
SalRequest
*
req
)
{
std
::
string
value
;
rocksdb
::
Status
s
=
_db
->
Get
(
rocksdb
::
ReadOptions
(),
{(
char
*
)
req
+
SalRequestHeaderSize
,
req
->
keySize
},
&
value
);
void
*
sendBuf
=
nullptr
;
_sendBuffers
->
pop
(
sendBuf
);
if
(
sendBuf
==
nullptr
)
{
return
;
}
char
*
sendBuffer
=
(
char
*
)
sendBuf
;
memcpy
(
sendBuffer
,
&
(
req
->
id
),
sizeof
(
uint32_t
));
if
(
s
.
ok
())
{
memcpy
(
sendBuffer
+
4
,
&
SUCCESS
,
sizeof
(
int
));
memcpy
(
sendBuffer
+
8
,
(
void
*
)
value
.
size
(),
sizeof
(
value
.
size
()));
memcpy
(
sendBuffer
+
12
,
value
.
c_str
(),
value
.
size
());
}
else
{
memcpy
(
sendBuffer
+
4
,
&
FAILURE
,
sizeof
(
int
));
}
rdma_post_send
(
_cm_id
,
sendBuffer
,
sendBuffer
,
_sendMsgSize
,
_sendMr
,
0
);
}
void
RdmaSalEndpoint
::
processPut
(
struct
SalRequest
*
req
)
{
rocksdb
::
Status
s
=
_db
->
Put
(
rocksdb
::
WriteOptions
(),
{(
char
*
)
req
+
SalRequestHeaderSize
,
req
->
keySize
},
{(
char
*
)
req
+
SalRequestHeaderSize
+
req
->
keySize
,
req
->
valueSize
});
void
*
sendBuf
=
nullptr
;
_sendBuffers
->
pop
(
sendBuf
);
if
(
sendBuf
!=
nullptr
)
{
return
;
}
char
*
sendBuffer
=
(
char
*
)
sendBuf
;
memcpy
(
sendBuffer
,
&
(
req
->
id
),
sizeof
(
uint32_t
));
if
(
s
.
ok
())
memcpy
(
sendBuffer
+
4
,
&
SUCCESS
,
sizeof
(
int
));
else
memcpy
(
sendBuffer
+
4
,
&
FAILURE
,
sizeof
(
int
));
rdma_post_send
(
_cm_id
,
sendBuffer
,
sendBuffer
,
_sendMsgSize
,
_sendMr
,
0
);
}
code/cppserver/src/RdmaServerEndpointGroup.cpp
0 → 100644
View file @
6a2d8a83
#include "RdmaServerEndpointGroup.hpp"
int
RdmaServerEndpointGroup
::
CONN_STATE_INITIALIZED
=
0
;
int
RdmaServerEndpointGroup
::
CONN_STATE_BINDED
=
3
;
int
RdmaServerEndpointGroup
::
CONN_STATE_CONNECTED
=
4
;
int
RdmaServerEndpointGroup
::
CONN_STATE_CLOSED
=
5
;
RdmaServerEndpointGroup
::
RdmaServerEndpointGroup
(
int
sendQueueSize
,
int
recvQueueSize
,
int
compQueueSize
,
int
sendMsgSize
,
int
recvMsgSize
)
:
_sendQueueSize
(
sendQueueSize
),
_recvQueueSize
(
recvQueueSize
),
_compQueueSize
(
compQueueSize
),
_sendMsgSize
(
sendMsgSize
),
_recvMsgSize
(
recvMsgSize
)
{
_cmProcessor
=
new
RdmaCmProcessor
(
this
);
_cm_id
=
_cmProcessor
->
createId
();
_qpRepEndpointMap
=
new
std
::
unordered_map
<
uint32_t
,
RdmaReplicationEndpoint
*>
();
_qpSalEndpointMap
=
new
std
::
unordered_map
<
uint32_t
,
RdmaSalEndpoint
*>
();
_salEps
=
new
std
::
vector
<
RdmaSalEndpoint
*>
();
_repEps
=
new
std
::
vector
<
RdmaReplicationEndpoint
*>
();
rocksdb
::
Options
options
;
options
.
create_if_missing
=
true
;
// open a database with a name which corresponds to a file system directory
rocksdb
::
Status
status
=
rocksdb
::
DB
::
Open
(
options
,
"/tmp/testdb"
,
&
_db
);
std
::
cout
<<
"Rocks started"
<<
std
::
endl
;
if
(
!
status
.
ok
())
{
std
::
cout
<<
status
.
ToString
()
<<
std
::
endl
;
exit
(
1
);
}
}
void
RdmaServerEndpointGroup
::
setExecutor
(
Executor
*
ex
)
{
_executor
=
ex
;
}
void
RdmaServerEndpointGroup
::
startCmProcessor
(
bool
newThread
)
{
_cmProcessor
->
start
(
newThread
);
}
void
RdmaServerEndpointGroup
::
bind
(
const
char
*
ip
,
const
char
*
port
,
int
backlog
)
{
int
ret
;
std
::
cout
<<
"RdmaServerEndpointGroup : Step 2 bind_addr"
<<
std
::
endl
;
struct
addrinfo
*
addr
;
ret
=
getaddrinfo
(
ip
,
port
,
NULL
,
&
addr
);
if
(
ret
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : get_addr_info failed"
<<
std
::
endl
;
}
ret
=
rdma_bind_addr
(
_cm_id
,
addr
->
ai_addr
);
if
(
ret
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : rdma_bind_addr failed"
<<
std
::
endl
;
return
;
}
std
::
cout
<<
"RdmaServerEndpointGroup : rdma_bind_addr successful
\n
"
;
ret
=
rdma_listen
(
_cm_id
,
backlog
);
if
(
ret
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : rdma_listen failed"
<<
std
::
endl
;
return
;
}
}
struct
ibv_cq
*
RdmaServerEndpointGroup
::
createSalCq
(
struct
rdma_cm_id
*
id
)
{
if
(
_salCqProcessor
==
NULL
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : step 5 create salcq processor"
<<
std
::
endl
;
_salCqProcessor
=
new
RdmaSalCqProcessor
(
_executor
,
_cm_id
->
verbs
,
_compQueueSize
);
_salCqProcessor
->
start
();
}
return
_salCqProcessor
->
getCq
();
}
struct
ibv_cq
*
RdmaServerEndpointGroup
::
createRepCq
(
struct
rdma_cm_id
*
id
)
{
if
(
_repCqProcessor
==
NULL
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : step 5 create repcq processor"
<<
std
::
endl
;
_repCqProcessor
=
new
RdmaRepCqProcessor
(
_executor
,
_cm_id
->
verbs
,
_compQueueSize
);
_repCqProcessor
->
start
();
}
return
_repCqProcessor
->
getCq
();
}
void
RdmaServerEndpointGroup
::
createEpCmEvent
(
struct
rdma_cm_event
*
event
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : step 4 Got Connect Request Server Endpoint"
<<
std
::
endl
;
/* create and add to vectors for replication and invalidation processing*/
const
char
*
connData
=
reinterpret_cast
<
const
char
*>
(
event
->
param
.
conn
.
private_data
);
if
(
strcmp
(
connData
,
"sal"
)
==
0
)
{
std
::
cout
<<
"sal"
<<
std
::
endl
;
RdmaSalEndpoint
*
endpoint
=
new
RdmaSalEndpoint
(
event
->
id
,
createSalCq
(
event
->
id
),
_sendQueueSize
,
_recvQueueSize
,
_sendMsgSize
,
_recvMsgSize
,
_db
);
event
->
id
->
context
=
(
void
*
)
endpoint
;
endpoint
->
createResources
();
std
::
unique_lock
lock
(
_salMutex
);
_salEps
->
push_back
(
endpoint
);
std
::
cout
<<
"qp num"
<<
event
->
id
->
qp
->
qp_num
<<
" "
<<
_qpSalEndpointMap
->
size
();
_qpSalEndpointMap
->
emplace
(
event
->
id
->
qp
->
qp_num
,
endpoint
);
std
::
cout
<<
_qpSalEndpointMap
->
size
()
<<
std
::
endl
;
}
else
{
RdmaReplicationEndpoint
*
endpoint
=
new
RdmaReplicationEndpoint
(
event
->
id
,
createRepCq
(
event
->
id
),
_sendQueueSize
,
_recvQueueSize
,
_sendMsgSize
,
_recvMsgSize
,
_db
);
event
->
id
->
context
=
(
void
*
)
endpoint
;
endpoint
->
createResources
();
std
::
unique_lock
lock
(
_repMutex
);
_repEps
->
push_back
(
endpoint
);
_qpRepEndpointMap
->
emplace
(
event
->
id
->
qp
->
qp_num
,
endpoint
);
}
rdma_accept
(
event
->
id
,
NULL
);
}
void
RdmaServerEndpointGroup
::
processCmEvent
(
struct
rdma_cm_event
*
event
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : event "
<<
rdma_event_str
(
event
->
event
)
<<
" id "
<<
event
->
id
<<
" "
<<
std
::
endl
;
/*
* Connect request came on listener ie endpointgroup
* in this listen_id contains cm_id of listener and id contains the new id of connection request
*/
if
(
event
->
event
==
RDMA_CM_EVENT_CONNECT_REQUEST
&&
event
->
listen_id
!=
NULL
)
{
createEpCmEvent
(
event
);
}
/*
* Event came for listener ie endpointgroup
*/
else
if
(
event
->
id
!=
NULL
&&
_cm_id
==
event
->
id
)
{
if
(
event
->
event
==
RDMA_CM_EVENT_DISCONNECTED
)
{
std
::
cout
<<
"RdmaServerEndpointGroup : Disconnect Server Endpoint"
<<
std
::
endl
;
close
();
}
else
{
std
::
cout
<<
"RdmaServerEndpointGroup : unknown Event for listener"
<<
rdma_event_str
(
event
->
event
)
<<
std
::
endl
;
}
}
/*
* Event came for a endpoint
*/
else
if
(
event
->
id
!=
NULL
&&
event
->
id
->
context
!=
NULL
)
{
RdmaEndpoint
*
ep
=
((
RdmaEndpoint
*
)
event
->
id
->
context
);
ep
->
processCmEvent
(
event
);
if
(
event
->
event
==
RDMA_CM_EVENT_DISCONNECTED
)
{
{
std
::
unique_lock
lock
(
_repMutex
);
auto
it
=
std
::
find
(
_repEps
->
begin
(),
_repEps
->
end
(),
(
RdmaReplicationEndpoint
*
)
ep
);
if
(
it
!=
_repEps
->
end
())
{
_repEps
->
erase
(
it
);
delete
((
RdmaReplicationEndpoint
*
)
event
->
id
->
context
);
}
}
{
std
::
unique_lock
lock
(
_salMutex
);
auto
it
=
std
::
find
(
_salEps
->
begin
(),
_salEps
->
end
(),
(
RdmaSalEndpoint
*
)
ep
);
if
(
it
!=
_salEps
->
end
())
{
_salEps
->
erase
(
it
);
delete
((
RdmaSalEndpoint
*
)
event
->
id
->
context
);
}
}
}
/*if (event->event == RDMA_CM_EVENT_TIMEWAIT_EXIT)
{
std::cout<<"des"<<std::endl;
rdma_destroy_id(event->id);
std::cout<<"des"<<std::endl;
}
*/
}
else
{
std
::
cout
<<
"RdmaServerEndpointGroup : Not able to procces CM EVent"
<<
rdma_event_str
(
event
->
event
)
<<
event
->
id
<<
" "
<<
event
->
listen_id
<<
std
::
endl
;
}
}
void
RdmaServerEndpointGroup
::
clientClose
()
{
_cmProcessor
->
close
();
delete
_cmProcessor
;
_salCqProcessor
->
close
();
_repCqProcessor
->
close
();
delete
_salCqProcessor
;
delete
_repCqProcessor
;
/* for(int i = 0 ;i < _endPoints->size();i++)
{
_endPoints->at(i)->close();
}
delete _endPoints;
*/
rdma_disconnect
(
_cm_id
);
rdma_destroy_id
(
_cm_id
);
}
void
RdmaServerEndpointGroup
::
close
()
{
}
code/cppserver/src/Server.cpp
0 → 100644
View file @
6a2d8a83
#include <iostream>
#include "RdmaServerEndpointGroup.hpp"
#include "Executor.hpp"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
int
main
()
{
RdmaServerEndpointGroup
*
group
=
new
RdmaServerEndpointGroup
(
5
,
5
,
5
,
50
,
50
);
Executor
*
ex
=
new
Executor
(
4
,
group
);
group
->
setExecutor
(
ex
);
group
->
bind
(
"192.168.200.20"
,
"1921"
,
2
);
group
->
startCmProcessor
(
false
);
std
::
cout
<<
"rhdhj"
<<
std
::
endl
;
while
(
1
)
;
}
\ No newline at end of file
code/cppserver/src/TaskThread.cpp
0 → 100644
View file @
6a2d8a83
#include "TaskThread.hpp"
#include "MessageFormats.hpp"
TaskThread
::
TaskThread
(
int
id
,
int
cpu
,
ConcurrentQueue
*
taskqueue
,
RdmaEndpointGroup
*
group
)
:
_id
(
id
),
_group
(
group
)
{
_taskQueue
=
taskqueue
;
if
(
pthread_create
(
&
thread
,
NULL
,
&
TaskThread
::
run
,
this
))
{
std
::
cout
<<
"pthread create has been failed while creating taskthread "
<<
std
::
endl
;
exit
(
0
);
}
cpu_set_t
cpuset
;
CPU_ZERO
(
&
cpuset
);
CPU_SET
(
cpu
,
&
cpuset
);
if
(
pthread_setaffinity_np
(
thread
,
sizeof
(
cpu_set_t
),
&
cpuset
)
!=
0
)
{
std
::
cerr
<<
"Error calling pthread_setaffinity_np: "
<<
"
\n
"
;
}
}
TaskThread
::
TaskThread
(
int
id
,
ConcurrentQueue
*
taskqueue
,
RdmaEndpointGroup
*
group
)
:
_id
(
id
),
_group
(
group
)
{
_taskQueue
=
taskqueue
;
if
(
pthread_create
(
&
thread
,
NULL
,
&
TaskThread
::
run
,
this
))
{
std
::
cout
<<
"pthread create has been failed while creating taskthread "
<<
std
::
endl
;
exit
(
0
);
}
}
TaskThread
::~
TaskThread
()
{
std
::
cout
<<
"Task Destructed"
<<
std
::
endl
;
stop
();
}
void
TaskThread
::
stop
()
{
_stop
=
true
;
if
(
pthread_join
(
thread
,
NULL
)
==
0
)
{
std
::
cout
<<
"pthread join failed"
<<
std
::
endl
;
}
}
inline
void
*
TaskThread
::
run
(
void
*
object
)
{
TaskThread
*
thread
=
reinterpret_cast
<
TaskThread
*>
(
object
);
std
::
cout
<<
"running task thread"
<<
thread
->
_id
<<
std
::
endl
;
while
(
!
thread
->
_stop
)
{
struct
ibv_wc
*
data
=
NULL
;
data
=
thread
->
_taskQueue
->
try_pop
();
if
(
data
!=
NULL
)
{
std
::
cout
<<
"TaskThread:: got data"
<<
std
::
endl
;
thread
->
processEvent
(
data
);
thread
->
_taskQueue
->
removeFromSet
(
data
);
delete
data
;
}
}
return
NULL
;
}
void
TaskThread
::
processEvent
(
struct
ibv_wc
*
data
)
{
if
(
data
==
NULL
||
data
->
status
!=
IBV_WC_SUCCESS
)
{
std
::
cout
<<
"TaskThread : failed work completion : "
;
std
::
cout
<<
ibv_wc_status_str
(
data
->
status
)
<<
" on qp "
<<
data
->
qp_num
<<
std
::
endl
;
return
;
}
/*
* Process Request from client
*/
auto
it
=
_group
->
_qpSalEndpointMap
->
find
(
data
->
qp_num
);
if
(
it
==
_group
->
_qpSalEndpointMap
->
end
())
{
std
::
cout
<<
data
->
qp_num
<<
"RdmaSal : endpoint not registered for qp num
\n
"
;
return
;
}
// processSalCQEvent(data, it->second);
switch
(
data
->
opcode
)
{
case
IBV_WC_SEND
:
it
->
second
->
processSendCompletion
(
data
);
break
;
case
IBV_WC_RECV
:
{
// it->second->processRecvCompletion(data);
char
*
buffer
=
new
char
[
data
->
byte_len
];
memcpy
(
buffer
,
(
void
*
)
data
->
wr_id
,
data
->
byte_len
);
rdma_post_recv
(
it
->
second
->
_cm_id
,
(
void
*
)
data
->
wr_id
,
(
void
*
)
data
->
wr_id
,
it
->
second
->
_recvMsgSize
,
it
->
second
->
_recvMr
);
struct
SalRequest
*
req
=
(
struct
SalRequest
*
)
buffer
;
std
::
cout
<<
"recieve id : "
<<
req
->
id
<<
" "
<<
(
char
*
)
req
+
SalRequestHeaderSize
;
std
::
cout
<<
" "
<<
req
->
type
<<
"size"
<<
data
->
byte_len
<<
"
\n
"
;
switch
(
req
->
type
)
{
case
RequestType
:
:
GET
:
it
->
second
->
processGet
(
req
);
break
;
case
RequestType
:
:
DELETE
:
replicateSalRequest
(
buffer
,
data
->
byte_len
);
it
->
second
->
processGet
(
req
);
break
;
case
RequestType
:
:
PUT
:
replicateSalRequest
(
buffer
,
data
->
byte_len
);
it
->
second
->
processPut
(
req
);
break
;
default:
std
::
cout
<<
"SalRequest invalid req type"
;
break
;
}
delete
[]
buffer
;
}
break
;
case
IBV_WC_RDMA_WRITE
:
std
::
cout
<<
"rdma write completion
\n
"
;
break
;
case
IBV_WC_RDMA_READ
:
std
::
cout
<<
"rdma read completion
\n
"
;
break
;
default:
std
::
cout
<<
"TaskThread default opcode : "
<<
data
->
opcode
<<
std
::
endl
;
break
;
}
}
void
TaskThread
::
replicateSalRequest
(
char
*
req
,
uint32_t
size
)
{
for
(
auto
i
=
_group
->
_qpRepEndpointMap
->
begin
();
i
!=
_group
->
_qpRepEndpointMap
->
end
();
i
++
)
{
i
->
second
->
sendMessage
(
req
,
size
);
}
SalRequest
*
salReq
=
(
SalRequest
*
)
req
;
char
*
buffer
=
new
char
[
InvRequestHeaderSize
+
salReq
->
keySize
];
InvRequest
*
invRequest
=
(
InvRequest
*
)(
buffer
);
invRequest
->
type
=
RequestType
::
INVALIDATE
;
invRequest
->
id
=
salReq
->
id
;
invRequest
->
keySize
=
salReq
->
keySize
;
memcpy
(
buffer
+
12
,
salReq
+
SalRequestHeaderSize
,
salReq
->
keySize
);
for
(
auto
i
=
_group
->
_qpSalEndpointMap
->
begin
();
i
!=
_group
->
_qpSalEndpointMap
->
end
();
i
++
)
{
i
->
second
->
sendMessage
(
buffer
,
InvRequestHeaderSize
+
salReq
->
keySize
);
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment