Commit 8ac3f93b authored by mayankkakad's avatar mayankkakad

adding new server tested and corrected

parent 45626f12
...@@ -69,6 +69,7 @@ public: ...@@ -69,6 +69,7 @@ public:
fout<<info.address()<<endl; fout<<info.address()<<endl;
fout.close(); fout.close();
null.set_nothing(0); null.set_nothing(0);
cout<<info.address()<<endl;
addAddressResponder.Finish(null,Status::OK,this); addAddressResponder.Finish(null,Status::OK,this);
} }
else { else {
...@@ -76,30 +77,11 @@ public: ...@@ -76,30 +77,11 @@ public:
status = FINISH; status = FINISH;
} }
else { else {
if(reqType==ADDADDRESS)
print_servers_list();
GPR_ASSERT(status == FINISH); GPR_ASSERT(status == FINISH);
delete this; delete this;
} }
} }
void print_servers_list() {
ifstream fin;
map<int,string> servers;
int size=0;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[size++]=temp;
}while(fin);
fin.close();
for(int i=0;i<size;i++)
cout<<servers.find(i)->second<<endl;
}
private: private:
KeyValueServices::AsyncService *service; KeyValueServices::AsyncService *service;
ServerCompletionQueue *cq; ServerCompletionQueue *cq;
......
...@@ -100,11 +100,13 @@ public: ...@@ -100,11 +100,13 @@ public:
else if (status == PROCESS) { else if (status == PROCESS) {
new ServerData(service, cq, reqType); new ServerData(service, cq, reqType);
if (reqType == NEW) { if (reqType == NEW) {
//calculate id of node, return it's successor //cout<<"New Server to join:"<<info.address()<<endl;
//calculate id of node, return it's successor and predecessor
string address=info.address(); string address=info.address();
int id=stoi(address.substr(address.find(':')+1)); int id=stoi(address.substr(address.find(':')+1));
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
for(int i=0;i<16;i++) { for(int i=0;i<16;i++) {
string temp; string temp;
...@@ -114,18 +116,19 @@ public: ...@@ -114,18 +116,19 @@ public:
fin.close(); fin.close();
int node=-1; int node=-1;
int next=-1; int next=-1;
//cout<<"Looking for the possible successor and predecessor"<<endl;
for(int i=0;i<16;i++) { for(int i=0;i<16;i++) {
if(i>0&&fingers[i]>fingers[i-1]) { if(i>0&&fingers[i]<fingers[i-1]) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
break; break;
} }
if(fingers[i]>id||i==16) { if(fingers[i]>id||i==15) {
if(i==16&&fingers[i]>id) { if(i==15&&fingers[i]>id) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
} }
else if(i==16) else if(i==15)
node=fingers[i]; node=fingers[i];
else { else {
node=fingers[i-1]; node=fingers[i-1];
...@@ -143,52 +146,91 @@ public: ...@@ -143,52 +146,91 @@ public:
Id x; Id x;
x.set_id(next); x.set_id(next);
Id y; Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
stub->GETPREDECESSOR(&context,x,&y); stub->GETPREDECESSOR(&context,x,&y);
if(y.id()<id) { if(y.id()<id) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<y.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(x.id())); successorInfo.set_succaddress("0.0.0.0:"+to_string(x.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(y.id())); successorInfo.set_predaddress("0.0.0.0:"+to_string(y.id()));
} }
else { else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node)); string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
ClientContext context1; ClientContext context1;
x.set_id(id); x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y); stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
Id z; Id z;
string t_address("0.0.0.0:"+to_string(y.id())); string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
ClientContext context2; ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z); stub->GETPREDECESSOR(&context2,y,&z);
//cout<<"Yes. We got the predecessor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
//cout<<"Predecessor: "<<z.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id())); successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id())); successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
} }
} }
else { else {
if(node==id) { //cout<<"We got no node with id greater than new node"<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(id)); if(node==stoi(params.find("LISTENING_PORT")->second)) {
successorInfo.set_predaddress("0.0.0.0:"+to_string(id)); //cout<<"Only one node present right now, that is me"<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(node));
successorInfo.set_predaddress("0.0.0.0:"+to_string(node));
} }
else if(node>stoi(params.find("LISTENING_PORT")->second)){
string tar_address("0.0.0.0:"+to_string(node)); string tar_address("0.0.0.0:"+to_string(node));
shared_ptr<Channel> channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext context1; ClientContext context1;
Id x,y,z; Id x,y,z;
x.set_id(id); x.set_id(id);
//cout<<"Asking the last node about the successor"<<endl;
stub->GETSUCCESSOR(&context1,x,&y); stub->GETSUCCESSOR(&context1,x,&y);
if(y.id()!=stoi(params.find("LISTENING_PORT")->second)) {
string t_address("0.0.0.0:"+to_string(y.id())); string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
ClientContext context2; ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z); stub->GETPREDECESSOR(&context2,y,&z);
//cout<<"We got the successor and the predecessor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
//cout<<"Predecessor: "<<z.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id())); successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id())); successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
} }
else {
//cout<<"We got the successor and the predecessor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
string pre;
ifstream fin;
fin.open(NEIGHBOURS);
getline(fin,pre);
getline(fin,pre);
fin.close();
int a=stoi(pre.substr(pre.find(':')+1));
//cout<<"Predecessor: "<<a<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(a));
}
}
else {
successorInfo.set_succaddress("0.0.0.0:"+to_string(node));
successorInfo.set_predaddress("0.0.0.0:"+params.find("LISTENING_PORT")->second);
}
}
//cout<<"Sending the successor and predecessor back to the new server"<<endl;
newResponder.Finish(successorInfo,Status::OK,this); newResponder.Finish(successorInfo,Status::OK,this);
} }
else if(reqType==INFORMSUCCESSOR){ else if(reqType==INFORMSUCCESSOR){
//return half of the keyvalue pairs to the requesting node //return half of the keyvalue pairs to the requesting node
//cout<<"Okay, my new predecessor is: "<<info.address()<<endl;
string address=info.address(); string address=info.address();
int id=stoi(address.substr(address.find(':')+1)); int id=stoi(address.substr(address.find(':')+1));
ifstream fin; ifstream fin;
...@@ -204,12 +246,15 @@ public: ...@@ -204,12 +246,15 @@ public:
fout<<predecessor<<endl; fout<<predecessor<<endl;
fout.close(); fout.close();
null.set_nothing(0); null.set_nothing(0);
//cout<<"Done making changes accordingly"<<endl;
informSuccessorResponder.Finish(keyValues,Status::OK,this); informSuccessorResponder.Finish(keyValues,Status::OK,this);
} }
else if(reqType==GETSUCCESSOR) { else if(reqType==GETSUCCESSOR) {
//cout<<"Some server asked me to find the successor of "<<idvar1.id()<<endl;
int idtofind=idvar1.id(); int idtofind=idvar1.id();
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting the finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
for(int i=0;i<16;i++) { for(int i=0;i<16;i++) {
string temp; string temp;
...@@ -225,12 +270,12 @@ public: ...@@ -225,12 +270,12 @@ public:
next=fingers[i]; next=fingers[i];
break; break;
} }
if(fingers[i]>idtofind||i==16) { if(fingers[i]>idtofind||i==15) {
if(i==16&&fingers[i]>idtofind) { if(i==15&&fingers[i]>idtofind) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
} }
else if(i==16) else if(i==15)
node=fingers[i]; node=fingers[i];
else { else {
node=fingers[i-1]; node=fingers[i-1];
...@@ -248,8 +293,11 @@ public: ...@@ -248,8 +293,11 @@ public:
Id x; Id x;
x.set_id(next); x.set_id(next);
Id y; Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
stub->GETPREDECESSOR(&context,x,&y); stub->GETPREDECESSOR(&context,x,&y);
if(y.id()<idtofind) { if(y.id()<idtofind) {
//cout<<"Yes it is. We found the successor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
idvar2.set_id(y.id()); idvar2.set_id(y.id());
} }
else { else {
...@@ -258,10 +306,14 @@ public: ...@@ -258,10 +306,14 @@ public:
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
ClientContext context1; ClientContext context1;
x.set_id(idtofind); x.set_id(idtofind);
//cout<<"No it is not. Asking the possible predecessor to find its successor"<<endl;
stub->GETSUCCESSOR(&context1,x,&y); stub->GETSUCCESSOR(&context1,x,&y);
idvar2.set_id(y.id()); idvar2.set_id(y.id());
} }
} }
else {
if(node<stoi(params.find("LISTENING_PORT")->second))
idvar2.set_id(node);
else { else {
string tar_address("0.0.0.0:"+to_string(node)); string tar_address("0.0.0.0:"+to_string(node));
shared_ptr<Channel> channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
...@@ -272,9 +324,11 @@ public: ...@@ -272,9 +324,11 @@ public:
stub->GETSUCCESSOR(&context1,x,&y); stub->GETSUCCESSOR(&context1,x,&y);
idvar2.set_id(y.id()); idvar2.set_id(y.id());
} }
}
getSuccessorResponder.Finish(idvar2,Status::OK,this); getSuccessorResponder.Finish(idvar2,Status::OK,this);
} }
else if(reqType==GETPREDECESSOR) { else if(reqType==GETPREDECESSOR) {
//cout<<"Someone asked me for my predecessor. Sending them"<<endl;
ifstream fin; ifstream fin;
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string successor,predecessor; string successor,predecessor;
...@@ -282,9 +336,12 @@ public: ...@@ -282,9 +336,12 @@ public:
getline(fin,predecessor); getline(fin,predecessor);
fin.close(); fin.close();
idvar2.set_id(stoi(predecessor.substr(predecessor.find(':')+1))); idvar2.set_id(stoi(predecessor.substr(predecessor.find(':')+1)));
//cout<<"Sent my predecessor"<<endl;
getPredecessorResponder.Finish(idvar2,Status::OK,this); getPredecessorResponder.Finish(idvar2,Status::OK,this);
} }
else { else {
//cout<<"Okay, i got the information that my successor has changed"<<endl;
//cout<<"My new successor: "<<info.address()<<endl;
ifstream fin; ifstream fin;
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string successor,predecessor; string successor,predecessor;
...@@ -298,6 +355,7 @@ public: ...@@ -298,6 +355,7 @@ public:
fout<<predecessor<<endl; fout<<predecessor<<endl;
fout.close(); fout.close();
null.set_nothing(0); null.set_nothing(0);
//cout<<"Okay, i made the necessary changes"<<endl;
informPredecessorResponder.Finish(null,Status::OK,this); informPredecessorResponder.Finish(null,Status::OK,this);
} }
status = FINISH; status = FINISH;
...@@ -499,6 +557,7 @@ void updateAllFingerTables() { ...@@ -499,6 +557,7 @@ void updateAllFingerTables() {
} }
void register_server_DNS(string my_address) { void register_server_DNS(string my_address) {
//cout<<"Registering to DNS"<<endl;
string target_address(DNS_SERVER); string target_address(DNS_SERVER);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub; unique_ptr<KeyValueServices::Stub> stub;
...@@ -508,22 +567,29 @@ void register_server_DNS(string my_address) { ...@@ -508,22 +567,29 @@ void register_server_DNS(string my_address) {
Info info; Info info;
ClientContext context; ClientContext context;
Status status=stub->GETADDRESS(&context,null,&info); Status status=stub->GETADDRESS(&context,null,&info);
//cout<<"Address received:"<<info.address()<<endl;
string old_server; string old_server;
if(status.ok()) { if(status.ok()) {
old_server=info.address(); old_server=info.address();
info.set_address(my_address); info.set_address(my_address);
ClientContext new_context; ClientContext new_context;
//cout<<"Adding address to DNS"<<endl;
stub->ADDADDRESS(&new_context,info,&null); stub->ADDADDRESS(&new_context,info,&null);
//cout<<"Address added to DNS"<<endl;
ofstream fout; ofstream fout;
//cout<<"Generating initial finger table"<<endl;
fout.open(FINGER_TABLE); fout.open(FINGER_TABLE);
for(int i=0;i<16;i++) for(int i=0;i<16;i++)
fout<<my_address.substr(my_address.find(':')+1)<<endl; fout<<my_address.substr(my_address.find(':')+1)<<endl;
fout.close(); fout.close();
//cout<<"Initial finger table generated"<<endl;
if(old_server=="null") { if(old_server=="null") {
//cout<<"Initializing initial neighbours"<<endl;
fout.open(NEIGHBOURS); fout.open(NEIGHBOURS);
fout<<"-1"<<endl; fout<<"-1"<<endl;
fout<<"-1"<<endl; fout<<"-1"<<endl;
fout.close(); fout.close();
//cout<<"Initialized initial neighbours"<<endl;
return; return;
} }
channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials());
...@@ -531,27 +597,35 @@ void register_server_DNS(string my_address) { ...@@ -531,27 +597,35 @@ void register_server_DNS(string my_address) {
info.set_address(my_address); info.set_address(my_address);
SuccessorInfo successorInfo; SuccessorInfo successorInfo;
ClientContext context1; ClientContext context1;
//cout<<"Sending request to server: "<<old_server<<endl;
status=stub->NEW(&context1,info,&successorInfo); status=stub->NEW(&context1,info,&successorInfo);
//cout<<"Request sent. Successor and predecessor info received"<<endl;
if(status.ok()) { if(status.ok()) {
string successor=successorInfo.succaddress(); string successor=successorInfo.succaddress();
string predecessor=successorInfo.predaddress(); string predecessor=successorInfo.predaddress();
ofstream fout; ofstream fout;
//cout<<"Storing neighbours info"<<endl;
fout.open(NEIGHBOURS); fout.open(NEIGHBOURS);
fout<<successor<<endl; fout<<successor<<endl;
fout<<predecessor<<endl; fout<<predecessor<<endl;
fout.close(); fout.close();
//cout<<"Stored neighbours info"<<endl;
channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
info.set_address(my_address); info.set_address(my_address);
KeyValues keyValues; KeyValues keyValues;
ClientContext context2; ClientContext context2;
//cout<<"Informing successor about my presence"<<endl;
status=stub->INFORMSUCCESSOR(&context2,info,&keyValues); status=stub->INFORMSUCCESSOR(&context2,info,&keyValues);
//cout<<"Informed succesor"<<endl;
channel=grpc::CreateChannel(predecessor,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(predecessor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
info.set_address(my_address); info.set_address(my_address);
ClientContext context3; ClientContext context3;
//cout<<"Informing predecessor about my presence"<<endl;
status=stub->INFORMPREDECESSOR(&context3,info,&null); status=stub->INFORMPREDECESSOR(&context3,info,&null);
updateAllFingerTables(); //cout<<"Informed predecessor"<<endl;
//updateAllFingerTables();
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment