Commit 776dafd4 authored by mayankkakad's avatar mayankkakad

new server adding to DNS, getting a server IP address from DNS

parent cb2baf3d
......@@ -18,25 +18,54 @@ ServerBuilder builder;
KeyValueServices::AsyncService service;
std::unique_ptr<Server> server;
enum RequestType {
GETADDRESS,
ADDADDRESS
}
vector<string> servers;
class DNSData {
public:
DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq) : service(service), cq(cq), getAddressResponder(&context), status(CREATE) {
DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getAddressResponder(&context), status(CREATE), reqType(reqType) {
Proceed();
}
void Proceed() {
if (status == CREATE) {
status = PROCESS;
service->RequestGETADDRESS(&context, &null, &getAddressResponder, cq, cq, this);
if(reqType==GETADDRESS)
service->RequestGETADDRESS(&context, &null, &getAddressResponder, cq, cq, this);
else
service->RequestADDADDRESS(&context, &info, &addAddressResponder, cq, cq, this);
}
else if (status == PROCESS) {
new DNSData(service, cq);
new DNSData(service, cq, reqType);
if(reqType==GETADDRESS) {
if(servers.size()==0)
info.set_address("null");
else {
int x=rand()%servers.size();
info.set_address(servers.at(x));
}
getAddressResponder.Finish(info,Status::OK,this);
}
else {
servers.push_back(info.address());
null.set_nothing(1);
addAddressResponder.Finish(null,Status::OK,this);
}
status = FINISH;
}
else {
GPR_ASSERT(status == FINISH);
delete this;
}
print_servers_list();
}
void print_servers_list() {
for(int i=0;i<servers.size();i++)
cout<<servers.at(i)<<endl;
}
private:
......@@ -46,15 +75,18 @@ private:
Null null;
Info info;
ServerAsyncResponseWriter<Info> getAddressResponder;
ServerAsyncResponseWriter<Null> addAddressResponder;
enum CallStatus {
CREATE,
PROCESS,
FINISH
};
CallStatus status;
RequestType reqType;
};
int main(int argc,char **argv) {
srand(time(0));
string server_address("0.0.0.0:1234");
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
......
......@@ -11,6 +11,7 @@ service KeyValueServices {
rpc NEW(Info) returns (SuccessorInfo) {}
rpc INFORM(Info) returns (KeyValues) {}
rpc GETADDRESS(Null) returns (Info) {}
rpc ADDADDRESS(Info) returns (Null) {}
}
message Key {
......
......@@ -11,6 +11,8 @@ using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using keyvaluestore::Key;
using keyvaluestore::KeyValue;
......@@ -20,6 +22,7 @@ using keyvaluestore::Value;
using keyvaluestore::Info;
using keyvaluestore::SuccessorInfo;
using keyvaluestore::KeyValues;
using keyvaluestore::Null;
pthread_mutex_t _masterLock;
......@@ -263,6 +266,39 @@ void signalHandler(int signum) {
exit(0);
}
void register_server_DNS(string my_address) {
string target_address("0.0.0.0:1234");
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub;
stub=KeyValueServices::NewStub(channel);
Null null;
null.set_nothing(0);
Info info;
ClientContext context;
Status status=stub->GETADDRESS(&context,null,&info);
string old_server;
if(status.ok()) {
old_server=info.address();
info.set_address(my_address);
status=stub->ADDADDRESS(&context,info,&null);
if(old_server=="null")
return;
channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
SuccessorInfo successorInfo;
status=stub->NEW(&context,info,&successorInfo);
if(status.ok()) {
string successor=successorInfo.address();
channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
info.set_address(my_address);
KeyValues keyValues;
status=stub->INFORM(&context,info,&keyValues);
}
}
}
int main(int agrc, char **argv) {
pthread_mutex_init(&_masterLock, NULL);
start = false;
......@@ -280,6 +316,8 @@ int main(int agrc, char **argv) {
else
memManager = new storageLRU(stoi(params.find("CACHE_SIZE")->second));
register_server_DNS(server_address);
setupServer(server_address);
assignThreads(num_threads);
sleep(1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment