Commit 94b24420 authored by Paras Garg's avatar Paras Garg

changed message formats combined to single header for all message type

parent 92761964
...@@ -14,7 +14,6 @@ to do ...@@ -14,7 +14,6 @@ to do
delete client endpoint on close delete client endpoint on close
threading in client and hashing in client threading in client and hashing in client
resolve double delete for get and put
add cache add support for invalidation add cache add support for invalidation
interface client API through endpointGroup interface client API through endpointGroup
......
...@@ -16,15 +16,24 @@ import java.util.concurrent.ConcurrentMap; ...@@ -16,15 +16,24 @@ import java.util.concurrent.ConcurrentMap;
public class HpdosClient extends DB { public class HpdosClient extends DB {
//private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBClient.class); // private static final Logger LOGGER =
// LoggerFactory.getLogger(RocksDBClient.class);
public HpdosClient() { public HpdosClient() {
} }
long endpointGroup;
public void init() throws DBException { public void init() throws DBException {
Properties prop = getProperties(); Properties prop = getProperties();
prop.forEach((k, v) -> System.out.println(k + "" + v)); prop.forEach((k, v) -> System.out.println(k + "" + v));
System.out.println("init hpdos"); System.out.println("init hpdos");
synchronized (HpdosClient.class) {
if (endpointGroup == 0)
endpointGroup++;
}
System.out.println("endpoit" + endpointGroup);
} }
public void cleanup() throws DBException { public void cleanup() throws DBException {
......
#ifndef __MessageFormats__ #ifndef __MessageFormats__
#define __MessageFormats__ #define __MessageFormats__
enum RequestType enum MessageType
{ {
GET, GET = (1u << 0),
PUT, PUT = (1u << 1),
DELETE DELETE = (1u << 2),
}; INVALIDATE = (1u << 3),
enum ResponseStatus SUCCESS = (1u << 4),
{ FAILURE = (1u <<5)
SUCCESS,
FAILURE,
INVALIDATE
}; };
struct __attribute__ ((__packed__)) SalRequestHeader struct __attribute__ ((__packed__)) MessageHeader
{ {
uint32_t id; uint32_t id;
enum RequestType type; enum MessageType type;
uint32_t keySize; uint32_t keySize;
uint32_t valueSize; uint32_t valueSize;
}; };
static const uint32_t MessageHeaderSize = sizeof(MessageHeader);
/*
struct __attribute__ ((__packed__)) SalResponseHeader struct __attribute__ ((__packed__)) SalResponseHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus status; enum MessageType status;
/* //Note value will be present only in case of response status is success
* Note value will be present only in case of response status is success
*/
uint32_t valueSize; uint32_t valueSize;
}; };
struct __attribute__ ((__packed__)) InvRequestHeader struct __attribute__ ((__packed__)) InvRequestHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus type; enum MessageType type;
uint32_t keySize; uint32_t keySize;
}; };
...@@ -42,11 +40,12 @@ struct __attribute__ ((__packed__)) InvRequestHeader ...@@ -42,11 +40,12 @@ struct __attribute__ ((__packed__)) InvRequestHeader
struct __attribute__ ((__packed__)) InvResponseHeader struct __attribute__ ((__packed__)) InvResponseHeader
{ {
uint32_t id; uint32_t id;
enum ResponseStatus status; enum MessageType status; MessageHeader
}; };
static uint32_t SalRequestHeaderSize = sizeof(SalRequestHeader); */
/*
static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader); static uint32_t SalResponseHeaderSize = sizeof(SalResponseHeader);
static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader); static uint32_t InvRequestHeaderSize = sizeof(InvRequestHeader);
static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader); static uint32_t InvResponseHeaderSize = sizeof(InvResponseHeader);
*/
#endif #endif
\ No newline at end of file
...@@ -23,8 +23,9 @@ class RdmaClientEndpointGroup : public RdmaEndpointGroup ...@@ -23,8 +23,9 @@ class RdmaClientEndpointGroup : public RdmaEndpointGroup
int _compQueueSize; int _compQueueSize;
int _sendMsgSize; int _sendMsgSize;
int _recvMsgSize; int _recvMsgSize;
int _timeoutMs;
int _maxInLine; int _maxInLine;
int _timeoutMs;
public: public:
RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize, RdmaClientEndpointGroup(int sendQueueSize, int recvQueueSize, int compQueueSize, int sendMsgSize,
......
...@@ -5,12 +5,15 @@ ...@@ -5,12 +5,15 @@
#include <atomic> #include <atomic>
#include <iostream> #include <iostream>
#include <mutex> #include <mutex>
#include <condition_variable>
class RdmaFuture class RdmaFuture
{ {
static int DONE; static int DONE;
static int PENDING; static int PENDING;
uint64_t _requestId; uint64_t _requestId;
std::mutex stateMutex; std::mutex stateMutex;
std::condition_variable stateCv;
uint8_t state{0}; uint8_t state{0};
char *_data; char *_data;
uint8_t _status; uint8_t _status;
......
#include <iostream> #include <iostream>
#include "RdmaClientEndpointGroup.hpp" #include "RdmaClientEndpointGroup.hpp"
#include "MessageFormats.hpp" #include "MessageFormats.hpp"
#include "Properties.cpp"
int main() int main()
{ {
Properties prop("prop.config");
int sendQS = stoi(prop.getValue("sendQS"));
int recvQS = stoi(prop.getValue("recvQS"));
int compQS = stoi(prop.getValue("compQS"));
int sendMS = stoi(prop.getValue("sendMS"));
int recvMS = stoi(prop.getValue("recvMS"));
RdmaClientEndpointGroup *group = new RdmaClientEndpointGroup(5, 5, 5, 500, 500, 0, 1000); RdmaClientEndpointGroup *group = new RdmaClientEndpointGroup(sendQS, recvQS, compQS, sendMS, recvMS, 0, 1000);
RdmaClientEndpoint *clientEp = group->createEndpoint(); RdmaClientEndpoint *clientEp = group->createEndpoint();
clientEp->connect("192.168.200.20", "1921", "sal"); clientEp->connect("192.168.200.20", "1921", "sal");
while (!clientEp->isConnected()); while (!clientEp->isConnected())
std::cout << "client : connected" << std::endl; ;
std::cout << "client : connected" << std::endl;
auto r1 =clientEp->put("paras",5,"garg",4); auto r1 = clientEp->put("paras", 5, "garg", 4);
auto r2 = clientEp->get("paras",5); auto r2 = clientEp->get("paras", 5);
auto r3 = clientEp->deleteKey("paras",5); auto r3 = clientEp->deleteKey("paras", 5);
std::cout<<"waiting for output"<<std::endl; std::cout << "waiting for output" << std::endl;
if (r3 != nullptr)
{
r3->get(); r3->get();
delete r3; delete r3;
struct SalResponseHeader *response = (struct SalResponseHeader *)r1->get(); }
std::cout << "Recieve response for id " << response->id << " status " << response->status; if (r1 != nullptr)
{
if(r1->get() == nullptr)
std::cout<<"nd"<<std::endl;
struct MessageHeader *response = (struct MessageHeader *)r1->get();
std::cout << "Recieve response for id " << response->id << " type " << response->type;
std::cout << " size " << response->valueSize << std::endl; std::cout << " size " << response->valueSize << std::endl;
delete r1; delete r1;
response = (struct SalResponseHeader *)r2->get(); }
std::cout << "Recieve response for id " << response->id << " status " << response->status; if (r2 != nullptr)
{
if(r2->get() == nullptr)
std::cout<<"nd"<<std::endl;
struct MessageHeader *response = (struct MessageHeader *)r2->get();
std::cout << "Recieve response for id " << response->id << " status " << response->type;
std::cout << " size " << response->valueSize << std::endl; std::cout << " size " << response->valueSize << std::endl;
delete r2; delete r2;
}
/*char *message = new char[100]; /*char *message = new char[100];
struct SalRequestHeader *request = (struct SalRequestHeader *)message; struct SalRequestHeader *request = (struct SalRequestHeader *)message;
request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed); request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed);
request->type = RequestType::PUT; request->type = RequestType::PUT;
request->keySize = 10; request->keySize = 10;
request->valueSize = 5; request->valueSize = 5;
memcpy((char *)request + SalRequestHeaderSize, "parasgarg1", 10); memcpy((char *)request + SalRequestHeaderSize, "parasgarg1", 10);
memcpy((char *)request + SalRequestHeaderSize + request->keySize, "paras", 5); memcpy((char *)request + SalRequestHeaderSize + request->keySize, "paras", 5);
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl; std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl; std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl; std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed); request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed);
request->type = RequestType::GET; request->type = RequestType::GET;
request->keySize = 10; request->keySize = 10;
memcpy((char *)request + SalRequestHeaderSize, "parasgarg1", 10); memcpy((char *)request + SalRequestHeaderSize, "parasgarg1", 10);
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl; std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl; std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed); request->id = clientEp->_requestId.fetch_add(1, std::memory_order_relaxed);
request->type = RequestType::DELETE; request->type = RequestType::DELETE;
request->keySize = 10; request->keySize = 10;
memcpy((char *)request + SalRequestHeaderSize, "parasgarg1", 10); memcpy((char *)request + SalRequestHeaderSize, "parasgarg1", 10);
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
std::cout << "send" << clientEp->sendMessage(message, SalRequestHeaderSize + 14) << std::endl;
*/ */
// memcpy(re->value,"aa",2); // memcpy(re->value,"aa",2);
int send = 0; int send = 0;
while (1) while (1)
{
std::cin >> send;
if (send == 1)
{ {
std::cin >> send;
if (send == 1)
{
// std::cout << "send" << clientEp->sendMessage(message, 10) << std::endl; // std::cout << "send" << clientEp->sendMessage(message, 10) << std::endl;
}
} }
}
} }
\ No newline at end of file
...@@ -68,16 +68,16 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong ...@@ -68,16 +68,16 @@ JNIEXPORT jbyteArray JNICALL Java_JClient_get(JNIEnv *jenv, jobject jobj, jlong
} }
auto data = future->get(); auto data = future->get();
delete future; delete future;
struct SalResponseHeader *response = (struct SalResponseHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->status == ResponseStatus::FAILURE) if (response->type == MessageType::FAILURE)
{ {
jbyteArray jvalue = jenv->NewByteArray(0); jbyteArray jvalue = jenv->NewByteArray(0);
return jvalue; return jvalue;
} }
jbyteArray jvalue = jenv->NewByteArray(response->valueSize); jbyteArray jvalue = jenv->NewByteArray(response->valueSize);
std::cout<<"value size "<<response->valueSize<<" "<< (data+SalResponseHeaderSize); std::cout << "value size " << response->valueSize << " " << (data + MessageHeaderSize);
jenv->SetByteArrayRegion(jvalue, 0, response->valueSize - 1, jenv->SetByteArrayRegion(jvalue, 0, response->valueSize - 1,
reinterpret_cast<const jbyte *>(data + SalResponseHeaderSize)); reinterpret_cast<const jbyte *>(data + MessageHeaderSize));
return jvalue; return jvalue;
} }
...@@ -95,7 +95,7 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien ...@@ -95,7 +95,7 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien
int valLen = jenv->GetArrayLength(jval); int valLen = jenv->GetArrayLength(jval);
char *val = new char[valLen]; char *val = new char[valLen];
jenv->GetByteArrayRegion(jval, 0, valLen, reinterpret_cast<jbyte *>(val)); jenv->GetByteArrayRegion(jval, 0, valLen, reinterpret_cast<jbyte *>(val));
std::cout<<"put got "<<val<<" len "<<valLen; std::cout << "put got " << val << " len " << valLen;
if (jenv->ExceptionCheck()) if (jenv->ExceptionCheck())
{ {
CPPLog::LOG_ERROR("exception occurs in jni put"); CPPLog::LOG_ERROR("exception occurs in jni put");
...@@ -113,8 +113,8 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien ...@@ -113,8 +113,8 @@ JNIEXPORT jint JNICALL Java_JClient_put(JNIEnv *jenv, jobject jobj, jlong jclien
} }
auto data = future->get(); auto data = future->get();
delete future; delete future;
struct SalResponseHeader *response = (struct SalResponseHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->status == ResponseStatus::FAILURE) if (response->type == MessageType::FAILURE)
{ {
return 0; return 0;
} }
...@@ -142,8 +142,8 @@ JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jcl ...@@ -142,8 +142,8 @@ JNIEXPORT jint JNICALL Java_JClient_delete(JNIEnv *jenv, jobject jobj, jlong jcl
} }
auto data = future->get(); auto data = future->get();
delete future; delete future;
struct SalResponseHeader *response = (struct SalResponseHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
if (response->status == ResponseStatus::FAILURE) if (response->type == MessageType::FAILURE)
{ {
return 0; return 0;
} }
......
...@@ -97,8 +97,8 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event) ...@@ -97,8 +97,8 @@ void RdmaClientEndpoint::processCmEvent(struct rdma_cm_event *event)
else else
{ {
std::ostringstream ss; std::ostringstream ss;
ss << "RdmaClientEndpoint : Not able to procces CM EVent"; ss << "RdmaClientEndpoint : Not able to procces CM EVent ";
ss << rdma_event_str(event->event) << event->id << " " << event->listen_id; ss << rdma_event_str(event->event) <<" "<< event->id << " " << event->listen_id;
CPPLog::LOG_ERROR(ss); CPPLog::LOG_ERROR(ss);
} }
} }
...@@ -262,14 +262,13 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc) ...@@ -262,14 +262,13 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
memcpy(data, (void *)wc.wr_id, wc.byte_len); memcpy(data, (void *)wc.wr_id, wc.byte_len);
rdma_post_recv(_cm_id, (void *)wc.wr_id, (void *)wc.wr_id, _recvMsgSize, _recvMr); rdma_post_recv(_cm_id, (void *)wc.wr_id, (void *)wc.wr_id, _recvMsgSize, _recvMr);
struct SalResponseHeader *response = (struct SalResponseHeader *)data; struct MessageHeader *response = (struct MessageHeader *)data;
/*std::ostringstream ss; /*std::ostringstream ss;
ss << "Recieve response for id " << response->id << " status " << response->status; ss << "Recieve response for id " << response->id << " status " << response->status;
ss << " size " << response->valueSize << std::endl; ss << " size " << response->valueSize << std::endl;
CPPLog::LOG_INFO(ss);*/ CPPLog::LOG_INFO(ss);*/
if (response->status != ResponseStatus::INVALIDATE) if (response->type != MessageType::INVALIDATE)
{ {
std::cout<<"f"<<response->status<<std::endl;
auto it = futures.find(response->id); auto it = futures.find(response->id);
if (it == futures.end()) if (it == futures.end())
{ {
...@@ -283,7 +282,7 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc) ...@@ -283,7 +282,7 @@ void RdmaClientEndpoint::processRecvComp(struct ibv_wc wc)
RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *value, int valueSize) RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *value, int valueSize)
{ {
if (keySize + valueSize + (int)SalRequestHeaderSize > _sendMsgSize) if (keySize + valueSize + (int)MessageHeaderSize > _sendMsgSize)
return nullptr; return nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
...@@ -291,14 +290,14 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va ...@@ -291,14 +290,14 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va
char *sendBuffer = _sendBuffers.front(); char *sendBuffer = _sendBuffers.front();
_sendBuffers.pop(); _sendBuffers.pop();
lock.unlock(); lock.unlock();
struct SalRequestHeader *request = (struct SalRequestHeader *)sendBuffer; struct MessageHeader *request = (struct MessageHeader *)sendBuffer;
request->id = _requestId.fetch_add(1, std::memory_order_relaxed); request->id = _requestId.fetch_add(1, std::memory_order_relaxed);
request->type = RequestType::PUT; request->type = MessageType::PUT;
request->keySize = keySize; request->keySize = keySize;
request->valueSize = valueSize; request->valueSize = valueSize;
memcpy(sendBuffer + SalRequestHeaderSize, key, keySize); memcpy(sendBuffer + MessageHeaderSize, key, keySize);
memcpy(sendBuffer + SalRequestHeaderSize + keySize, value, valueSize); memcpy(sendBuffer + MessageHeaderSize + keySize, value, valueSize);
rdma_post_send(_cm_id, sendBuffer, sendBuffer, SalRequestHeaderSize + keySize + valueSize, rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize + valueSize,
_sendMr, 0); _sendMr, 0);
RdmaFuture *future = new RdmaFuture(request->id); RdmaFuture *future = new RdmaFuture(request->id);
futures[request->id] = future; futures[request->id] = future;
...@@ -306,7 +305,7 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va ...@@ -306,7 +305,7 @@ RdmaFuture *RdmaClientEndpoint::put(const char *key, int keySize, const char *va
} }
RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize) RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize)
{ {
if (keySize + (int)SalRequestHeaderSize > _sendMsgSize) if (keySize + (int)MessageHeaderSize > _sendMsgSize)
return nullptr; return nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
...@@ -314,12 +313,12 @@ RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize) ...@@ -314,12 +313,12 @@ RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize)
char *sendBuffer = _sendBuffers.front(); char *sendBuffer = _sendBuffers.front();
_sendBuffers.pop(); _sendBuffers.pop();
lock.unlock(); lock.unlock();
struct SalRequestHeader *request = (struct SalRequestHeader *)sendBuffer; struct MessageHeader *request = (struct MessageHeader *)sendBuffer;
request->id = _requestId.fetch_add(1, std::memory_order_relaxed); request->id = _requestId.fetch_add(1, std::memory_order_relaxed);
request->type = RequestType::GET; request->type = MessageType::GET;
request->keySize = keySize; request->keySize = keySize;
memcpy(sendBuffer + SalRequestHeaderSize, key, keySize); memcpy(sendBuffer + MessageHeaderSize, key, keySize);
rdma_post_send(_cm_id, sendBuffer, sendBuffer, SalRequestHeaderSize + keySize, rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize,
_sendMr, 0); _sendMr, 0);
RdmaFuture *future = new RdmaFuture(request->id); RdmaFuture *future = new RdmaFuture(request->id);
futures[request->id] = future; futures[request->id] = future;
...@@ -327,7 +326,7 @@ RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize) ...@@ -327,7 +326,7 @@ RdmaFuture *RdmaClientEndpoint::get(const char *key, int keySize)
} }
RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize) RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize)
{ {
if (keySize + (int)SalRequestHeaderSize > _sendMsgSize) if (keySize + (int)MessageHeaderSize > _sendMsgSize)
return nullptr; return nullptr;
std::unique_lock<std::mutex> lock(_sendBuffersM); std::unique_lock<std::mutex> lock(_sendBuffersM);
if (_sendBuffers.size() == 0) if (_sendBuffers.size() == 0)
...@@ -335,12 +334,12 @@ RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize) ...@@ -335,12 +334,12 @@ RdmaFuture *RdmaClientEndpoint::deleteKey(const char *key, int keySize)
char *sendBuffer = _sendBuffers.front(); char *sendBuffer = _sendBuffers.front();
_sendBuffers.pop(); _sendBuffers.pop();
lock.unlock(); lock.unlock();
struct SalRequestHeader *request = (struct SalRequestHeader *)sendBuffer; struct MessageHeader *request = (struct MessageHeader *)sendBuffer;
request->id = _requestId.fetch_add(1, std::memory_order_relaxed); request->id = _requestId.fetch_add(1, std::memory_order_relaxed);
request->type = RequestType::DELETE; request->type = MessageType::DELETE;
request->keySize = keySize; request->keySize = keySize;
memcpy(sendBuffer + SalRequestHeaderSize, key, keySize); memcpy(sendBuffer + MessageHeaderSize, key, keySize);
rdma_post_send(_cm_id, sendBuffer, sendBuffer, SalRequestHeaderSize + keySize, rdma_post_send(_cm_id, sendBuffer, sendBuffer, MessageHeaderSize + keySize,
_sendMr, 0); _sendMr, 0);
RdmaFuture *future = new RdmaFuture(request->id); RdmaFuture *future = new RdmaFuture(request->id);
futures[request->id] = future; futures[request->id] = future;
......
...@@ -7,16 +7,16 @@ RdmaFuture::RdmaFuture(uint64_t id) ...@@ -7,16 +7,16 @@ RdmaFuture::RdmaFuture(uint64_t id)
char *RdmaFuture::get() char *RdmaFuture::get()
{ {
//std::cout << (unsigned)state << std::endl; // std::cout << (unsigned)state << std::endl;
int current = 0; std::unique_lock<std::mutex> lock(stateMutex);
do while(state!= DONE)
{ {
std::unique_lock<std::mutex> lock(stateMutex); stateCv.wait(lock);
current = state; }
// [this](){return state!=DONE;});
lock.unlock(); //lock.unlock();
} while (current != DONE); // stateCv.wait(stateMutex, [](state != DONE;));
//std::cout<<"get"<<std::endl; // std::cout<<"get"<<std::endl;
return _data; return _data;
} }
char *RdmaFuture::wait_for(int timeout) char *RdmaFuture::wait_for(int timeout)
...@@ -25,14 +25,18 @@ char *RdmaFuture::wait_for(int timeout) ...@@ -25,14 +25,18 @@ char *RdmaFuture::wait_for(int timeout)
if (state == DONE) if (state == DONE)
return _data; return _data;
lock.unlock(); lock.unlock();
//add wait logic // add wait logic
return nullptr; return nullptr;
} }
void RdmaFuture::put(char *data) void RdmaFuture::put(char *data)
{ {
_data = data;
//std::cout << "got data current state" <<data<< (unsigned)state;
std::unique_lock<std::mutex> lock(stateMutex); std::unique_lock<std::mutex> lock(stateMutex);
state = DONE; _data = data;
//std::cout << "updated" << (unsigned)state; state = DONE;
lock.unlock();
stateCv.notify_one();
// std::cout << "got data current state" <<data<< (unsigned)state;
// std::unique_lock<std::mutex> lock(stateMutex);
// std::cout << "updated" << (unsigned)state;
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment