Update repo

parent 6ca1c1e7
# Minimum CMake required
cmake_minimum_required(VERSION 3.5.1)
# Project
project(stringreverse)
#Here change common.cmake path acoriding to your installtion.
include(/Users/jaiminchauhan/grpc/examples/cpp/cmake/common.cmake)
# Proto file
get_filename_component(hw_proto "keyval.proto" ABSOLUTE)
get_filename_component(hw_proto_path "${hw_proto}" PATH)
# Generated sources
set(CMAKE_CXX_STANDARD 17)
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/keyval.pb.cc")
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/keyval.pb.h")
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/keyval.grpc.pb.cc")
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/keyval.grpc.pb.h")
add_custom_command(
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${hw_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${hw_proto}"
DEPENDS "${hw_proto}")
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# Targets (client|server)
foreach(_target
client server)
add_executable(${_target} "${_target}.cc"
${hw_proto_srcs}
${hw_grpc_srcs}
"interface.h"
"interface2_new.cc")
target_link_libraries(${_target}
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()
\ No newline at end of file
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include <memory>
#include <string>
#include <sstream>
#include<fstream>
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <thread>
#include <sys/time.h>
#include <chrono>
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "keyval.grpc.pb.h"
#endif
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using keyval::KeyVal;
using keyval::GetRequest;
using keyval::GetReply;
using keyval::PutRequest;
using keyval::PutReply;
using keyval::DelRequest;
using keyval::DelReply;
class KeyValClient {
public:
explicit KeyValClient(std::shared_ptr<Channel> channel)
: stub_(KeyVal::NewStub(channel)) {}
// Assembles the client's payload and sends it to the server.
void GET(const std::string& user) {
// Data we are sending to the server.
GetRequest request;
request.set_key(user);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
call->type=0;
// stub_->PrepareAsyncSayHello() creates an RPC object, returning
// an instance to store in "call" but does not actually start the RPC
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->response_reader =
stub_->PrepareAsyncGET(&call->context, request, &cq_);
gettimeofday(&call->time, 0);
// StartCall initiates the RPC call
call->response_reader->StartCall();
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the memory address of the call
// object.
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
void PUT(const std::string& key,const std::string& val) {
// Data we are sending to the server.
PutRequest request;
request.set_key(key);
request.set_val(val);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
call->type=1;
// stub_->PrepareAsyncSayHello() creates an RPC object, returning
// an instance to store in "call" but does not actually start the RPC
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->response_reader_put =
stub_->PrepareAsyncPUT(&call->context, request, &cq_);
// StartCall initiates the RPC call
gettimeofday(&call->time, 0);
call->response_reader_put->StartCall();
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the memory address of the call
// object.
call->response_reader_put->Finish(&call->reply_put, &call->status_put, (void*)call);
}
void DEL(const std::string& key) {
// Data we are sending to the server.
DelRequest request;
request.set_key(key);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
// stub_->PrepareAsyncSayHello() creates an RPC object, returning
// an instance to store in "call" but does not actually start the RPC
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->type=2;
call->response_reader_del =
stub_->PrepareAsyncDEL(&call->context, request, &cq_);
// StartCall initiates the RPC call
gettimeofday(&call->time, 0);
call->response_reader_del->StartCall();
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the memory address of the call
// object.
call->response_reader_del->Finish(&call->reply_del, &call->status_del, (void*)call);
}
// Loop while listening for completed responses.
// Prints out the response from the server.
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
while (cq_.Next(&got_tag, &ok)) {
// The tag in this example is the memory location of the call object
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
struct timeval end;
gettimeofday(&end, 0);
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if(call->type==0){
if (call->status.ok())
{
if(call->reply.status()==200)
std::cout << "Get received: " << call->reply.val() << std::endl;
}
else
std::cout << call->reply.val() << std::endl;
}
else if(call->type==1){
if (call->status.ok())
{
if(call->reply_put.status()==200)
std::cout << "Put complete" << std::endl;
}
else
std::cout << call->reply_put.err() << std::endl;
}
else{
if (call->status.ok())
{
if(call->reply_del.status()==200)
std::cout << "Del complete" << std::endl;
}
else
std::cout << call->reply_del.err() << std::endl;
}
// Once we're complete, deallocate the call object.
float interv=(end.tv_sec - call->time.tv_sec)+ (end.tv_usec - call->time.tv_usec)*1e-6;
std::cout << "Time elapsed for this call is "<< interv << " secs \n";
delete call;
}
}
private:
// struct for keeping state and data information
struct AsyncClientCall {
// Container for the data we expect from the server.
int type;
struct timeval time;
GetReply reply;
PutReply reply_put;
DelReply reply_del;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// Storage for the status of the RPC upon completion.
Status status;
Status status_put;
Status status_del;
std::unique_ptr<ClientAsyncResponseReader<GetReply>> response_reader;
std::unique_ptr<ClientAsyncResponseReader<PutReply>> response_reader_put;
std::unique_ptr<ClientAsyncResponseReader<DelReply>> response_reader_del;
};
// Out of the passed in Channel comes the stub, stored here, our view of the
// server's exposed services.
std::unique_ptr<KeyVal::Stub> stub_;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq_;
};
void handlebatchmode(std::string filename,KeyValClient *kv){
std::fstream fin;
fin.open(filename,std::ios::in);
std::string line;
std::string command;
std::string key;
std::string value;
if(fin.is_open()){
while(getline(fin,line))
{
std::stringstream ss(line);
ss>>command;
ss>>key;
if(command.compare("PUT")==0){
ss>>value;
kv->PUT(key,value);
}
else if(command.compare("DEL")==0){
kv->DEL(key);
}
else if(command.compare("GET")==0){
kv->GET(key);
}
else{
std::cout << "Invalid command in file please check and rerun";
exit(-1);
}
}
}
else{
std::cout << "Error opening file\n";
exit(-1);
}
fin.close();
}
int main(int argc, char** argv) {
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint (in this case,
// localhost at port 50051). We indicate that the channel isn't authenticated
// (use of InsecureChannelCredentials()).
KeyValClient kv(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// Spawn reader thread that loops indefinitely
std::thread thread_ = std::thread(&KeyValClient::AsyncCompleteRpc, &kv);
// for (int i = 0; i < 10; i++) {
// std::string user("world " + std::to_string(i));
// kv.GET(user); // The actual RPC call!
// kv.PUT("abc","xyz");
// }
//looping infinetely
int mode;
std::string command;
std::string key;
std::string value;
std::string filename;
while (1)
{
std::cout << "Enter 1 for Batch mode and 2 for Interactive mode: \n";
std::cout << "Press control-c to quit" << std::endl << std::endl;
std::cin >> mode;
if(mode==1){
std::cout << "Enter filename: ";
std::cin >> filename;
handlebatchmode(filename,&kv);
}
else if(mode==2){
std::cout << "Enter Command (e.g GET, PUT, DEL): ";
std::cin >> command;
if(command.compare("DEL")==0){
std::cout << "Enter Key :\n";
std::cin >> key;
kv.DEL(key);
}
else if(command.compare("PUT")==0){
std::cout << "Enter Key & Value: ";
std::cin>>key>>value;
kv.PUT(key,value);
}
else if(command.compare("GET")==0){
std::cout << "Enter Key: ";
std::cin>>key;
kv.GET(key);
}
else{
std::cout<< "Invalid Command\n";
exit(-1);
}
}
else{
std::cout << "Invalid please Enter valid\n";
}
}
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); // blocks forever
return 0;
}
LISTENING_PORT=50051
CACHE_REPLACEMENT_TYPE=LRU
CACHE_SIZE=512
THREAD_POOL_SIZE=8
\ No newline at end of file
#include <chrono>
#include <thread>
#include <iostream>
#include <iterator>
#include <unistd.h>
#include <fstream>
#include <chrono>
#include <map>
#include <cstdio>
#include <fcntl.h>
#include <sstream>
#include <vector>
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <memory>
#include <string>
#include <cctype>
using namespace std;
shared_timed_mutex _protect;
map<string, string> kv;
map<string, string> kv1;
//mutex(mu); //Global variable or place within class
condition_variable condition; //A signal that can be used to communicate between functions
map<string, string>::iterator itr1;
vector<string> file_name;
std::vector<std::mutex *> my_mutexes(26);
std::vector<shared_timed_mutex *> my_mutexes1(26);
//Initialize mutexes
vector<string> keyy;
map<string, string>::iterator itr;
map<string, string>::iterator it;
int change = 0;
class Cache_base
{
public:
virtual string getValue(string key) { return NULL; };
virtual void set(string key, string value){};
virtual void deletekey(string key){};
};
Cache_base *cache;
class LRUCache : public Cache_base
{
public:
class Node
{
public:
string key, value;
Node *left;
Node *right;
Node(string k, string v)
{
key = k;
value = v;
}
};
LRUCache(int sz)
{
size = sz;
head->right = tail;
tail->left = head;
tail->right = NULL;
}
Node *head = new Node("0", "0");
Node *tail = new Node("0", "0");
void addNode(string key, string value)
{
Node *temp = new Node(key, value);
temp->right = head->right;
temp->left = head;
(head->right)->left = temp;
head->right = temp;
}
void deleteNode(Node *node)
{
Node *prev = node->left;
Node *next = node->right;
prev->right = next;
next->left = prev;
delete node;
}
map<string, Node *> map;
int size;
string getValue(string key)
{
string value;
if (map.find(key) != map.end())
{
//Node* temp= cache;
//cout << "here 111 " << key << " \n";
Node *val = map[key];
map.erase(key);
value = val->value;
deleteNode(val);
addNode(key, value);
map[key] = head->right;
return value;
}
else
{
//should refer from file here and add value
//value=readfromfile(key);
//set(key,value);
value = "-1";
return value;
}
}
void deleteValues(string key)
{
if (map.find(key) != map.end())
{
Node *temp = map[key];
map.erase(key);
deleteNode(temp);
}
//call delete from file routine from here
}
void set(string key, string value)
{
if (map.find(key) != map.end())
{
Node *temp = map[key];
map.erase(key);
deleteNode(temp);
}
if (map.size() == size)
{
//cout << key <<" here" << "\n";
map.erase(tail->left->key);
deleteNode(tail->left);
}
addNode(key, value);
map[key] = head->right;
}
void getall()
{
Node *temp = head->right;
while (temp->right)
{
temp = temp->right;
}
}
};
class LFUCache : public Cache_base
{
public:
class Node
{
public:
string key, value;
unsigned int freq;
Node *left;
Node *right;
Node(string k, string v)
{
key = k;
value = v;
freq = 1;
}
};
class DLL
{
public:
Node *head, *tail;
int size;
DLL()
{
head = new Node("0", "0");
tail = new Node("0", "0");
head->right = tail;
tail->left = head;
tail->right = NULL;
size = 0;
}
void addNode(Node *temp)
{
if (size < 0)
return;
// Node *temp=new Node(key,value);
temp->right = head->right;
temp->left = head;
(head->right)->left = temp;
head->right = temp;
size++;
}
void deleteNode(Node *node)
{
Node *prev = node->left;
Node *next = node->right;
prev->right = next;
next->left = prev;
size--;
}
};
int size, minfreq;
map<string, Node *> nodes;
map<int, DLL *> freqs;
LFUCache(int sz)
{
size = sz;
minfreq = 0;
}
void update(Node *temp)
{
DLL *olddll = freqs[temp->freq];
olddll->deleteNode(temp);
if (temp->freq == minfreq && olddll->size == 0)
minfreq++;
if (olddll->size == 0)
{
freqs.erase(temp->freq);
delete olddll;
}
temp->freq++;
DLL *dll;
if (freqs.find(temp->freq) != freqs.end())
{
dll = freqs[temp->freq];
}
else
{
dll = new DLL();
}
dll->addNode(temp);
freqs[temp->freq] = dll;
}
string getValue(string key)
{
if (nodes.find(key) != nodes.end())
{
Node *temp = nodes[key];
update(temp);
return temp->value;
}
else
{
// return value from file;
//insert into cache
return "-1";
}
}
void set(string key, string value)
{
if (nodes.find(key) != nodes.end())
{
Node *temp = nodes[key];
temp->value = value;
update(temp);
}
else
{
Node *node = new Node(key, value);
if (nodes.size() == size)
{
//max size delete small node
DLL *mindll = freqs[minfreq];
nodes.erase(mindll->tail->left->key);
mindll->deleteNode(mindll->tail->left);
}
minfreq = 1;
nodes[key] = node;
DLL *mindll;
if (freqs.find(minfreq) != freqs.end())
{
mindll = freqs[minfreq];
}
else
{
mindll = new DLL();
}
mindll->addNode(node);
freqs[minfreq] = mindll;
}
}
int getMaxNext(int minfreq)
{
int min = -1;
for (map<int, DLL *>::iterator it = freqs.begin(); it != freqs.end(); ++it)
{
if (it->first > min && freqs[it->first]->size != 0)
{
min = it->first;
}
}
return min;
}
void deletekey(string key)
{
if (nodes.find(key) == nodes.end())
{
return;
}
else
{
Node *temp = nodes[key];
nodes.erase(key);
DLL *dll = freqs[temp->freq];
dll->deleteNode(temp);
if (dll->size == 0 && minfreq == temp->freq)
{
freqs.erase(temp->freq);
minfreq = getMaxNext(minfreq);
}
if (dll->size == 0)
freqs.erase(temp->freq);
delete temp;
}
}
};
void config_cache(int cachetype, int size)
{
if (cachetype == 1)
{
cache = new LFUCache(size);
}
else
{
cache = new LRUCache(size);
}
}
int put(string key, string value)
{
if (key.empty() || value.empty())
{
return 4; //key or value is empty
}
else
{
if (key.size() > 256 && value.size() > 256)
{
return 3; //key or value string byte is more than 256 byte
}
else
{
int index1 = 0;
index1 = tolower(key.at(0)) - 'a';
if (find(keyy.begin(), keyy.end(), key) != keyy.end())
{
string res = cache->getValue(key);
if (res == "-1")
{
unique_lock<shared_timed_mutex> w(*my_mutexes1[index1]);
std::ifstream inFile;
inFile.open(file_name[index1]); //open the input file
std::stringstream strStream;
strStream << inFile.rdbuf(); //read the file
std::string str = strStream.str(); //str holds the content of the file
int index = str.find(key);
int count = index + key.length();
int found = str.find('\n', count);
string r = str.substr(count + 1, found - count - 1);
str.replace(count + 1,found - count - 1, value);
ofstream fp(file_name[index1], std::ofstream::out | std::ofstream::trunc);
fp << str;
fp.flush();
cache->set(key, r);
}
else
{
cache->set(key, value);
int index1 = 0;
index1 = tolower(key.at(0)) - 'a';
unique_lock<shared_timed_mutex> w(*my_mutexes1[index1]);
std::ifstream inFile;
inFile.open(file_name[index1]); //open the input file
std::stringstream strStream;
strStream << inFile.rdbuf(); //read the file
std::string str = strStream.str(); //str holds the content of the file
int index = str.find(key);
int count = index + key.length();
int found = str.find('\n', count);
string r = str.substr(count + 1, found - count - 1);
str.replace(count + 1,found - count - 1, value);
ofstream fp(file_name[index1], std::ios_base::app | std::ofstream::trunc);
fp << str;
fp.flush();
fp.close();
}
}
else
{
keyy.push_back(key);
int index1 = 0;
index1 = tolower(key.at(0)) - 'a';
unique_lock<shared_timed_mutex> w(*my_mutexes1[index1]);
ofstream fp(file_name[index1], std::ios_base::app);
fp << key << "=" << value << endl;
fp.flush();
fp.close();
}
}
}
return 1;
}
int delete1(string key)
{
if (key.empty())
{
return 4; //key or value is empty
}
else
{
if (key.size() > 256)
{
return 3; //key or value string byte is more than 256 byte
}
else
{
//unique_lock<shared_timed_mutex> w(_protect);
//Do Stuff
remove(keyy.begin(), keyy.end(), key);
string res = cache->getValue(key);
if (res == "-1")
{
int index1 = 0;
index1 = tolower(key.at(0)) - 'a';
unique_lock<shared_timed_mutex> w(*my_mutexes1[index1]);
std::ifstream inFile;
inFile.open(file_name[index1]); //open the input file
if (inFile.is_open())
{
std::stringstream strStream;
strStream << inFile.rdbuf(); //read the file
std::string str = strStream.str(); //str holds the content of the file
int index = str.find(key);
int count = index + key.length();
int found = str.find('\n', count);
if (index == -1)
{
return 2; //key not found
}
string r = str.substr(count + 1, found - count - 1);
str.replace(index, key.length() + 2 + found - count - 1, "");
ofstream fp(file_name[index1],std::ofstream::out | std::ofstream::trunc);
fp << str;
fp.flush();
fp.close();
}
else
{
//cout << "File is not there\n";
}
}
else
{
cache->deletekey(key);
int index1 = 0;
index1 = tolower(key.at(0)) - 'a';
unique_lock<shared_timed_mutex> w(*my_mutexes1[index1]);
std::ifstream inFile;
inFile.open(file_name[index1]); //open the input file
std::stringstream strStream;
strStream << inFile.rdbuf(); //read the file
std::string str = strStream.str(); //str holds the content of the file
int index = str.find(key);
int count = index + key.length();
int found = str.find('\n', count);
string r = str.substr(count + 1, found - count - 1);
str.replace(index, key.length() + 2 + found - count - 1, "");
ofstream fp(file_name[index1], std::ofstream::out | std::ofstream::trunc);
fp << str;
fp.flush();
fp.close();
}
}
return 1;
}
}
string get12(string key)
{
if (key.empty())
{
return "4"; //key or value is empty
}
else
{
if (key.size() > 256)
{
return "3"; //key or value string byte is more than 256 byte
}
else
{
string res = cache->getValue(key);
if (res == "-1")
{
int index1 = 0;
index1 = tolower(key.at(0)) - 'a';
shared_lock<shared_timed_mutex> r(*my_mutexes1[index1]); //std::shared_lock will shared_lock() the mutex.
std::ifstream inFile;
inFile.open(file_name[index1]); //open the input file
std::stringstream strStream;
strStream << inFile.rdbuf(); //read the file
std::string str = strStream.str(); //str holds the content of the file
int index = str.find(key);
int count = index + key.length();
int found = str.find('\n', count);
if (index == -1)
{
return "2"; //key not found
}
else
{
string r = str.substr(count + 1, found - count - 1);
cache->set(key, r);
return r;
}
}
else
{
return res;
}
}
return "1";
}
}
void run()
{
for (int i = 0; i < 26; ++i)
{
my_mutexes[i] = new std::mutex();
}
for (int i = 0; i < 26; ++i)
{
my_mutexes1[i] = new std::shared_timed_mutex();
}
// Constructs the new thread and runs it. Does not block execution.
string res;
for (int i = 0; i < 26; i++)
{
string base = ".txt";
string dir_path = "./data/";
base.insert(0, 1, char('a' + i));
file_name.push_back(dir_path + base.c_str());
}
file_name.push_back("./data/extra.txt");
}
syntax = "proto3";
option java_package = "ex.grpc";
package keyval;
// Defines the service
service KeyVal {
// Function invoked to send the request
rpc GET (GetRequest) returns (GetReply) {}
rpc PUT (PutRequest) returns (PutReply) {}
rpc DEL (DelRequest) returns (DelReply) {}
}
// The request message containing requested numbers
message GetRequest {
string key = 1;
}
message GetReply {
string val = 2;
int32 status = 3;
}
message PutRequest {
string key = 4;
string val = 5;
}
message PutReply {
string err = 6;
int32 status = 7;
}
message DelRequest {
string key = 8;
}
message DelReply {
string err = 9;
int32 status = 10;
}
\ No newline at end of file
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <chrono>
#include <iostream>
#include <iterator>
#include <unistd.h>
#include <fstream>
#include <sstream>
#include <map>
#include <cstdio>
#include <fcntl.h>
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <mutex>
#include <shared_mutex>
#include <algorithm>
#include "interface.h"
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "keyval.grpc.pb.h"
#endif
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using keyval::DelReply;
using keyval::DelRequest;
using keyval::GetReply;
using keyval::GetRequest;
using keyval::KeyVal;
using keyval::PutReply;
using keyval::PutRequest;
using namespace std;
int num_threads;
std::string port;
std::string cache_type;
int cache_size;
int cache_block;
struct CallData
{
KeyVal::AsyncService *service;
grpc::ServerCompletionQueue *cq;
};
class Call
{
public:
virtual void Proceed(bool ok) = 0;
};
class GetCall final : public Call
{
public:
explicit GetCall(CallData *data)
: data_(data), responder_(&ctx_), status_(REQUEST)
{
on_done = [&](bool ok)
{ OnDone(ok); };
proceed = [&](bool ok)
{ Proceed(ok); };
ctx_.AsyncNotifyWhenDone(&on_done);
data_->service->RequestGET(&ctx_, &request_, &responder_, data_->cq,
data_->cq, &proceed);
}
void Proceed(bool ok)
{
switch (status_)
{
case REQUEST:
if (!ok)
{
// Not ok in REQUEST means the server has been Shutdown
// before the call got matched to an incoming RPC.
delete this;
break;
}
new GetCall(data_);
//Impliment here
//Start
result = get12(request_.key());
if(result=="4")
{
response_.set_status(400);
response_.set_val("key or value is empty");
}
else if (result=="2")
{
response_.set_status(400);
response_.set_val("key not found");
}
else if(result=="3")
{
response_.set_status(400);
response_.set_val("key or value string byte is more than 256 byte");
}
else
{
response_.set_status(200);
response_.set_val(result);
}
//End
responder_.Finish(response_, grpc::Status::OK, &proceed);
status_ = FINISH;
break;
case FINISH:
finish_called_ = true;
if (on_done_called_)
delete this;
break;
}
}
void OnDone(bool ok)
{
assert(ok);
if (ctx_.IsCancelled())
std::cerr << ": Ping call cancelled" << std::endl;
on_done_called_ = true;
if (finish_called_)
delete this;
else
status_ = FINISH;
}
std::function<void(bool)> proceed;
std::function<void(bool)> on_done;
private:
std::string result;
CallData *data_;
grpc::ServerContext ctx_;
GetRequest request_;
GetReply response_;
ServerAsyncResponseWriter<GetReply> responder_;
enum CallStatus
{
REQUEST,
FINISH
};
CallStatus status_;
bool finish_called_ = false;
bool on_done_called_ = false;
};
class DelCall final : public Call
{
public:
explicit DelCall(CallData *data)
: data_(data), responder_(&ctx_), status_(REQUEST)
{
on_done = [&](bool ok)
{ OnDone(ok); };
proceed = [&](bool ok)
{ Proceed(ok); };
ctx_.AsyncNotifyWhenDone(&on_done);
data_->service->RequestDEL(&ctx_, &request_, &responder_, data_->cq,
data_->cq, &proceed);
}
void Proceed(bool ok)
{
switch (status_)
{
case REQUEST:
if (!ok)
{
// Not ok in REQUEST means the server has been Shutdown
// before the call got matched to an incoming RPC.
delete this;
break;
}
new DelCall(data_);
//Impliment here
//Start
result = delete1(request_.key());
if(result==1)
{
response_.set_status(200);
response_.set_err("Entry Deleted");
}
else if (result==2)
{
response_.set_status(400);
response_.set_err("key not found");
}
else if(result==3)
{
response_.set_status(400);
response_.set_err("key or value string byte is more than 256 byte");
}
else
{
response_.set_status(400);
response_.set_err("key or value is empty");
}
//End
responder_.Finish(response_, grpc::Status::OK, &proceed);
status_ = FINISH;
break;
case FINISH:
finish_called_ = true;
if (on_done_called_)
delete this;
break;
}
}
void OnDone(bool ok)
{
assert(ok);
if (ctx_.IsCancelled())
std::cerr << ": Ping call cancelled" << std::endl;
on_done_called_ = true;
if (finish_called_)
delete this;
else
status_ = FINISH;
}
std::function<void(bool)> proceed;
std::function<void(bool)> on_done;
private:
int result;
CallData *data_;
grpc::ServerContext ctx_;
DelRequest request_;
DelReply response_;
ServerAsyncResponseWriter<DelReply> responder_;
enum CallStatus
{
REQUEST,
FINISH
};
CallStatus status_;
bool finish_called_ = false;
bool on_done_called_ = false;
};
class PutCall final : public Call
{
public:
explicit PutCall(CallData *data)
: data_(data), responder_(&ctx_), status_(REQUEST)
{
on_done = [&](bool ok)
{ OnDone(ok); };
proceed = [&](bool ok)
{ Proceed(ok); };
ctx_.AsyncNotifyWhenDone(&on_done);
data_->service->RequestPUT(&ctx_, &request_, &responder_, data_->cq,
data_->cq, &proceed);
}
void Proceed(bool ok)
{
switch (status_)
{
case REQUEST:
{
if (!ok)
{
// Not ok in REQUEST means the server has been Shutdown
// before the call got matched to an incoming RPC.
delete this;
break;
}
new PutCall(data_);
//Impliment here
//Start
result = put(request_.key(),request_.val());
response_.set_status(200);
//End
responder_.Finish(response_, grpc::Status::OK, &proceed);
status_ = FINISH;
}
break;
case FINISH:
finish_called_ = true;
if (on_done_called_)
delete this;
break;
}
}
void OnDone(bool ok)
{
assert(ok);
if (ctx_.IsCancelled())
std::cerr << ": Ping call cancelled" << std::endl;
on_done_called_ = true;
if (finish_called_)
delete this;
else
status_ = FINISH;
}
std::function<void(bool)> proceed;
std::function<void(bool)> on_done;
private:
CallData *data_;
int result;
std::string key;
std::string value;
grpc::ServerContext ctx_;
PutRequest request_;
PutReply response_;
ServerAsyncResponseWriter<PutReply> responder_;
enum CallStatus
{
REQUEST,
FINISH
};
CallStatus status_;
bool finish_called_ = false;
bool on_done_called_ = false;
};
void task1(string msg);
class ServerImpl final
{
public:
~ServerImpl()
{
server_->Shutdown();
// Always shutdown the completion queue after the server.
for (int i = 0; i < num_threads; i++)
cqs_[i]->Shutdown();
migrate_cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run()
{
std::string server_address("0.0.0.0:" + port);
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
for (int i = 0; i < num_threads; i++)
cqs_.emplace_back(builder.AddCompletionQueue());
migrate_cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
void HandleRpcs()
{
// Spawn a new CallData instance to serve new clients.
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; i++)
{
CallData data{&service_, cqs_[i].get()};
new GetCall(&data);
new PutCall(&data);
new DelCall(&data);
threads.emplace_back(std::thread(&ServerImpl::ServeThread, this, i));
}
CallData migrate_data{&service_, migrate_cq_.get()};
void *tag; // uniquely identifies a request.
bool ok;
while (true)
{
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
GPR_ASSERT(migrate_cq_->Next(&tag, &ok));
auto proceed = static_cast<std::function<void(bool)> *>(tag);
(*proceed)(ok);
}
server_->Shutdown(std::chrono::system_clock::now());
for (auto thr = threads.begin(); thr != threads.end(); thr++)
thr->join();
}
void ServeThread(int i)
{
void *tag;
bool ok;
while (cqs_[i]->Next(&tag, &ok))
{
auto proceed = static_cast<std::function<void(bool)> *>(tag);
(*proceed)(ok);
}
}
std::vector<std::unique_ptr<ServerCompletionQueue>> cqs_;
std::unique_ptr<ServerCompletionQueue> migrate_cq_;
KeyVal::AsyncService service_;
std::unique_ptr<Server> server_;
};
void config()
{
std::ifstream cFile("config.txt");
if (cFile.is_open())
{
std::string line;
while (getline(cFile, line))
{
auto delimiterPos = line.find("=");
auto name = line.substr(0, delimiterPos);
auto value = line.substr(delimiterPos + 1);
if (name == "LISTENING_PORT")
{
port = value;
}
else if (name == "CACHE_REPLACEMENT_TYPE")
{
cache_type = value;
}
else if (name == "CACHE_SIZE")
{
cache_size = stoi(value);
}
else if (name == "THREAD_POOL_SIZE")
{
num_threads = stoi(value);
}
else
{
std::cerr << "Property is not valid.\n";
}
}
}
else
{
std::cerr << "Couldn't open config file for reading.\n";
}
}
int main(int argc, char **argv)
{
freopen("log.txt", "w", stderr);
config();
run();
cache_block = cache_size / 512;
if(cache_type=="LFU")
{
config_cache(1,cache_block);
}
else
{
config_cache(2,cache_block);
}
ServerImpl server;
server.Run();
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment