Commit 45626f12 authored by mayankkakad's avatar mayankkakad

new server adding code done (untested)

parent 178250d5
......@@ -22,7 +22,8 @@ std::unique_ptr<Server> server;
enum RequestType {
GETADDRESS,
ADDADDRESS
ADDADDRESS,
UPDATEFINGERTABLES
};
class DNSData {
......@@ -36,7 +37,7 @@ public:
status = PROCESS;
if(reqType==GETADDRESS)
service->RequestGETADDRESS(&context, &null, &getAddressResponder, cq, cq, this);
else
else if(reqType==ADDADDRESS)
service->RequestADDADDRESS(&context, &info, &addAddressResponder, cq, cq, this);
}
else if (status == PROCESS) {
......@@ -62,7 +63,7 @@ public:
}
getAddressResponder.Finish(info,Status::OK,this);
}
else {
else if(reqType==ADDADDRESS){
ofstream fout;
fout.open(SERVERS,ios::app);
fout<<info.address()<<endl;
......@@ -70,6 +71,8 @@ public:
null.set_nothing(0);
addAddressResponder.Finish(null,Status::OK,this);
}
else {
}
status = FINISH;
}
else {
......@@ -124,6 +127,7 @@ int main(int argc,char **argv) {
cout<<"DNS SERVER COMES UP SUCCESSFULLY"<<endl;
new DNSData(&service,comp_queue.get(),GETADDRESS);
new DNSData(&service,comp_queue.get(),ADDADDRESS);
new DNSData(&service,comp_queue.get(),UPDATEFINGERTABLES);
void *tag;
bool ok;
while(true) {
......
......@@ -9,9 +9,13 @@ service KeyValueServices {
rpc PUT(KeyValue) returns (ReqStatus) {}
rpc DEL(Key) returns (ReqStatus) {}
rpc NEW(Info) returns (SuccessorInfo) {}
rpc INFORM(Info) returns (KeyValues) {}
rpc INFORMSUCCESSOR(Info) returns (KeyValues) {}
rpc INFORMPREDECESSOR(Info) returns (Null) {}
rpc GETADDRESS(Null) returns (Info) {}
rpc ADDADDRESS(Info) returns (Null) {}
rpc UPDATEFINGERTABLES(Null) returns (Null) {}
rpc GETSUCCESSOR(Id) returns (Id) {}
rpc GETPREDECESSOR(Id) returns (Id) {}
}
message Key {
......@@ -39,7 +43,8 @@ message Info {
}
message SuccessorInfo {
string address = 1;
string succaddress = 1;
string predaddress = 2;
}
message KeyValues {
......@@ -50,3 +55,7 @@ message KeyValues {
message Null {
int32 nothing = 1;
}
message Id {
int64 id = 1;
}
\ No newline at end of file
#include <bits/stdc++.h>
#include <pthread.h>
#include <fstream>
#include <grpcpp/grpcpp.h>
#include "keyvaluestore.grpc.pb.h"
#include "../Backend.h"
#define NEIGHBOURS "neighbours.txt"
#define FINGER_TABLE "fingertable.txt"
#define DNS_SERVER "0.0.0.0:1234"
using namespace std;
......@@ -23,6 +27,7 @@ using keyvaluestore::Info;
using keyvaluestore::SuccessorInfo;
using keyvaluestore::KeyValues;
using keyvaluestore::Null;
using keyvaluestore::Id;
pthread_mutex_t _masterLock;
......@@ -34,7 +39,10 @@ enum RequestType {
enum ServerRequest {
NEW,
INFORM
INFORMSUCCESSOR,
INFORMPREDECESSOR,
GETSUCCESSOR,
GETPREDECESSOR
};
map<string, string> params;
......@@ -71,7 +79,7 @@ void getConfig() {
class ServerData {
public:
ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, ServerRequest reqType) : service(service), cq(cq), newResponder(&context), informResponder(&context), status(CREATE), reqType(reqType) {
ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, ServerRequest reqType) : service(service), cq(cq), newResponder(&context), informSuccessorResponder(&context),informPredecessorResponder(&context),getSuccessorResponder(&context),getPredecessorResponder(&context), status(CREATE), reqType(reqType) {
Proceed();
}
......@@ -80,18 +88,221 @@ public:
status = PROCESS;
if (reqType == NEW)
service->RequestNEW(&context, &info, &newResponder, cq, cq, this);
else
service->RequestINFORM(&context, &info, &informResponder, cq, cq, this);
} else if (status == PROCESS) {
else if(reqType==INFORMSUCCESSOR)
service->RequestINFORMSUCCESSOR(&context, &info, &informSuccessorResponder, cq, cq, this);
else if(reqType==INFORMPREDECESSOR)
service->RequestINFORMPREDECESSOR(&context,&info,&informPredecessorResponder,cq,cq,this);
else if(reqType==GETSUCCESSOR)
service->RequestGETSUCCESSOR(&context,&idvar1,&getSuccessorResponder,cq,cq,this);
else if(reqType==GETPREDECESSOR)
service->RequestGETPREDECESSOR(&context,&idvar1,&getPredecessorResponder,cq,cq,this);
}
else if (status == PROCESS) {
new ServerData(service, cq, reqType);
if (reqType == NEW) {
//calculate id of node, return it's successor
string address=info.address();
int id=stoi(address.substr(address.find(':')+1));
int fingers[16];
ifstream fin;
fin.open(FINGER_TABLE);
for(int i=0;i<16;i++) {
string temp;
getline(fin,temp);
fingers[i]=stoi(temp);
}
fin.close();
int node=-1;
int next=-1;
for(int i=0;i<16;i++) {
if(i>0&&fingers[i]>fingers[i-1]) {
node=fingers[i-1];
next=fingers[i];
break;
}
if(fingers[i]>id||i==16) {
if(i==16&&fingers[i]>id) {
node=fingers[i-1];
next=fingers[i];
}
else if(i==16)
node=fingers[i];
else {
node=fingers[i-1];
next=fingers[i];
}
break;
}
}
if(next!=-1) {
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
stub->GETPREDECESSOR(&context,x,&y);
if(y.id()<id) {
successorInfo.set_succaddress("0.0.0.0:"+to_string(x.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(y.id()));
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
Id z;
string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z);
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
}
}
else {
if(node==id) {
successorInfo.set_succaddress("0.0.0.0:"+to_string(id));
successorInfo.set_predaddress("0.0.0.0:"+to_string(id));
}
string tar_address("0.0.0.0:"+to_string(node));
shared_ptr<Channel> channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext context1;
Id x,y,z;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z);
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
}
newResponder.Finish(successorInfo,Status::OK,this);
}
else if(reqType==INFORMSUCCESSOR){
//return half of the keyvalue pairs to the requesting node
string address=info.address();
int id=stoi(address.substr(address.find(':')+1));
ifstream fin;
fin.open(NEIGHBOURS);
string successor,predecessor;
getline(fin,successor);
getline(fin,predecessor);
fin.close();
predecessor=info.address();
ofstream fout;
fout.open(NEIGHBOURS);
fout<<successor<<endl;
fout<<predecessor<<endl;
fout.close();
null.set_nothing(0);
informSuccessorResponder.Finish(keyValues,Status::OK,this);
}
else if(reqType==GETSUCCESSOR) {
int idtofind=idvar1.id();
int fingers[16];
ifstream fin;
fin.open(FINGER_TABLE);
for(int i=0;i<16;i++) {
string temp;
getline(fin,temp);
fingers[i]=stoi(temp);
}
fin.close();
int node=-1;
int next=-1;
for(int i=0;i<16;i++) {
if(i>0&&fingers[i]>fingers[i-1]) {
node=fingers[i-1];
next=fingers[i];
break;
}
if(fingers[i]>idtofind||i==16) {
if(i==16&&fingers[i]>idtofind) {
node=fingers[i-1];
next=fingers[i];
}
else if(i==16)
node=fingers[i];
else {
node=fingers[i-1];
next=fingers[i];
}
break;
}
}
if(next!=-1) {
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
stub->GETPREDECESSOR(&context,x,&y);
if(y.id()<idtofind) {
idvar2.set_id(y.id());
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(idtofind);
stub->GETSUCCESSOR(&context1,x,&y);
idvar2.set_id(y.id());
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
shared_ptr<Channel> channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext context1;
Id x,y;
x.set_id(idtofind);
stub->GETSUCCESSOR(&context1,x,&y);
idvar2.set_id(y.id());
}
getSuccessorResponder.Finish(idvar2,Status::OK,this);
}
else if(reqType==GETPREDECESSOR) {
ifstream fin;
fin.open(NEIGHBOURS);
string successor,predecessor;
getline(fin,successor);
getline(fin,predecessor);
fin.close();
idvar2.set_id(stoi(predecessor.substr(predecessor.find(':')+1)));
getPredecessorResponder.Finish(idvar2,Status::OK,this);
}
else {
ifstream fin;
fin.open(NEIGHBOURS);
string successor,predecessor;
getline(fin,successor);
getline(fin,predecessor);
fin.close();
successor=info.address();
ofstream fout;
fout.open(NEIGHBOURS);
fout<<successor<<endl;
fout<<predecessor<<endl;
fout.close();
null.set_nothing(0);
informPredecessorResponder.Finish(null,Status::OK,this);
}
status = FINISH;
} else {
}
else {
GPR_ASSERT(status == FINISH);
delete this;
}
......@@ -103,9 +314,15 @@ private:
ServerContext context;
Info info;
SuccessorInfo successorInfo;
Null null;
KeyValues keyValues;
Id idvar1;
Id idvar2;
ServerAsyncResponseWriter<SuccessorInfo> newResponder;
ServerAsyncResponseWriter<KeyValues> informResponder;
ServerAsyncResponseWriter<KeyValues> informSuccessorResponder;
ServerAsyncResponseWriter<Null> informPredecessorResponder;
ServerAsyncResponseWriter<Id> getSuccessorResponder;
ServerAsyncResponseWriter<Id> getPredecessorResponder;
enum CallStatus {
CREATE,
PROCESS,
......@@ -217,7 +434,10 @@ void *handleServerRequests(void *thread_id) {
pthread_cond_wait(&startRpcs,&myLock);
pthread_mutex_unlock(&myLock);
new ServerData(&service,comp_queue.get(),NEW);
new ServerData(&service,comp_queue.get(),INFORM);
new ServerData(&service,comp_queue.get(),INFORMSUCCESSOR);
new ServerData(&service,comp_queue.get(),INFORMPREDECESSOR);
new ServerData(&service,comp_queue.get(),GETSUCCESSOR);
new ServerData(&service,comp_queue.get(),GETPREDECESSOR);
void *tag;
bool ok;
while(true) {
......@@ -266,8 +486,20 @@ void signalHandler(int signum) {
exit(0);
}
void updateAllFingerTables() {
string target_address(DNS_SERVER);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
Null null1;
null1.set_nothing(0);
Null null2;
ClientContext context;
Status status=stub->UPDATEFINGERTABLES(&context,null1,&null2);
}
void register_server_DNS(string my_address) {
string target_address("0.0.0.0:1234");
string target_address(DNS_SERVER);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
......@@ -282,8 +514,18 @@ void register_server_DNS(string my_address) {
info.set_address(my_address);
ClientContext new_context;
stub->ADDADDRESS(&new_context,info,&null);
if(old_server=="null")
ofstream fout;
fout.open(FINGER_TABLE);
for(int i=0;i<16;i++)
fout<<my_address.substr(my_address.find(':')+1)<<endl;
fout.close();
if(old_server=="null") {
fout.open(NEIGHBOURS);
fout<<"-1"<<endl;
fout<<"-1"<<endl;
fout.close();
return;
}
channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
......@@ -291,13 +533,25 @@ void register_server_DNS(string my_address) {
ClientContext context1;
status=stub->NEW(&context1,info,&successorInfo);
if(status.ok()) {
string successor=successorInfo.address();
string successor=successorInfo.succaddress();
string predecessor=successorInfo.predaddress();
ofstream fout;
fout.open(NEIGHBOURS);
fout<<successor<<endl;
fout<<predecessor<<endl;
fout.close();
channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
KeyValues keyValues;
ClientContext context2;
status=stub->INFORM(&context2,info,&keyValues);
status=stub->INFORMSUCCESSOR(&context2,info,&keyValues);
channel=grpc::CreateChannel(predecessor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
ClientContext context3;
status=stub->INFORMPREDECESSOR(&context3,info,&null);
updateAllFingerTables();
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment