Commit 178250d5 authored by mayankkakad's avatar mayankkakad

minor errors resolved, DNS calls added to server, client

parent 776dafd4
...@@ -42,7 +42,7 @@ include_directories("${CMAKE_CURRENT_BINARY_DIR}") ...@@ -42,7 +42,7 @@ include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# Targets (client|server) # Targets (client|server)
foreach(_target foreach(_target
client client_test server) client client_test server dns)
add_executable(${_target} "${_target}.cpp" add_executable(${_target} "${_target}.cpp"
${hw_proto_srcs} ${hw_proto_srcs}
${hw_grpc_srcs}) ${hw_grpc_srcs})
......
...@@ -13,6 +13,8 @@ using keyvaluestore::Value; ...@@ -13,6 +13,8 @@ using keyvaluestore::Value;
using keyvaluestore::KeyValue; using keyvaluestore::KeyValue;
using keyvaluestore::ReqStatus; using keyvaluestore::ReqStatus;
using keyvaluestore::KeyValueServices; using keyvaluestore::KeyValueServices;
using keyvaluestore::Info;
using keyvaluestore::Null;
std::map<std::string, std::string> params; std::map<std::string, std::string> params;
std::string config_filename = "../config"; std::string config_filename = "../config";
...@@ -150,7 +152,15 @@ void parse(std::string& str, std::string& cmd, std::string& key, std::string& va ...@@ -150,7 +152,15 @@ void parse(std::string& str, std::string& cmd, std::string& key, std::string& va
} }
void RunClient() { void RunClient() {
std::string target_address("0.0.0.0:"+params.find("LISTENING_PORT")->second); std::string dns_address("0.0.0.0:1234");
std::shared_ptr<Channel> channel=grpc::CreateChannel(dns_address, grpc::InsecureChannelCredentials());
std::unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
Null null;
null.set_nothing(0);
Info info;
ClientContext context;
Status status=stub->GETADDRESS(&context,null,&info);
std::string target_address(info.address());
// Instantiates the client // Instantiates the client
KeyValueServicesClient client( KeyValueServicesClient client(
// Channel from which RPCs are made - endpoint is the target_address // Channel from which RPCs are made - endpoint is the target_address
......
#include <bits/stdc++.h> #include <bits/stdc++.h>
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#include<fstream>
#include "keyvaluestore.grpc.pb.h" #include "keyvaluestore.grpc.pb.h"
#define SERVERS "serverlist.txt"
using namespace std; using namespace std;
...@@ -21,13 +23,11 @@ std::unique_ptr<Server> server; ...@@ -21,13 +23,11 @@ std::unique_ptr<Server> server;
enum RequestType { enum RequestType {
GETADDRESS, GETADDRESS,
ADDADDRESS ADDADDRESS
} };
vector<string> servers;
class DNSData { class DNSData {
public: public:
DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getAddressResponder(&context), status(CREATE), reqType(reqType) { DNSData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getAddressResponder(&context), addAddressResponder(&context), status(CREATE), reqType(reqType) {
Proceed(); Proceed();
} }
...@@ -42,30 +42,59 @@ public: ...@@ -42,30 +42,59 @@ public:
else if (status == PROCESS) { else if (status == PROCESS) {
new DNSData(service, cq, reqType); new DNSData(service, cq, reqType);
if(reqType==GETADDRESS) { if(reqType==GETADDRESS) {
if(servers.size()==0) ifstream fin;
int size=0;
map<int,string> servers;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[size++]=temp;
}while(fin);
fin.close();
if(size==0)
info.set_address("null"); info.set_address("null");
else { else {
int x=rand()%servers.size(); int x=rand()%size;
info.set_address(servers.at(x)); info.set_address(servers.find(x)->second);
} }
getAddressResponder.Finish(info,Status::OK,this); getAddressResponder.Finish(info,Status::OK,this);
} }
else { else {
servers.push_back(info.address()); ofstream fout;
null.set_nothing(1); fout.open(SERVERS,ios::app);
fout<<info.address()<<endl;
fout.close();
null.set_nothing(0);
addAddressResponder.Finish(null,Status::OK,this); addAddressResponder.Finish(null,Status::OK,this);
} }
status = FINISH; status = FINISH;
} }
else { else {
if(reqType==ADDADDRESS)
print_servers_list();
GPR_ASSERT(status == FINISH); GPR_ASSERT(status == FINISH);
delete this; delete this;
} }
print_servers_list();
} }
void print_servers_list() { void print_servers_list() {
for(int i=0;i<servers.size();i++) ifstream fin;
cout<<servers.at(i)<<endl; map<int,string> servers;
int size=0;
fin.open(SERVERS);
do {
string temp;
getline(fin,temp);
if(temp.size()==0)
break;
servers[size++]=temp;
}while(fin);
fin.close();
for(int i=0;i<size;i++)
cout<<servers.find(i)->second<<endl;
} }
private: private:
...@@ -92,7 +121,9 @@ int main(int argc,char **argv) { ...@@ -92,7 +121,9 @@ int main(int argc,char **argv) {
builder.RegisterService(&service); builder.RegisterService(&service);
unique_ptr<ServerCompletionQueue> comp_queue=builder.AddCompletionQueue(); unique_ptr<ServerCompletionQueue> comp_queue=builder.AddCompletionQueue();
server = builder.BuildAndStart(); server = builder.BuildAndStart();
new DNSData(&service,comp_queue.get()); cout<<"DNS SERVER COMES UP SUCCESSFULLY"<<endl;
new DNSData(&service,comp_queue.get(),GETADDRESS);
new DNSData(&service,comp_queue.get(),ADDADDRESS);
void *tag; void *tag;
bool ok; bool ok;
while(true) { while(true) {
......
...@@ -71,7 +71,7 @@ void getConfig() { ...@@ -71,7 +71,7 @@ void getConfig() {
class ServerData { class ServerData {
public: public:
ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), newResponder(&context), informResponder(&context), status(CREATE), reqType(reqType) { ServerData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, ServerRequest reqType) : service(service), cq(cq), newResponder(&context), informResponder(&context), status(CREATE), reqType(reqType) {
Proceed(); Proceed();
} }
...@@ -280,21 +280,24 @@ void register_server_DNS(string my_address) { ...@@ -280,21 +280,24 @@ void register_server_DNS(string my_address) {
if(status.ok()) { if(status.ok()) {
old_server=info.address(); old_server=info.address();
info.set_address(my_address); info.set_address(my_address);
status=stub->ADDADDRESS(&context,info,&null); ClientContext new_context;
stub->ADDADDRESS(&new_context,info,&null);
if(old_server=="null") if(old_server=="null")
return; return;
channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
info.set_address(my_address); info.set_address(my_address);
SuccessorInfo successorInfo; SuccessorInfo successorInfo;
status=stub->NEW(&context,info,&successorInfo); ClientContext context1;
status=stub->NEW(&context1,info,&successorInfo);
if(status.ok()) { if(status.ok()) {
string successor=successorInfo.address(); string successor=successorInfo.address();
channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
info.set_address(my_address); info.set_address(my_address);
KeyValues keyValues; KeyValues keyValues;
status=stub->INFORM(&context,info,&keyValues); ClientContext context2;
status=stub->INFORM(&context2,info,&keyValues);
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment