Commit ce5b49e6 authored by mayankkakad's avatar mayankkakad

forwarding requests done(untested)

parent 31ac490a
......@@ -441,33 +441,576 @@ public:
} else if (status == PROCESS) {
new CallData(service, cq, reqType);
if (reqType == GET) {
int succ;
int key_id=hash(key.key());
int my_id=stoi(params.find("LISTENING_PORT")->second);
ifstream fin;
fin.open(NEIGHBOURS);
string pred;
getline(fin,pred);
getline(fin,pred);
fin.close();
int pred_id;
if(pred=="-1")
pred_id=-1;
else
pred_id=stoi(pred.substr(pred.find(':')+1));
if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) {
//transfer request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->GET(&cont1,key,&value);
}
else if(my_id==key_id||(my_id<key_id&&pred_id<key_id&&pred_id>my_id)) {
cout << "SERVER SERVES A GET REQUEST WITH PARAMETER KEY : " << key.key();
int status = 200;
pthread_mutex_lock(&_masterLock);
string v = memManager->get(&status, key.key());
pthread_mutex_unlock(&_masterLock);
value.set_value(v);
if (status == 200)
value.set_status(200);
else {
value.set_status(400);
value.set_error(v);
}
cout << " RETURN VALUE : " << value.value() << endl;
}
else {
if(pred_id==-1||pred_id<key_id) {
cout << "SERVER SERVES A GET REQUEST WITH PARAMETER KEY : " << key.key();
int status = 200;
pthread_mutex_lock(&_masterLock);
string v = memManager->get(&status, key.key());
string v = memManager->get(&status, key.key());
pthread_mutex_unlock(&_masterLock);
value.set_value(v);
if (status == 200)
value.set_status(200);
else {
value.set_status(400);
value.set_error(v);
}
cout << " RETURN VALUE : " << value.value() << endl;
}
else {
//transfer the request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->GET(&cont1,key,&value);
}
}
getResponder.Finish(value, Status::OK, this);
} else if (reqType == PUT) {
int succ;
int key_id=hash(keyvalue.key());
int my_id=stoi(params.find("LISTENING_PORT")->second);
ifstream fin;
fin.open(NEIGHBOURS);
string pred;
getline(fin,pred);
getline(fin,pred);
fin.close();
int pred_id;
if(pred=="-1")
pred_id=-1;
else
pred_id=stoi(pred.substr(pred.find(':')+1));
if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) {
//transfer request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->PUT(&cont1,keyvalue,&stat);
}
else if(my_id==key_id||(my_id<key_id&&pred_id<key_id&&pred_id>my_id)) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
pthread_mutex_lock(&_masterLock);
memManager->put(keyvalue.key(), keyvalue.value());
pthread_mutex_unlock(&_masterLock);
stat.set_status(200);
}
else {
if(pred_id==-1||pred_id<key_id) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
pthread_mutex_lock(&_masterLock);
memManager->put(keyvalue.key(), keyvalue.value());
pthread_mutex_unlock(&_masterLock);
stat.set_status(200);
}
else {
//transfer the request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->PUT(&cont1,keyvalue,&stat);
}
}
putResponder.Finish(stat, Status::OK, this);
} else {
int succ;
int key_id=hash(key.key());
int my_id=stoi(params.find("LISTENING_PORT")->second);
ifstream fin;
fin.open(NEIGHBOURS);
string pred;
getline(fin,pred);
getline(fin,pred);
fin.close();
int pred_id;
if(pred=="-1")
pred_id=-1;
else
pred_id=stoi(pred.substr(pred.find(':')+1));
if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) {
//transfer request
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->DEL(&cont1,key,&stat);
}
else if(my_id==key_id||(my_id<key_id&&pred_id<key_id&&pred_id>my_id)) {
cout << "SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : " << key.key() << endl;
int status = 200;
pthread_mutex_lock(&_masterLock);
memManager->del(&status, key.key());
pthread_mutex_unlock(&_masterLock);
value.set_value(v);
if (status == 200)
value.set_status(200);
stat.set_status(200);
else {
//as this server does not have the keyvalue pair, send request to next server using CHORD
value.set_status(400);
value.set_error(v);
stat.set_status(400);
stat.set_error("KEY NOT EXIST");
}
cout << " RETURN VALUE : " << value.value() << endl;
getResponder.Finish(value, Status::OK, this);
} else if (reqType == PUT) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
pthread_mutex_lock(&_masterLock);
memManager->put(keyvalue.key(), keyvalue.value());
pthread_mutex_unlock(&_masterLock);
stat.set_status(200);
putResponder.Finish(stat, Status::OK, this);
} else {
}
else {
if(pred_id==-1||pred_id<key_id) {
cout << "SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : " << key.key() << endl;
int status = 200;
......@@ -482,6 +1025,97 @@ public:
stat.set_status(400);
stat.set_error("KEY NOT EXIST");
}
}
else {
int fingers[16];
ifstream fin;
//cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE);
int nums=0;
do {
string temp;
getline(fin,temp);
if(temp=="null"||temp.size()==0)
break;
fingers[nums++]=stoi(temp);
}while(fin);
fin.close();
int node=-1;
int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
node=fingers[nums-1];
next=my_id;
}
else if(nums>0&&my_id<key_id&&fingers[0]>=key_id) {
node=my_id;
next=fingers[0];
}
else {
for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<key_id&&fingers[i]>=key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>key_id) {
node=fingers[i-1];
next=fingers[i];
break;
}
else if(i==nums-1) {
node=fingers[i];
next=my_id;
break;
}
}
}
string target_address("0.0.0.0:"+to_string(next));
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred;
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
}
if(mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ=x.id();
}
else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
//cout<<"Yes. We got the successor"<<endl;
succ=x.id();
}
string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext cont1;
stub->DEL(&cont1,key,&stat);
}
}
delResponder.Finish(stat, Status::OK, this);
}
/* --------------------------------CONTENT OF CACHE ONLY KEY-------------------------------- */
......@@ -492,6 +1126,9 @@ public:
delete this;
}
}
int hash(string s) {
return (((int)s.at(0))<<8)+((int)s.at(1));
}
private:
KeyValueServices::AsyncService *service;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment