Commit 26eb00cd authored by mayankkakad's avatar mayankkakad

updating finger table done with minor errors (will be resolved in the next commit)

parent 8ac3f93b
...@@ -12,9 +12,12 @@ using grpc::ServerBuilder; ...@@ -12,9 +12,12 @@ using grpc::ServerBuilder;
using grpc::ServerCompletionQueue; using grpc::ServerCompletionQueue;
using grpc::ServerContext; using grpc::ServerContext;
using grpc::Status; using grpc::Status;
using grpc::Channel;
using grpc::ClientContext;
using keyvaluestore::KeyValueServices; using keyvaluestore::KeyValueServices;
using keyvaluestore::Info; using keyvaluestore::Info;
using keyvaluestore::Null; using keyvaluestore::Null;
using keyvaluestore::Addresses;
ServerBuilder builder; ServerBuilder builder;
KeyValueServices::AsyncService service; KeyValueServices::AsyncService service;
...@@ -23,12 +26,13 @@ std::unique_ptr<Server> server; ...@@ -23,12 +26,13 @@ std::unique_ptr<Server> server;
enum RequestType { enum RequestType {
GETADDRESS, GETADDRESS,
ADDADDRESS, ADDADDRESS,
UPDATEFINGERTABLES UPDATEFINGERTABLES,
GETSERVERS
}; };
class DNSData { class DNSData {
public: public:
DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getAddressResponder(&context), addAddressResponder(&context), status(CREATE), reqType(reqType) { DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getAddressResponder(&context), addAddressResponder(&context),updateFingerTablesResponder(&context),getServersResponder(&context), status(CREATE), reqType(reqType) {
Proceed(); Proceed();
} }
...@@ -39,6 +43,10 @@ public: ...@@ -39,6 +43,10 @@ public:
service->RequestGETADDRESS(&context, &null, &getAddressResponder, cq, cq, this); service->RequestGETADDRESS(&context, &null, &getAddressResponder, cq, cq, this);
else if(reqType==ADDADDRESS) else if(reqType==ADDADDRESS)
service->RequestADDADDRESS(&context, &info, &addAddressResponder, cq, cq, this); service->RequestADDADDRESS(&context, &info, &addAddressResponder, cq, cq, this);
else if(reqType==UPDATEFINGERTABLES)
service->RequestUPDATEFINGERTABLES(&context,&null,&updateFingerTablesResponder,cq,cq,this);
else
service->RequestGETSERVERS(&context,&null,&getServersResponder,cq,cq,this);
} }
else if (status == PROCESS) { else if (status == PROCESS) {
new DNSData(service, cq, reqType); new DNSData(service, cq, reqType);
...@@ -72,7 +80,55 @@ public: ...@@ -72,7 +80,55 @@ public:
cout<<info.address()<<endl; cout<<info.address()<<endl;
addAddressResponder.Finish(null,Status::OK,this); addAddressResponder.Finish(null,Status::OK,this);
} }
else if(reqType==UPDATEFINGERTABLES){
ifstream fin;
int index=0;
map<int,string> servers;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[index++]=temp;
}while(fin);
fin.close();
string addressarr="";
for(int i=0;i<index;i++)
addressarr+=servers[i]+";";
for(int i=0;i<index;i++) {
string target_address(servers[i]);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
Null null;
ClientContext cont;
Addresses addr;
addr.set_addresses(addressarr);
addr.set_servers(index);
stub->UPDATETABLE(&cont,addr,&null);
}
updateFingerTablesResponder.Finish(null,Status::OK,this);
}
else { else {
ifstream fin;
int index=0;
map<int,string> servers;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[index++]=temp;
}while(fin);
fin.close();
string addressarr="";
for(int i=0;i<index;i++)
addressarr+=servers[i]+";";
addr1.set_addresses(addressarr);
addr1.set_servers(index);
getServersResponder.Finish(addr1,Status::OK,this);
} }
status = FINISH; status = FINISH;
} }
...@@ -88,8 +144,11 @@ private: ...@@ -88,8 +144,11 @@ private:
ServerContext context; ServerContext context;
Null null; Null null;
Info info; Info info;
Addresses addr1;
ServerAsyncResponseWriter<Info> getAddressResponder; ServerAsyncResponseWriter<Info> getAddressResponder;
ServerAsyncResponseWriter<Null> addAddressResponder; ServerAsyncResponseWriter<Null> addAddressResponder;
ServerAsyncResponseWriter<Null> updateFingerTablesResponder;
ServerAsyncResponseWriter<Addresses> getServersResponder;
enum CallStatus { enum CallStatus {
CREATE, CREATE,
PROCESS, PROCESS,
...@@ -110,6 +169,7 @@ int main(int argc,char **argv) { ...@@ -110,6 +169,7 @@ int main(int argc,char **argv) {
new DNSData(&service,comp_queue.get(),GETADDRESS); new DNSData(&service,comp_queue.get(),GETADDRESS);
new DNSData(&service,comp_queue.get(),ADDADDRESS); new DNSData(&service,comp_queue.get(),ADDADDRESS);
new DNSData(&service,comp_queue.get(),UPDATEFINGERTABLES); new DNSData(&service,comp_queue.get(),UPDATEFINGERTABLES);
new DNSData(&service,comp_queue.get(),GETSERVERS);
void *tag; void *tag;
bool ok; bool ok;
while(true) { while(true) {
......
...@@ -16,6 +16,8 @@ service KeyValueServices { ...@@ -16,6 +16,8 @@ service KeyValueServices {
rpc UPDATEFINGERTABLES(Null) returns (Null) {} rpc UPDATEFINGERTABLES(Null) returns (Null) {}
rpc GETSUCCESSOR(Id) returns (Id) {} rpc GETSUCCESSOR(Id) returns (Id) {}
rpc GETPREDECESSOR(Id) returns (Id) {} rpc GETPREDECESSOR(Id) returns (Id) {}
rpc UPDATETABLE(Addresses) returns (Null) {}
rpc GETSERVERS(Null) returns (Addresses) {}
} }
message Key { message Key {
...@@ -58,4 +60,9 @@ message Null { ...@@ -58,4 +60,9 @@ message Null {
message Id { message Id {
int64 id = 1; int64 id = 1;
}
message Addresses {
string addresses = 1;
int64 servers = 2;
} }
\ No newline at end of file
...@@ -28,6 +28,7 @@ using keyvaluestore::SuccessorInfo; ...@@ -28,6 +28,7 @@ using keyvaluestore::SuccessorInfo;
using keyvaluestore::KeyValues; using keyvaluestore::KeyValues;
using keyvaluestore::Null; using keyvaluestore::Null;
using keyvaluestore::Id; using keyvaluestore::Id;
using keyvaluestore::Addresses;
pthread_mutex_t _masterLock; pthread_mutex_t _masterLock;
...@@ -42,7 +43,8 @@ enum ServerRequest { ...@@ -42,7 +43,8 @@ enum ServerRequest {
INFORMSUCCESSOR, INFORMSUCCESSOR,
INFORMPREDECESSOR, INFORMPREDECESSOR,
GETSUCCESSOR, GETSUCCESSOR,
GETPREDECESSOR GETPREDECESSOR,
UPDATETABLE
}; };
map<string, string> params; map<string, string> params;
...@@ -79,7 +81,7 @@ void getConfig() { ...@@ -79,7 +81,7 @@ void getConfig() {
class ServerData { class ServerData {
public: public:
ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, ServerRequest reqType) : service(service), cq(cq), newResponder(&context), informSuccessorResponder(&context),informPredecessorResponder(&context),getSuccessorResponder(&context),getPredecessorResponder(&context), status(CREATE), reqType(reqType) { ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, ServerRequest reqType) : service(service), cq(cq), newResponder(&context), informSuccessorResponder(&context),informPredecessorResponder(&context),getSuccessorResponder(&context),getPredecessorResponder(&context),updateTableResponder(&context), status(CREATE), reqType(reqType) {
Proceed(); Proceed();
} }
...@@ -96,6 +98,8 @@ public: ...@@ -96,6 +98,8 @@ public:
service->RequestGETSUCCESSOR(&context,&idvar1,&getSuccessorResponder,cq,cq,this); service->RequestGETSUCCESSOR(&context,&idvar1,&getSuccessorResponder,cq,cq,this);
else if(reqType==GETPREDECESSOR) else if(reqType==GETPREDECESSOR)
service->RequestGETPREDECESSOR(&context,&idvar1,&getPredecessorResponder,cq,cq,this); service->RequestGETPREDECESSOR(&context,&idvar1,&getPredecessorResponder,cq,cq,this);
else
service->RequestUPDATETABLE(&context,&addressarr,&updateTableResponder,cq,cq,this);
} }
else if (status == PROCESS) { else if (status == PROCESS) {
new ServerData(service, cq, reqType); new ServerData(service, cq, reqType);
...@@ -108,27 +112,31 @@ public: ...@@ -108,27 +112,31 @@ public:
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; //cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0;
for(int i=0;i<16;i++) { for(int i=0;i<16;i++) {
string temp; string temp;
getline(fin,temp); getline(fin,temp);
if(temp=="null")
break;
nums++;
fingers[i]=stoi(temp); fingers[i]=stoi(temp);
} }
fin.close(); fin.close();
int node=-1; int node=-1;
int next=-1; int next=-1;
//cout<<"Looking for the possible successor and predecessor"<<endl; //cout<<"Looking for the possible successor and predecessor"<<endl;
for(int i=0;i<16;i++) { for(int i=0;i<nums;i++) {
if(i>0&&fingers[i]<fingers[i-1]) { if(i>0&&fingers[i]<fingers[i-1]) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
break; break;
} }
if(fingers[i]>id||i==15) { if(fingers[i]>id||i==nums-1) {
if(i==15&&fingers[i]>id) { if(i==nums-1&&fingers[i]>id) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
} }
else if(i==15) else if(i==nums-1)
node=fingers[i]; node=fingers[i];
else { else {
node=fingers[i-1]; node=fingers[i-1];
...@@ -256,26 +264,29 @@ public: ...@@ -256,26 +264,29 @@ public:
ifstream fin; ifstream fin;
//cout<<"Getting the finger table"<<endl; //cout<<"Getting the finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0;
for(int i=0;i<16;i++) { for(int i=0;i<16;i++) {
string temp; string temp;
getline(fin,temp); getline(fin,temp);
if(temp=="null")
break;
fingers[i]=stoi(temp); fingers[i]=stoi(temp);
} }
fin.close(); fin.close();
int node=-1; int node=-1;
int next=-1; int next=-1;
for(int i=0;i<16;i++) { for(int i=0;i<nums;i++) {
if(i>0&&fingers[i]>fingers[i-1]) { if(i>0&&fingers[i]>fingers[i-1]) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
break; break;
} }
if(fingers[i]>idtofind||i==15) { if(fingers[i]>idtofind||i==nums-1) {
if(i==15&&fingers[i]>idtofind) { if(i==nums-1&&fingers[i]>idtofind) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
} }
else if(i==15) else if(i==nums-1)
node=fingers[i]; node=fingers[i];
else { else {
node=fingers[i-1]; node=fingers[i-1];
...@@ -339,7 +350,7 @@ public: ...@@ -339,7 +350,7 @@ public:
//cout<<"Sent my predecessor"<<endl; //cout<<"Sent my predecessor"<<endl;
getPredecessorResponder.Finish(idvar2,Status::OK,this); getPredecessorResponder.Finish(idvar2,Status::OK,this);
} }
else { else if(reqType==INFORMPREDECESSOR) {
//cout<<"Okay, i got the information that my successor has changed"<<endl; //cout<<"Okay, i got the information that my successor has changed"<<endl;
//cout<<"My new successor: "<<info.address()<<endl; //cout<<"My new successor: "<<info.address()<<endl;
ifstream fin; ifstream fin;
...@@ -358,6 +369,46 @@ public: ...@@ -358,6 +369,46 @@ public:
//cout<<"Okay, i made the necessary changes"<<endl; //cout<<"Okay, i made the necessary changes"<<endl;
informPredecessorResponder.Finish(null,Status::OK,this); informPredecessorResponder.Finish(null,Status::OK,this);
} }
else {
string addresses=addressarr.addresses();
int num=addressarr.servers();
int ids[num];
string addr[num];
for(int i=0;i<num;i++) {
addr[i]=addresses.substr(0,addresses.find(';'));
ids[i]=stoi(addr[i].substr(addr[i].find(':')+1));
addresses=addresses.substr(addresses.find(';')+1);
}
ofstream fout;
fout.open(FINGER_TABLE);
int my_id=stoi(params.find("LISTENING_PORT")->second);
int prev_entry=my_id;
int i=0;
for(i=0;i<16;i++) {
int next_entry=(my_id+(1<<i))%(1<<16);
if(prev_entry<my_id&&next_entry>=my_id)
break;
for(int j=0;j<num;j++) {
if(ids[j]==my_id)
continue;
if(ids[j]>=next_entry) {
fout<<ids[j]<<endl;
continue;
}
if(j==num-1)
fout<<"null"<<endl;
}
prev_entry=next_entry;
}
while(i<16) {
fout<<"null"<<endl;
i++;
}
fout.close();
Null n;
n.set_nothing(0);
updateTableResponder.Finish(n,Status::OK,this);
}
status = FINISH; status = FINISH;
} }
else { else {
...@@ -376,11 +427,13 @@ private: ...@@ -376,11 +427,13 @@ private:
KeyValues keyValues; KeyValues keyValues;
Id idvar1; Id idvar1;
Id idvar2; Id idvar2;
Addresses addressarr;
ServerAsyncResponseWriter<SuccessorInfo> newResponder; ServerAsyncResponseWriter<SuccessorInfo> newResponder;
ServerAsyncResponseWriter<KeyValues> informSuccessorResponder; ServerAsyncResponseWriter<KeyValues> informSuccessorResponder;
ServerAsyncResponseWriter<Null> informPredecessorResponder; ServerAsyncResponseWriter<Null> informPredecessorResponder;
ServerAsyncResponseWriter<Id> getSuccessorResponder; ServerAsyncResponseWriter<Id> getSuccessorResponder;
ServerAsyncResponseWriter<Id> getPredecessorResponder; ServerAsyncResponseWriter<Id> getPredecessorResponder;
ServerAsyncResponseWriter<Null> updateTableResponder;
enum CallStatus { enum CallStatus {
CREATE, CREATE,
PROCESS, PROCESS,
...@@ -496,6 +549,7 @@ void *handleServerRequests(void *thread_id) { ...@@ -496,6 +549,7 @@ void *handleServerRequests(void *thread_id) {
new ServerData(&service,comp_queue.get(),INFORMPREDECESSOR); new ServerData(&service,comp_queue.get(),INFORMPREDECESSOR);
new ServerData(&service,comp_queue.get(),GETSUCCESSOR); new ServerData(&service,comp_queue.get(),GETSUCCESSOR);
new ServerData(&service,comp_queue.get(),GETPREDECESSOR); new ServerData(&service,comp_queue.get(),GETPREDECESSOR);
new ServerData(&service,comp_queue.get(),UPDATETABLE);
void *tag; void *tag;
bool ok; bool ok;
while(true) { while(true) {
...@@ -553,7 +607,45 @@ void updateAllFingerTables() { ...@@ -553,7 +607,45 @@ void updateAllFingerTables() {
null1.set_nothing(0); null1.set_nothing(0);
Null null2; Null null2;
ClientContext context; ClientContext context;
Status status=stub->UPDATEFINGERTABLES(&context,null1,&null2); Addresses addr;
stub->GETSERVERS(&context,null1,&addr);
string addresses=addr.addresses();
int num=addr.servers();
string ad[num];
int ids[num];
for(int i=0;i<num;i++) {
ad[i]=addresses.substr(0,addresses.find(';'));
ids[i]=stoi(ad[i].substr(ad[i].find(':')+1));
addresses=addresses.substr(addresses.find(';')+1);
}
ofstream fout;
fout.open(FINGER_TABLE);
int my_id=stoi(params.find("LISTENING_PORT")->second);
int prev_entry=my_id;
int i=0;
for(i=0;i<16;i++) {
int next_entry=(my_id+(1<<i))%(1<<16);
if(prev_entry<my_id&&next_entry>=my_id)
break;
for(int j=0;j<num;j++) {
if(ids[j]==my_id)
continue;
if(ids[j]>=next_entry) {
fout<<ids[j]<<endl;
continue;
}
if(j==num-1)
fout<<"null"<<endl;
}
prev_entry=next_entry;
}
while(i<16) {
fout<<"null"<<endl;
i++;
}
fout.close();
ClientContext context2;
stub->UPDATEFINGERTABLES(&context2,null1,&null2);
} }
void register_server_DNS(string my_address) { void register_server_DNS(string my_address) {
...@@ -580,7 +672,7 @@ void register_server_DNS(string my_address) { ...@@ -580,7 +672,7 @@ void register_server_DNS(string my_address) {
//cout<<"Generating initial finger table"<<endl; //cout<<"Generating initial finger table"<<endl;
fout.open(FINGER_TABLE); fout.open(FINGER_TABLE);
for(int i=0;i<16;i++) for(int i=0;i<16;i++)
fout<<my_address.substr(my_address.find(':')+1)<<endl; fout<<"null"<<endl;
fout.close(); fout.close();
//cout<<"Initial finger table generated"<<endl; //cout<<"Initial finger table generated"<<endl;
if(old_server=="null") { if(old_server=="null") {
...@@ -625,7 +717,7 @@ void register_server_DNS(string my_address) { ...@@ -625,7 +717,7 @@ void register_server_DNS(string my_address) {
//cout<<"Informing predecessor about my presence"<<endl; //cout<<"Informing predecessor about my presence"<<endl;
status=stub->INFORMPREDECESSOR(&context3,info,&null); status=stub->INFORMPREDECESSOR(&context3,info,&null);
//cout<<"Informed predecessor"<<endl; //cout<<"Informed predecessor"<<endl;
//updateAllFingerTables(); updateAllFingerTables();
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment