Commit cb2baf3d authored by mayankkakad's avatar mayankkakad

dns server created, new RPCs added, server.cpp modified for new RPCs (just a template)

parent 40295e4f
#include <bits/stdc++.h>
#include <grpcpp/grpcpp.h>
#include "keyvaluestore.grpc.pb.h"
using namespace std;
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using keyvaluestore::KeyValueServices;
using keyvaluestore::Info;
using keyvaluestore::Null;
ServerBuilder builder;
KeyValueServices::AsyncService service;
std::unique_ptr<Server> server;
class DNSData {
public:
DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq) : service(service), cq(cq), getAddressResponder(&context), status(CREATE) {
Proceed();
}
void Proceed() {
if (status == CREATE) {
status = PROCESS;
service->RequestGETADDRESS(&context, &null, &getAddressResponder, cq, cq, this);
}
else if (status == PROCESS) {
new DNSData(service, cq);
status = FINISH;
}
else {
GPR_ASSERT(status == FINISH);
delete this;
}
}
private:
KeyValueServices::AsyncService *service;
ServerCompletionQueue *cq;
ServerContext context;
Null null;
Info info;
ServerAsyncResponseWriter<Info> getAddressResponder;
enum CallStatus {
CREATE,
PROCESS,
FINISH
};
CallStatus status;
};
int main(int argc,char **argv) {
string server_address("0.0.0.0:1234");
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
unique_ptr<ServerCompletionQueue> comp_queue=builder.AddCompletionQueue();
server = builder.BuildAndStart();
new DNSData(&service,comp_queue.get());
void *tag;
bool ok;
while(true) {
GPR_ASSERT(comp_queue->Next(&tag,&ok));
GPR_ASSERT(ok);
static_cast<DNSData *>(tag)->Proceed();
}
}
\ No newline at end of file
......@@ -8,9 +8,11 @@ service KeyValueServices {
rpc GET(Key) returns (Value) {}
rpc PUT(KeyValue) returns (ReqStatus) {}
rpc DEL(Key) returns (ReqStatus) {}
rpc NEW(Info) returns (SuccessorInfo) {}
rpc INFORM(Info) returns (KeyValues) {}
rpc GETADDRESS(Null) returns (Info) {}
}
message Key {
string key = 1;
}
......@@ -30,3 +32,20 @@ message ReqStatus {
int32 status = 1;
string error = 2;
}
message Info {
string address = 1;
}
message SuccessorInfo {
string address = 1;
}
message KeyValues {
repeated string keys = 1;
repeated string values = 2;
}
message Null {
int32 nothing = 1;
}
\ No newline at end of file
......@@ -17,6 +17,9 @@ using keyvaluestore::KeyValue;
using keyvaluestore::KeyValueServices;
using keyvaluestore::ReqStatus;
using keyvaluestore::Value;
using keyvaluestore::Info;
using keyvaluestore::SuccessorInfo;
using keyvaluestore::KeyValues;
pthread_mutex_t _masterLock;
......@@ -26,6 +29,11 @@ enum RequestType {
DEL
};
enum ServerRequest {
NEW,
INFORM
};
map<string, string> params;
string config_filename = "../config";
string log_file = "../log";
......@@ -37,6 +45,9 @@ pthread_t *workers;
int *worker_id;
pthread_cond_t startRpcs;
pthread_mutex_t myLock;
pthread_t dist_worker;
int dist_worker_id;
bool start;
memoryManagement *memManager;
......@@ -55,6 +66,52 @@ void getConfig() {
config.close();
}
class ServerData {
public:
ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), newResponder(&context), informResponder(&context), status(CREATE), reqType(reqType) {
Proceed();
}
void Proceed() {
if (status == CREATE) {
status = PROCESS;
if (reqType == NEW)
service->RequestNEW(&context, &info, &newResponder, cq, cq, this);
else
service->RequestINFORM(&context, &info, &informResponder, cq, cq, this);
} else if (status == PROCESS) {
new ServerData(service, cq, reqType);
if (reqType == NEW) {
//calculate id of node, return it's successor
}
else {
//return half of the keyvalue pairs to the requesting node
}
status = FINISH;
} else {
GPR_ASSERT(status == FINISH);
delete this;
}
}
private:
KeyValueServices::AsyncService *service;
ServerCompletionQueue *cq;
ServerContext context;
Info info;
SuccessorInfo successorInfo;
KeyValues keyValues;
ServerAsyncResponseWriter<SuccessorInfo> newResponder;
ServerAsyncResponseWriter<KeyValues> informResponder;
enum CallStatus {
CREATE,
PROCESS,
FINISH
};
CallStatus status;
ServerRequest reqType;
};
class CallData {
public:
CallData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getResponder(&context), putResponder(&context), delResponder(&context), status(CREATE), reqType(reqType) {
......@@ -83,6 +140,7 @@ public:
if (status == 200)
value.set_status(200);
else {
//as this server does not have the keyvalue pair, send request to next server using CHORD
value.set_status(400);
value.set_error(v);
}
......@@ -91,6 +149,7 @@ public:
} else if (reqType == PUT) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
pthread_mutex_lock(&_masterLock);
memManager->put(keyvalue.key(), keyvalue.value());
pthread_mutex_unlock(&_masterLock);
......@@ -108,6 +167,7 @@ public:
if (status == 200)
stat.set_status(200);
else {
//as this server does not have the keyvalue pair, send request to next server using CHORD
stat.set_status(400);
stat.set_error("KEY NOT EXIST");
}
......@@ -147,6 +207,24 @@ void setupServer(string server_address) {
builder.RegisterService(&service);
}
void *handleServerRequests(void *thread_id) {
unique_ptr<ServerCompletionQueue> comp_queue=builder.AddCompletionQueue();
pthread_mutex_lock(&myLock);
while(!start)
pthread_cond_wait(&startRpcs,&myLock);
pthread_mutex_unlock(&myLock);
new ServerData(&service,comp_queue.get(),NEW);
new ServerData(&service,comp_queue.get(),INFORM);
void *tag;
bool ok;
while(true) {
GPR_ASSERT(comp_queue->Next(&tag,&ok));
GPR_ASSERT(ok);
static_cast<ServerData *>(tag)->Proceed();
}
return 0;
}
void *handleRpcs(void *thread_id) {
unique_ptr<ServerCompletionQueue> comp_queue = builder.AddCompletionQueue();
pthread_mutex_lock(&myLock);
......@@ -168,6 +246,8 @@ void *handleRpcs(void *thread_id) {
}
void assignThreads(int num_threads) {
dist_worker_id=-1;
pthread_create(&dist_worker,NULL,handleServerRequests,(void *)&dist_worker_id);
workers = (pthread_t *)malloc(sizeof(pthread_t) * num_threads);
worker_id = (int *)malloc(sizeof(int) * num_threads);
for (int i = 0; i < num_threads; i++) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment