Commit e94cec16 authored by Mayank Manoj's avatar Mayank Manoj

Merge branch 'mayank' into 'master'

Mayank

See merge request !2
parents 40295e4f b69e736e
...@@ -20,6 +20,9 @@ public: ...@@ -20,6 +20,9 @@ public:
virtual void pushAll() { virtual void pushAll() {
return; return;
} }
virtual string getKeyValuePairs(int id) {
return "This will never run";
}
}; };
class storageLRU : public memoryManagement { class storageLRU : public memoryManagement {
...@@ -57,6 +60,10 @@ public: ...@@ -57,6 +60,10 @@ public:
void pushAll() { void pushAll() {
mycache.pushAll(); mycache.pushAll();
} }
string getKeyValuePairs(int id) {
return "keyvaluepairs";
}
}; };
class storageLFU : public memoryManagement { class storageLFU : public memoryManagement {
...@@ -93,4 +100,8 @@ public: ...@@ -93,4 +100,8 @@ public:
void pushAll() { void pushAll() {
mycache.pushAll(); mycache.pushAll();
} }
string getKeyValuePairs(int id) {
return "keyvaluepairs";
}
}; };
\ No newline at end of file
...@@ -42,7 +42,7 @@ include_directories("${CMAKE_CURRENT_BINARY_DIR}") ...@@ -42,7 +42,7 @@ include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# Targets (client|server) # Targets (client|server)
foreach(_target foreach(_target
client client_test server) client client_test server dns)
add_executable(${_target} "${_target}.cpp" add_executable(${_target} "${_target}.cpp"
${hw_proto_srcs} ${hw_proto_srcs}
${hw_grpc_srcs}) ${hw_grpc_srcs})
......
...@@ -13,6 +13,8 @@ using keyvaluestore::Value; ...@@ -13,6 +13,8 @@ using keyvaluestore::Value;
using keyvaluestore::KeyValue; using keyvaluestore::KeyValue;
using keyvaluestore::ReqStatus; using keyvaluestore::ReqStatus;
using keyvaluestore::KeyValueServices; using keyvaluestore::KeyValueServices;
using keyvaluestore::Info;
using keyvaluestore::Null;
std::map<std::string, std::string> params; std::map<std::string, std::string> params;
std::string config_filename = "../config"; std::string config_filename = "../config";
...@@ -150,7 +152,15 @@ void parse(std::string& str, std::string& cmd, std::string& key, std::string& va ...@@ -150,7 +152,15 @@ void parse(std::string& str, std::string& cmd, std::string& key, std::string& va
} }
void RunClient() { void RunClient() {
std::string target_address("0.0.0.0:"+params.find("LISTENING_PORT")->second); std::string dns_address("0.0.0.0:1234");
std::shared_ptr<Channel> channel=grpc::CreateChannel(dns_address, grpc::InsecureChannelCredentials());
std::unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
Null null;
null.set_nothing(0);
Info info;
ClientContext context;
Status status=stub->GETADDRESS(&context,null,&info);
std::string target_address(info.address());
// Instantiates the client // Instantiates the client
KeyValueServicesClient client( KeyValueServicesClient client(
// Channel from which RPCs are made - endpoint is the target_address // Channel from which RPCs are made - endpoint is the target_address
......
#include <bits/stdc++.h>
#include <grpcpp/grpcpp.h>
#include<fstream>
#include "keyvaluestore.grpc.pb.h"
#define SERVERS "serverlist.txt"
using namespace std;
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using grpc::Channel;
using grpc::ClientContext;
using keyvaluestore::KeyValueServices;
using keyvaluestore::Info;
using keyvaluestore::Null;
using keyvaluestore::Addresses;
ServerBuilder builder;
KeyValueServices::AsyncService service;
std::unique_ptr<Server> server;
enum RequestType {
GETADDRESS,
ADDADDRESS,
UPDATEFINGERTABLES,
GETSERVERS
};
class DNSData {
public:
DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getAddressResponder(&context), addAddressResponder(&context),updateFingerTablesResponder(&context),getServersResponder(&context), status(CREATE), reqType(reqType) {
Proceed();
}
void Proceed() {
if (status == CREATE) {
status = PROCESS;
if(reqType==GETADDRESS)
service->RequestGETADDRESS(&context, &null, &getAddressResponder, cq, cq, this);
else if(reqType==ADDADDRESS)
service->RequestADDADDRESS(&context, &info, &addAddressResponder, cq, cq, this);
else if(reqType==UPDATEFINGERTABLES)
service->RequestUPDATEFINGERTABLES(&context,&null,&updateFingerTablesResponder,cq,cq,this);
else
service->RequestGETSERVERS(&context,&null,&getServersResponder,cq,cq,this);
}
else if (status == PROCESS) {
new DNSData(service, cq, reqType);
if(reqType==GETADDRESS) {
ifstream fin;
int size=0;
map<int,string> servers;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[size++]=temp;
}while(fin);
fin.close();
if(size==0)
info.set_address("null");
else {
int x=rand()%size;
info.set_address(servers.find(x)->second);
}
getAddressResponder.Finish(info,Status::OK,this);
}
else if(reqType==ADDADDRESS){
ifstream fin;
fin.open(SERVERS);
int size=0;
map<int,string> svs;
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
svs[size++]=temp;
} while(fin);
fin.close();
string addresses[size+1];
int count=0;
string addtoadd=info.address();
int porttoadd=stoi(addtoadd.substr(addtoadd.find(':')+1));
bool flag=false;
for(int i=0;i<size;i++) {
int curr_port=stoi(svs[i].substr(svs[i].find(':')+1));
if(porttoadd<curr_port&&flag==false) {
addresses[count++]=addtoadd;
flag=true;
}
addresses[count++]=svs[i];
}
if(flag==false)
addresses[count++]=addtoadd;
ofstream fout;
fout.open(SERVERS);
for(int i=0;i<count;i++)
fout<<addresses[i]<<endl;
fout.close();
null.set_nothing(0);
cout<<info.address()<<endl;
addAddressResponder.Finish(null,Status::OK,this);
}
else if(reqType==UPDATEFINGERTABLES){
ifstream fin;
int index=0;
map<int,string> servers;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[index++]=temp;
}while(fin);
fin.close();
string addressarr="";
for(int i=0;i<index;i++)
addressarr+=servers[i]+";";
for(int i=0;i<index;i++) {
string target_address(servers[i]);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
Null null;
ClientContext cont;
Addresses addr;
addr.set_addresses(addressarr);
addr.set_servers(index);
stub->UPDATETABLE(&cont,addr,&null);
}
updateFingerTablesResponder.Finish(null,Status::OK,this);
}
else {
ifstream fin;
int index=0;
map<int,string> servers;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[index++]=temp;
}while(fin);
fin.close();
string addressarr="";
for(int i=0;i<index;i++)
addressarr+=servers[i]+";";
addr1.set_addresses(addressarr);
addr1.set_servers(index);
getServersResponder.Finish(addr1,Status::OK,this);
}
status = FINISH;
}
else {
GPR_ASSERT(status == FINISH);
delete this;
}
}
private:
KeyValueServices::AsyncService *service;
ServerCompletionQueue *cq;
ServerContext context;
Null null;
Info info;
Addresses addr1;
ServerAsyncResponseWriter<Info> getAddressResponder;
ServerAsyncResponseWriter<Null> addAddressResponder;
ServerAsyncResponseWriter<Null> updateFingerTablesResponder;
ServerAsyncResponseWriter<Addresses> getServersResponder;
enum CallStatus {
CREATE,
PROCESS,
FINISH
};
CallStatus status;
RequestType reqType;
};
int main(int argc,char **argv) {
srand(time(0));
string server_address("0.0.0.0:1234");
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
unique_ptr<ServerCompletionQueue> comp_queue=builder.AddCompletionQueue();
server = builder.BuildAndStart();
cout<<"DNS SERVER COMES UP SUCCESSFULLY"<<endl;
new DNSData(&service,comp_queue.get(),GETADDRESS);
new DNSData(&service,comp_queue.get(),ADDADDRESS);
new DNSData(&service,comp_queue.get(),UPDATEFINGERTABLES);
new DNSData(&service,comp_queue.get(),GETSERVERS);
void *tag;
bool ok;
while(true) {
GPR_ASSERT(comp_queue->Next(&tag,&ok));
GPR_ASSERT(ok);
static_cast<DNSData *>(tag)->Proceed();
}
}
\ No newline at end of file
...@@ -8,9 +8,18 @@ service KeyValueServices { ...@@ -8,9 +8,18 @@ service KeyValueServices {
rpc GET(Key) returns (Value) {} rpc GET(Key) returns (Value) {}
rpc PUT(KeyValue) returns (ReqStatus) {} rpc PUT(KeyValue) returns (ReqStatus) {}
rpc DEL(Key) returns (ReqStatus) {} rpc DEL(Key) returns (ReqStatus) {}
rpc NEW(Info) returns (SuccessorInfo) {}
rpc INFORMSUCCESSOR(Info) returns (KeyValues) {}
rpc INFORMPREDECESSOR(Info) returns (Null) {}
rpc GETADDRESS(Null) returns (Info) {}
rpc ADDADDRESS(Info) returns (Null) {}
rpc UPDATEFINGERTABLES(Null) returns (Null) {}
rpc GETSUCCESSOR(Id) returns (Id) {}
rpc GETPREDECESSOR(Id) returns (Id) {}
rpc UPDATETABLE(Addresses) returns (Null) {}
rpc GETSERVERS(Null) returns (Addresses) {}
} }
message Key { message Key {
string key = 1; string key = 1;
} }
...@@ -30,3 +39,30 @@ message ReqStatus { ...@@ -30,3 +39,30 @@ message ReqStatus {
int32 status = 1; int32 status = 1;
string error = 2; string error = 2;
} }
message Info {
string address = 1;
}
message SuccessorInfo {
string succaddress = 1;
string predaddress = 2;
}
message KeyValues {
string keys = 1;
string values = 2;
}
message Null {
int32 nothing = 1;
}
message Id {
int64 id = 1;
}
message Addresses {
string addresses = 1;
int64 servers = 2;
}
\ No newline at end of file
#include <bits/stdc++.h> #include <bits/stdc++.h>
#include <pthread.h> #include <pthread.h>
#include <fstream>
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#include "keyvaluestore.grpc.pb.h" #include "keyvaluestore.grpc.pb.h"
#include "../Backend.h" #include "../Backend.h"
#define NEIGHBOURS "neighbours.txt"
#define FINGER_TABLE "fingertable.txt"
#define DNS_SERVER "0.0.0.0:1234"
using namespace std; using namespace std;
...@@ -11,12 +15,20 @@ using grpc::ServerAsyncResponseWriter; ...@@ -11,12 +15,20 @@ using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder; using grpc::ServerBuilder;
using grpc::ServerCompletionQueue; using grpc::ServerCompletionQueue;
using grpc::ServerContext; using grpc::ServerContext;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status; using grpc::Status;
using keyvaluestore::Key; using keyvaluestore::Key;
using keyvaluestore::KeyValue; using keyvaluestore::KeyValue;
using keyvaluestore::KeyValueServices; using keyvaluestore::KeyValueServices;
using keyvaluestore::ReqStatus; using keyvaluestore::ReqStatus;
using keyvaluestore::Value; using keyvaluestore::Value;
using keyvaluestore::Info;
using keyvaluestore::SuccessorInfo;
using keyvaluestore::KeyValues;
using keyvaluestore::Null;
using keyvaluestore::Id;
using keyvaluestore::Addresses;
pthread_mutex_t _masterLock; pthread_mutex_t _masterLock;
...@@ -26,6 +38,15 @@ enum RequestType { ...@@ -26,6 +38,15 @@ enum RequestType {
DEL DEL
}; };
enum ServerRequest {
NEW,
INFORMSUCCESSOR,
INFORMPREDECESSOR,
GETSUCCESSOR,
GETPREDECESSOR,
UPDATETABLE
};
map<string, string> params; map<string, string> params;
string config_filename = "../config"; string config_filename = "../config";
string log_file = "../log"; string log_file = "../log";
...@@ -37,6 +58,9 @@ pthread_t *workers; ...@@ -37,6 +58,9 @@ pthread_t *workers;
int *worker_id; int *worker_id;
pthread_cond_t startRpcs; pthread_cond_t startRpcs;
pthread_mutex_t myLock; pthread_mutex_t myLock;
pthread_t dist_worker;
int dist_worker_id;
bool start; bool start;
memoryManagement *memManager; memoryManagement *memManager;
...@@ -55,6 +79,354 @@ void getConfig() { ...@@ -55,6 +79,354 @@ void getConfig() {
config.close(); config.close();
} }
class ServerData {
public:
ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, ServerRequest reqType) : service(service), cq(cq), newResponder(&context), informSuccessorResponder(&context),informPredecessorResponder(&context),getSuccessorResponder(&context),getPredecessorResponder(&context),updateTableResponder(&context), status(CREATE), reqType(reqType) {
Proceed();
}
void Proceed() {
if (status == CREATE) {
status = PROCESS;
if (reqType == NEW)
service->RequestNEW(&context, &info, &newResponder, cq, cq, this);
else if(reqType==INFORMSUCCESSOR)
service->RequestINFORMSUCCESSOR(&context, &info, &informSuccessorResponder, cq, cq, this);
else if(reqType==INFORMPREDECESSOR)
service->RequestINFORMPREDECESSOR(&context,&info,&informPredecessorResponder,cq,cq,this);
else if(reqType==GETSUCCESSOR)
service->RequestGETSUCCESSOR(&context,&idvar1,&getSuccessorResponder,cq,cq,this);
else if(reqType==GETPREDECESSOR)
service->RequestGETPREDECESSOR(&context,&idvar1,&getPredecessorResponder,cq,cq,this);
else
service->RequestUPDATETABLE(&context,&addressarr,&updateTableResponder,cq,cq,this);
}
else if (status == PROCESS) {
new ServerData(service, cq, reqType);
if (reqType == NEW) {
//cout<<"New Server to join:"<<info.address()<<endl;
//calculate id of node, return it's successor and predecessor
string address=info.address();
int id=stoi(address.substr(address.find(':')+1));
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<id&&my_id>=id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<id&&fingers[0]>=id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<id&&fingers[i]>=id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
if(next!=-1) {
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(x.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(mypred));
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
Id z;
string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z);
//cout<<"Yes. We got the predecessor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
//cout<<"Predecessor: "<<z.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
}
}
else {
//cout<<"We got no node with id greater than new node"<<endl;
//cout<<"Only one node present right now, that is me"<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(my_id));
successorInfo.set_predaddress("0.0.0.0:"+to_string(my_id));
}
//cout<<"Sending the successor and predecessor back to the new server"<<endl;
newResponder.Finish(successorInfo,Status::OK,this);
}
else if(reqType==INFORMSUCCESSOR){
//return half of the keyvalue pairs to the requesting node
string address=info.address();
int id=stoi(address.substr(address.find(':')+1));
string keyvalues=memManager->getKeyValuePairs(id);
string keys=keyvalues.substr(0,keyvalues.find(";;")+1);
string values=keyvalues.substr(keyvalues.find(";;")+2);
//cout<<"Okay, my new predecessor is: "<<info.address()<<endl;
ifstream fin;
fin.open(NEIGHBOURS);
string successor,predecessor;
getline(fin,successor);
getline(fin,predecessor);
fin.close();
predecessor=info.address();
ofstream fout;
fout.open(NEIGHBOURS);
fout<<successor<<endl;
fout<<predecessor<<endl;
fout.close();
keyValues.set_keys(keys);
keyValues.set_values(values);
//cout<<"Done making changes accordingly"<<endl;
informSuccessorResponder.Finish(keyValues,Status::OK,this);
}
else if(reqType==GETSUCCESSOR) {
//cout<<"Some server asked me to find the successor of "<<idvar1.id()<<endl;
int idtofind=idvar1.id();
int fingers[16];
ifstream fin;
//cout<<"Getting the finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
for(int i=0;i<16;i++) {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[i]=stoi(temp);
}
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<idtofind&&my_id>=idtofind) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<idtofind&&fingers[0]>=idtofind) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<idtofind&&fingers[i]>=idtofind) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>idtofind) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
stub->GETPREDECESSOR(&context,x,&y);
if(y.id()==node) {
//cout<<"Yes it is. We found the successor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
idvar2.set_id(x.id());
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(idtofind);
//cout<<"No it is not. Asking the possible predecessor to find its successor"<<endl;
stub->GETSUCCESSOR(&context1,x,&y);
idvar2.set_id(y.id());
}
getSuccessorResponder.Finish(idvar2,Status::OK,this);
}
else if(reqType==GETPREDECESSOR) {
//cout<<"Someone asked me for my predecessor. Sending them"<<endl;
ifstream fin;
fin.open(NEIGHBOURS);
string successor,predecessor;
getline(fin,successor);
getline(fin,predecessor);
fin.close();
idvar2.set_id(stoi(predecessor.substr(predecessor.find(':')+1)));
//cout<<"Sent my predecessor"<<endl;
getPredecessorResponder.Finish(idvar2,Status::OK,this);
}
else if(reqType==INFORMPREDECESSOR) {
//cout<<"Okay, i got the information that my successor has changed"<<endl;
//cout<<"My new successor: "<<info.address()<<endl;
ifstream fin;
fin.open(NEIGHBOURS);
string successor,predecessor;
getline(fin,successor);
getline(fin,predecessor);
fin.close();
successor=info.address();
ofstream fout;
fout.open(NEIGHBOURS);
fout<<successor<<endl;
fout<<predecessor<<endl;
fout.close();
null.set_nothing(0);
//cout<<"Okay, i made the necessary changes"<<endl;
informPredecessorResponder.Finish(null,Status::OK,this);
}
else {
string addresses=addressarr.addresses();
int num=addressarr.servers();
int ids[num];
string addr[num];
for(int i=0;i<num;i++) {
addr[i]=addresses.substr(0,addresses.find(';'));
ids[i]=stoi(addr[i].substr(addr[i].find(':')+1));
addresses=addresses.substr(addresses.find(';')+1);
}
ofstream fout;
fout.open(FINGER_TABLE);
int my_id=stoi(params.find("LISTENING_PORT")->second);
int prev_entry=my_id;
int i=0;
int my_index=0;
for(i=0;i<num;i++)
if(ids[i]==my_id) {
my_index=i;
break;
}
int fingernodes[num-1];
int count=0;
for(i=my_index+1;i<num;i++)
fingernodes[count++]=ids[i];
for(i=0;i<my_index;i++)
fingernodes[count++]=ids[i];
int curr=0;
for(i=0;i<16;i++) {
int next_entry=(my_id+(1<<i))%(1<<16);
if(curr!=count&&next_entry>ids[num-1]&&my_index!=0) {
fout<<ids[0]<<endl;
if(fingernodes[curr]!=ids[0]) {
for(int j=0;j<count;j++)
if(fingernodes[j]==ids[0])
curr=j;
}
}
else {
while(curr<count&&next_entry>fingernodes[curr])
curr++;
}
if(curr<count&&fingernodes[curr]>=next_entry)
fout<<fingernodes[curr]<<endl;
if(curr==count)
fout<<"null"<<endl;
}
fout.close();
Null n;
n.set_nothing(0);
updateTableResponder.Finish(n,Status::OK,this);
}
status = FINISH;
}
else {
GPR_ASSERT(status == FINISH);
delete this;
}
}
private:
KeyValueServices::AsyncService *service;
ServerCompletionQueue *cq;
ServerContext context;
Info info;
SuccessorInfo successorInfo;
Null null;
KeyValues keyValues;
Id idvar1;
Id idvar2;
Addresses addressarr;
ServerAsyncResponseWriter<SuccessorInfo> newResponder;
ServerAsyncResponseWriter<KeyValues> informSuccessorResponder;
ServerAsyncResponseWriter<Null> informPredecessorResponder;
ServerAsyncResponseWriter<Id> getSuccessorResponder;
ServerAsyncResponseWriter<Id> getPredecessorResponder;
ServerAsyncResponseWriter<Null> updateTableResponder;
enum CallStatus {
CREATE,
PROCESS,
FINISH
};
CallStatus status;
ServerRequest reqType;
};
class CallData { class CallData {
public: public:
CallData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getResponder(&context), putResponder(&context), delResponder(&context), status(CREATE), reqType(reqType) { CallData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getResponder(&context), putResponder(&context), delResponder(&context), status(CREATE), reqType(reqType) {
...@@ -73,43 +445,680 @@ public: ...@@ -73,43 +445,680 @@ public:
} else if (status == PROCESS) { } else if (status == PROCESS) {
new CallData(service, cq, reqType); new CallData(service, cq, reqType);
if (reqType == GET) { if (reqType == GET) {
cout << "SERVER SERVES A GET REQUEST WITH PARAMETER KEY : " << key.key(); int succ;
int status = 200; int key_id=hash(key.key());
pthread_mutex_lock(&_masterLock); int my_id=stoi(params.find("LISTENING_PORT")->second);
string v = memManager->get(&status, key.key()); ifstream fin;
pthread_mutex_unlock(&_masterLock); fin.open(NEIGHBOURS);
string pred;
value.set_value(v); getline(fin,pred);
if (status == 200) getline(fin,pred);
value.set_status(200); fin.close();
int pred_id;
if(pred=="-1")
pred_id=-1;
else
pred_id=stoi(pred.substr(pred.find(':')+1));
if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) {
//transfer request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->GET(&cont1,key,&value);
}
else if(my_id==key_id||(my_id<key_id&&pred_id<key_id&&pred_id>my_id)) {
cout << "SERVER SERVES A GET REQUEST WITH PARAMETER KEY : " << key.key();
int status = 200;
pthread_mutex_lock(&_masterLock);
string v = memManager->get(&status, key.key());
pthread_mutex_unlock(&_masterLock);
value.set_value(v);
if (status == 200)
value.set_status(200);
else {
value.set_status(400);
value.set_error(v);
}
cout << " RETURN VALUE : " << value.value() << endl;
}
else { else {
value.set_status(400); if(pred_id==-1||pred_id<key_id) {
value.set_error(v); cout << "SERVER SERVES A GET REQUEST WITH PARAMETER KEY : " << key.key();
int status = 200;
pthread_mutex_lock(&_masterLock);
string v = memManager->get(&status, key.key());
pthread_mutex_unlock(&_masterLock);
value.set_value(v);
if (status == 200)
value.set_status(200);
else {
value.set_status(400);
value.set_error(v);
}
cout << " RETURN VALUE : " << value.value() << endl;
}
else {
//transfer the request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->GET(&cont1,key,&value);
}
} }
cout << " RETURN VALUE : " << value.value() << endl;
getResponder.Finish(value, Status::OK, this); getResponder.Finish(value, Status::OK, this);
} else if (reqType == PUT) { } else if (reqType == PUT) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl; int succ;
int key_id=hash(keyvalue.key());
int my_id=stoi(params.find("LISTENING_PORT")->second);
ifstream fin;
fin.open(NEIGHBOURS);
string pred;
getline(fin,pred);
getline(fin,pred);
fin.close();
int pred_id;
if(pred=="-1")
pred_id=-1;
else
pred_id=stoi(pred.substr(pred.find(':')+1));
if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) {
//transfer request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->PUT(&cont1,keyvalue,&stat);
}
else if(my_id==key_id||(my_id<key_id&&pred_id<key_id&&pred_id>my_id)) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
pthread_mutex_lock(&_masterLock);
memManager->put(keyvalue.key(), keyvalue.value());
pthread_mutex_unlock(&_masterLock);
pthread_mutex_lock(&_masterLock); stat.set_status(200);
memManager->put(keyvalue.key(), keyvalue.value()); }
pthread_mutex_unlock(&_masterLock); else {
if(pred_id==-1||pred_id<key_id) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
stat.set_status(200);
pthread_mutex_lock(&_masterLock);
memManager->put(keyvalue.key(), keyvalue.value());
pthread_mutex_unlock(&_masterLock);
stat.set_status(200);
}
else {
//transfer the request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->PUT(&cont1,keyvalue,&stat);
}
}
putResponder.Finish(stat, Status::OK, this); putResponder.Finish(stat, Status::OK, this);
} else { } else {
cout << "SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : " << key.key() << endl; int succ;
int status = 200; int key_id=hash(key.key());
int my_id=stoi(params.find("LISTENING_PORT")->second);
ifstream fin;
fin.open(NEIGHBOURS);
string pred;
getline(fin,pred);
getline(fin,pred);
fin.close();
int pred_id;
if(pred=="-1")
pred_id=-1;
else
pred_id=stoi(pred.substr(pred.find(':')+1));
if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) {
//transfer request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->DEL(&cont1,key,&stat);
}
else if(my_id==key_id||(my_id<key_id&&pred_id<key_id&&pred_id>my_id)) {
cout << "SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : " << key.key() << endl;
int status = 200;
pthread_mutex_lock(&_masterLock); pthread_mutex_lock(&_masterLock);
memManager->del(&status, key.key()); memManager->del(&status, key.key());
pthread_mutex_unlock(&_masterLock); pthread_mutex_unlock(&_masterLock);
if (status == 200) if (status == 200)
stat.set_status(200); stat.set_status(200);
else {
//as this server does not have the keyvalue pair, send request to next server using CHORD
stat.set_status(400);
stat.set_error("KEY NOT EXIST");
}
}
else { else {
stat.set_status(400); if(pred_id==-1||pred_id<key_id) {
stat.set_error("KEY NOT EXIST"); cout << "SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : " << key.key() << endl;
int status = 200;
pthread_mutex_lock(&_masterLock);
memManager->del(&status, key.key());
pthread_mutex_unlock(&_masterLock);
if (status == 200)
stat.set_status(200);
else {
//as this server does not have the keyvalue pair, send request to next server using CHORD
stat.set_status(400);
stat.set_error("KEY NOT EXIST");
}
}
else {
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->DEL(&cont1,key,&stat);
}
} }
delResponder.Finish(stat, Status::OK, this); delResponder.Finish(stat, Status::OK, this);
} }
...@@ -121,6 +1130,9 @@ public: ...@@ -121,6 +1130,9 @@ public:
delete this; delete this;
} }
} }
int hash(string s) {
return (((int)s.at(0))<<8)+((int)s.at(1));
}
private: private:
KeyValueServices::AsyncService *service; KeyValueServices::AsyncService *service;
...@@ -147,6 +1159,28 @@ void setupServer(string server_address) { ...@@ -147,6 +1159,28 @@ void setupServer(string server_address) {
builder.RegisterService(&service); builder.RegisterService(&service);
} }
void *handleServerRequests(void *thread_id) {
unique_ptr<ServerCompletionQueue> comp_queue=builder.AddCompletionQueue();
pthread_mutex_lock(&myLock);
while(!start)
pthread_cond_wait(&startRpcs,&myLock);
pthread_mutex_unlock(&myLock);
new ServerData(&service,comp_queue.get(),NEW);
new ServerData(&service,comp_queue.get(),INFORMSUCCESSOR);
new ServerData(&service,comp_queue.get(),INFORMPREDECESSOR);
new ServerData(&service,comp_queue.get(),GETSUCCESSOR);
new ServerData(&service,comp_queue.get(),GETPREDECESSOR);
new ServerData(&service,comp_queue.get(),UPDATETABLE);
void *tag;
bool ok;
while(true) {
GPR_ASSERT(comp_queue->Next(&tag,&ok));
GPR_ASSERT(ok);
static_cast<ServerData *>(tag)->Proceed();
}
return 0;
}
void *handleRpcs(void *thread_id) { void *handleRpcs(void *thread_id) {
unique_ptr<ServerCompletionQueue> comp_queue = builder.AddCompletionQueue(); unique_ptr<ServerCompletionQueue> comp_queue = builder.AddCompletionQueue();
pthread_mutex_lock(&myLock); pthread_mutex_lock(&myLock);
...@@ -168,6 +1202,8 @@ void *handleRpcs(void *thread_id) { ...@@ -168,6 +1202,8 @@ void *handleRpcs(void *thread_id) {
} }
void assignThreads(int num_threads) { void assignThreads(int num_threads) {
dist_worker_id=-1;
pthread_create(&dist_worker,NULL,handleServerRequests,(void *)&dist_worker_id);
workers = (pthread_t *)malloc(sizeof(pthread_t) * num_threads); workers = (pthread_t *)malloc(sizeof(pthread_t) * num_threads);
worker_id = (int *)malloc(sizeof(int) * num_threads); worker_id = (int *)malloc(sizeof(int) * num_threads);
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
...@@ -183,6 +1219,157 @@ void signalHandler(int signum) { ...@@ -183,6 +1219,157 @@ void signalHandler(int signum) {
exit(0); exit(0);
} }
void updateAllFingerTables() {
string target_address(DNS_SERVER);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
Null null1;
null1.set_nothing(0);
Null null2;
ClientContext context;
Addresses addr;
stub->GETSERVERS(&context,null1,&addr);
string addresses=addr.addresses();
int num=addr.servers();
string ad[num];
int ids[num];
for(int i=0;i<num;i++) {
ad[i]=addresses.substr(0,addresses.find(';'));
ids[i]=stoi(ad[i].substr(ad[i].find(':')+1));
addresses=addresses.substr(addresses.find(';')+1);
}
ofstream fout;
fout.open(FINGER_TABLE);
int my_id=stoi(params.find("LISTENING_PORT")->second);
int prev_entry=my_id;
int i=0;
int my_index=0;
for(i=0;i<num;i++)
if(ids[i]==my_id) {
my_index=i;
break;
}
int fingernodes[num-1];
int count=0;
for(i=my_index+1;i<num;i++)
fingernodes[count++]=ids[i];
for(i=0;i<my_index;i++)
fingernodes[count++]=ids[i];
int curr=0;
for(i=0;i<16;i++) {
int next_entry=(my_id+(1<<i))%(1<<16);
if(curr!=count&&next_entry>ids[num-1]&&my_index!=0) {
fout<<ids[0]<<endl;
if(fingernodes[curr]!=ids[0]) {
for(int j=0;j<count;j++)
if(fingernodes[j]==ids[0])
curr=j;
}
}
else {
while(curr<count&&next_entry>fingernodes[curr])
curr++;
}
if(curr<count&&fingernodes[curr]>next_entry)
fout<<fingernodes[curr]<<endl;
if(curr==count)
fout<<"null"<<endl;
}
fout.close();
ClientContext context2;
stub->UPDATEFINGERTABLES(&context2,null1,&null2);
}
void register_server_DNS(string my_address) {
//cout<<"Registering to DNS"<<endl;
string target_address(DNS_SERVER);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
Null null;
null.set_nothing(0);
Info info;
ClientContext context;
Status status=stub->GETADDRESS(&context,null,&info);
//cout<<"Address received:"<<info.address()<<endl;
string old_server;
if(status.ok()) {
old_server=info.address();
info.set_address(my_address);
ClientContext new_context;
//cout<<"Adding address to DNS"<<endl;
stub->ADDADDRESS(&new_context,info,&null);
//cout<<"Address added to DNS"<<endl;
ofstream fout;
//cout<<"Generating initial finger table"<<endl;
fout.open(FINGER_TABLE);
for(int i=0;i<16;i++)
fout<<"null"<<endl;
fout.close();
//cout<<"Initial finger table generated"<<endl;
if(old_server=="null") {
//cout<<"Initializing initial neighbours"<<endl;
fout.open(NEIGHBOURS);
fout<<"-1"<<endl;
fout<<"-1"<<endl;
fout.close();
//cout<<"Initialized initial neighbours"<<endl;
return;
}
channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
SuccessorInfo successorInfo;
ClientContext context1;
//cout<<"Sending request to server: "<<old_server<<endl;
status=stub->NEW(&context1,info,&successorInfo);
//cout<<"Request sent. Successor and predecessor info received"<<endl;
if(status.ok()) {
string successor=successorInfo.succaddress();
string predecessor=successorInfo.predaddress();
ofstream fout;
//cout<<"Storing neighbours info"<<endl;
fout.open(NEIGHBOURS);
fout<<successor<<endl;
fout<<predecessor<<endl;
fout.close();
//cout<<"Successor: "<<successor<<endl;
//cout<<"Predecessor: "<<predecessor<<endl;
//cout<<"Stored neighbours info"<<endl;
channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
KeyValues keyValues;
ClientContext context2;
//cout<<"Informing successor about my presence"<<endl;
status=stub->INFORMSUCCESSOR(&context2,info,&keyValues);
string keys=keyValues.keys();
string values=keyValues.values();
while(true) {
string key=keys.substr(0,keys.find(';'));
string value=values.substr(0,values.find(';'));
if(key.size()==0)
break;
if(keys.find(';')+1==keys.size())
break;
keys=keys.substr(keys.find(';')+1);
values=values.substr(values.find(';')+1);
memManager->put(key,value);
}
//cout<<"Informed succesor"<<endl;
channel=grpc::CreateChannel(predecessor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
ClientContext context3;
//cout<<"Informing predecessor about my presence"<<endl;
status=stub->INFORMPREDECESSOR(&context3,info,&null);
//cout<<"Informed predecessor"<<endl;
updateAllFingerTables();
}
}
}
int main(int agrc, char **argv) { int main(int agrc, char **argv) {
pthread_mutex_init(&_masterLock, NULL); pthread_mutex_init(&_masterLock, NULL);
start = false; start = false;
...@@ -200,6 +1387,8 @@ int main(int agrc, char **argv) { ...@@ -200,6 +1387,8 @@ int main(int agrc, char **argv) {
else else
memManager = new storageLRU(stoi(params.find("CACHE_SIZE")->second)); memManager = new storageLRU(stoi(params.find("CACHE_SIZE")->second));
register_server_DNS(server_address);
setupServer(server_address); setupServer(server_address);
assignThreads(num_threads); assignThreads(num_threads);
sleep(1); sleep(1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment