Commit 8b6c7aef authored by Kamal Khodabhai's avatar Kamal Khodabhai

Update plots.pdf, CMD.txt, CMakeLists.txt, README.txt, Backend.h,...

Update plots.pdf, CMD.txt, CMakeLists.txt, README.txt, Backend.h, client_test.cpp, client.cpp, LFUNode.h, LFU.h, keyvaluestore.proto, LRU.h, server.cpp, LRUNode.h, config files
parent 8b5594fb
#include <bits/stdc++.h>
#include "LRU.h"
#include "LFU.h"
using namespace std;
class memoryManagement {
public:
virtual string get(int *a, string s) {
return "This will never run";
}
virtual void put(string a, string b) {
return;
}
virtual void del(int *a, string s) {
return;
}
virtual void traverse() {
return;
}
virtual void pushAll() {
return;
}
};
class storageLRU : public memoryManagement {
public:
LRUcache mycache;
storageLRU(int capacity) {
mycache.setCap(capacity);
}
string get(int *status, string key) {
string result = "";
int status2 = 200;
result = mycache.get(key, &status2);
*status = status2;
return result;
}
void put(string key, string val) {
mycache.put(key, val);
}
void del(int *status, string key) {
int status2 = 200;
mycache.del(key, &status2);
*status = status2;
}
void traverse() {
mycache.traverse();
}
void pushAll() {
mycache.pushAll();
}
};
class storageLFU : public memoryManagement {
public:
LFUCache mycache;
storageLFU(int capacity) {
mycache.setCap(capacity);
}
string get(int *status, string key) {
string result = "";
int status2 = 200;
result = mycache.GET(key, &status2);
*status = status2;
return result;
}
void put(string key, string val) {
mycache.PUT(key, val);
}
void del(int *status, string key) {
int status2 = 200;
mycache.DEL(key, &status2);
*status = status2;
}
void traverse() {
mycache.traverse();
}
void pushAll() {
mycache.pushAll();
}
};
\ No newline at end of file
This diff is collapsed.
# Minimum CMake required
cmake_minimum_required(VERSION 3.15)
# Project
project(keyvaluestore)
# Protobuf
set(protobuf_MODULE_COMPATIBLE TRUE)
find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${protobuf_VERSION}")
# Protobuf-compiler
set(_PROTOBUF_PROTOC $<TARGET_FILE:protobuf::protoc>)
# gRPC
find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")
set(_GRPC_GRPCPP gRPC::grpc++)
set(_GRPC_CPP_PLUGIN_EXECUTABLE $<TARGET_FILE:gRPC::grpc_cpp_plugin>)
# Proto file
get_filename_component(hw_proto "keyvaluestore.proto" ABSOLUTE)
get_filename_component(hw_proto_path "${hw_proto}" PATH)
# Generated sources
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.pb.cc")
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.pb.h")
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.grpc.pb.cc")
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.grpc.pb.h")
add_custom_command(
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${hw_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${hw_proto}"
DEPENDS "${hw_proto}")
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# Targets (client|server)
foreach(_target
client client_test server)
add_executable(${_target} "${_target}.cpp"
${hw_proto_srcs}
${hw_grpc_srcs})
target_link_libraries(${_target}
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()
#include <bits/stdc++.h>
#include <fstream>
#include "LFUNode.h"
#define CACHE_FULL "Cache Full"
#define INSERT_SUCCESS "Insert Success"
#define CACHE_EMPTY "Cache Empty"
#define DELETE_SUCCESS "Delete Success"
#define KEY_NOT_FOUND "Key not found"
#define CAPACITY 5
using namespace std;
class LFUCache {
private:
int capacity;
LFUNode **cacheHeap;
int curr_pos;
std::mutex mtx;
string deleted;
int parent(int i) {
if (i % 2 == 0)
return (i / 2) - 1;
else
return (i - 1) / 2;
}
int left_child(int i) {
return (2 * i) + 1;
}
int right_child(int i) {
return (2 * i) + 2;
}
int exists(string key) {
for (int i = curr_pos; i >= 0; i--)
if (key.compare(cacheHeap[i]->key) == 0)
return i;
return -1;
}
void swap(int i, int j) {
cacheHeap[i]->frequency = cacheHeap[i]->frequency + cacheHeap[j]->frequency;
cacheHeap[j]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency;
cacheHeap[i]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency;
string temp = cacheHeap[i]->key;
cacheHeap[i]->key = cacheHeap[j]->key;
cacheHeap[j]->key = temp;
temp = cacheHeap[i]->value;
cacheHeap[i]->value = cacheHeap[j]->value;
cacheHeap[j]->value = temp;
}
void heapify_up(int i) {
while (true) {
if (i == 0)
break;
if (cacheHeap[parent(i)]->frequency <= cacheHeap[i]->frequency)
break;
else
swap(i, parent(i));
i = parent(i);
}
}
void heapify_down(int i) {
while (true) {
if (left_child(i) > curr_pos)
break;
if (right_child(i) > curr_pos && cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency)
break;
if (right_child(i) <= curr_pos) {
if (cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency && cacheHeap[right_child(i)]->frequency >= cacheHeap[i]->frequency)
break;
}
if (right_child(i) <= curr_pos) {
if (cacheHeap[left_child(i)]->frequency < cacheHeap[right_child(i)]->frequency)
swap(i, left_child(i));
else
swap(i, right_child(i));
}
else
swap(i, left_child(i));
}
}
string insert(string key, string value) {
int i = exists(key);
if (i != -1) {
cacheHeap[i]->frequency++;
cacheHeap[i]->value = value;
heapify_down(i);
return INSERT_SUCCESS;
}
curr_pos++;
if (curr_pos == capacity) {
curr_pos--;
delete_min(true);
insert(key, value);
return INSERT_SUCCESS;
}
cacheHeap[curr_pos]->key = key;
cacheHeap[curr_pos]->value = value;
cacheHeap[curr_pos]->frequency = 1;
heapify_up(curr_pos);
return INSERT_SUCCESS;
}
string delete_min(bool keep) {
if (curr_pos == -1)
return CACHE_EMPTY;
swap(0, curr_pos);
curr_pos--;
if (curr_pos >= 0)
heapify_down(0);
if (keep) {
string filename = getFilename(cacheHeap[curr_pos + 1]->key);
ofstream fout;
fout.open(filename, ios::app);
fout << cacheHeap[curr_pos + 1]->key << "\n";
fout << cacheHeap[curr_pos + 1]->value << "\n";
fout.close();
}
return DELETE_SUCCESS;
}
void insertAll(unordered_map<string, string> flush) {
int to_empty = flush.size() - capacity + curr_pos + 1;
for (int i = 0; i < to_empty; i++)
delete_min(true);
unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++)
insert(itr->first, itr->second);
}
string getNodeByKey(string key) {
string value;
int i = exists(key);
if (i == -1)
return KEY_NOT_FOUND;
else {
cacheHeap[i]->frequency++;
value = cacheHeap[i]->value;
heapify_down(i);
return value;
}
}
string deleteNodeByKey(string key) {
int i = exists(key);
if (i == -1)
return KEY_NOT_FOUND;
else {
cacheHeap[i]->frequency = 0;
heapify_up(i);
delete_min(false);
string filename = getFilename(key);
ofstream fout;
fout.open(filename, ios::app);
fout << key << "\n";
fout << deleted << "\n";
fout.close();
return DELETE_SUCCESS;
}
}
string getFilename(string key) {
string filename = ".";
int length = key.size();
if (length == 1)
return "_.kvm";
filename += char(length - 1);
filename += key;
filename.pop_back();
filename += ".kvm";
return filename;
}
bool fileExists(string filename) {
ifstream f(filename.c_str());
return f.good();
}
public:
LFUCache() {
curr_pos = -1;
deleted = "";
deleted += char(0);
}
void setCap(int capacity) {
this->capacity = capacity;
cacheHeap = (LFUNode **)malloc(sizeof(LFUNode *) * capacity);
for(int i = 0; i < capacity; i++)
cacheHeap[i] = new LFUNode(deleted, deleted);
}
string GET(string key, int *status) {
//std::lock_guard<std::mutex> guard(mtx);
string value = getNodeByKey(key);
if (value.compare(KEY_NOT_FOUND) != 0) {
*status = 200;
return value;
}
string filename = getFilename(key);
if (!fileExists(filename)) {
*status = 400;
return KEY_NOT_FOUND;
}
string key1, val;
ifstream fin;
unordered_map<string, string> flush;
fin.open(filename);
do {
getline(fin, key1);
if (key1.size() == 0)
break;
getline(fin, val);
if (val.compare(deleted) != 0)
flush[key1] = val;
else
flush.erase(key1);
if (fin.eof())
break;
} while (fin);
fin.close();
insertAll(flush);
ofstream fout;
fout.open(filename);
fout.close();
value = getNodeByKey(key);
if (value.compare(KEY_NOT_FOUND) == 0)
*status = 400;
else
*status = 200;
return value;
}
void PUT(string key, string value) {
//mtx.lock();
insert(key, value);
//mtx.unlock();
}
void DEL(string key, int *status) {
int status1;
string value = GET(key, &status1);
if (status1 == 400) {
*status = 400;
return;
}
deleteNodeByKey(key);
}
void pushAll() {
while (curr_pos >= 0)
delete_min(true);
}
void reformat(string filename) {
ifstream fin;
fin.open(filename);
unordered_map<string, string> flush;
string key, value;
do {
getline(fin, key);
if (key.size() == 0)
break;
getline(fin, value);
if (value.compare(deleted) == 0)
flush.erase(key);
else
flush[key] = value;
if (fin.eof())
break;
} while (fin);
fin.close();
ofstream fout;
fout.open(filename);
unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++) {
fout << itr->first << "\n";
fout << itr->second << "\n";
}
}
void traverse() {
for (int i = 0; i <= curr_pos; i++)
cout << cacheHeap[i]->key << endl;
cout << endl;
}
};
#include<bits/stdc++.h>
using namespace std;
class LFUNode {
public:
string key,value;
int frequency;
LFUNode(string key,string value) {
this->key=key;
this->value=value;
frequency=1;
}
};
#include <bits/stdc++.h>
#include "LRUNode.h"
#include <sys/stat.h>
#define NOT_EXIST "KEY NOT EXIST"
using namespace std;
inline bool fileExists(const std::string &name) {
struct stat buffer;
return (stat(name.c_str(), &buffer) == 0);
}
string getFilename(string key) {
string filename = ".";
int length = key.size();
if (length == 1) {
return "_.kvm";
}
filename += char(length - 1);
filename += key;
filename.pop_back();
filename += ".kvm";
return filename;
}
class LRUcache {
private:
int capacity;
string deleted;
Node *head;
Node *tail;
std::mutex mtx;
unordered_map<string, Node *> cache;
public:
LRUcache() {
head = new Node("HEAD", "HEAD");
tail = new Node("TAIL", "TAIL");
head->next = tail;
tail->prev = head;
deleted = "";
deleted += char(0);
}
void setCap(int capacity) {
this->capacity = capacity;
}
LRUcache(int capacity) {
this->capacity = capacity;
head = new Node("HEAD", "HEAD");
tail = new Node("TAIL", "TAIL");
head->next = tail;
tail->prev = head;
deleted = "";
deleted += char(0);
}
Node *getUtil(string key) {
if (cache.find(key) == cache.end())
return NULL;
Node *x = cache[key];
x->prev->next = x->next;
x->next->prev = x->prev;
x->next = head->next;
x->next->prev = x;
head->next = x;
x->prev = head;
return x;
}
string get(string key, int *status) {
//std::lock_guard<std::mutex> guard(mtx);
Node *x = getUtil(key);
if (x) {
if (x->payload == deleted) {
*status = 400;
return NOT_EXIST;
}
return x->payload;
}
string fileName = getFilename(key);
if (!fileExists(fileName)) {
*status = 400;
return NOT_EXIST;
}
ifstream fin;
fin.open(fileName);
string _key, val;
unordered_map<string, string> flush;
do {
getline(fin, _key);
if (_key.size() == 0)
break;
getline(fin, val);
if (val != deleted)
flush[_key] = val;
else
flush.erase(_key);
if (fin.eof())
break;
} while (fin);
fin.close();
unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end() && cache.size() < capacity - 1; itr++) {
this->put(itr->first, itr->second);
flush[itr->first] = deleted;
}
if (flush.find(key) != flush.end()) {
this->put(key, flush[key]);
flush[key] = deleted;
}
const char *c = fileName.c_str();
remove(c);
ofstream fout;
fout.open(fileName);
for (itr = flush.begin(); itr != flush.end(); itr++) {
if (itr->second == deleted)
continue;
fout << itr->first << "\n";
fout << itr->second << "\n";
}
fout.close();
x = getUtil(key);
if (x) {
return x->payload;
}
*status = 400;
return NOT_EXIST;
}
void del(string key, int *status) {
int status2 = 200;
string result = get(key, &status2);
// mtx.lock();
if (status2 == 400) {
*status = 400;
// mtx.unlock();
return;
}
this->put(key, deleted);
// mtx.unlock();
}
void pushAll() {
unordered_map<string, Node *>::iterator itr;
for (itr = cache.begin(); itr != cache.end(); itr++) {
string fileName = getFilename(itr->first);
ofstream fout;
fout.open(fileName, ios::app);
fout << itr->first << "\n";
fout << itr->second->payload << "\n";
fout.close();
delete (itr->second);
}
cache.clear();
head->next = tail;
tail->prev = head;
}
void put(string key, string payload) {
//mtx.lock();
if (getUtil(key) != NULL)
cache[key]->payload = payload;
else {
if (cache.size() == capacity) {
Node *x = tail->prev;
string keyToBeFlushed = x->key;
cache.erase(keyToBeFlushed);
cache[key] = x;
// write to the file
ofstream fout;
string fileName = getFilename(keyToBeFlushed);
fout.open(fileName, ios::app);
fout << x->key << "\n";
fout << x->payload << "\n";
fout.close();
x->key = key;
x->payload = payload;
x->prev->next = x->next;
x->next->prev = x->prev;
x->next = head->next;
x->next->prev = x;
head->next = x;
x->prev = head;
} else {
Node *x = new Node(key, payload);
cache[key] = x;
x->next = head->next;
x->next->prev = x;
head->next = x;
x->prev = head;
}
}
//mtx.unlock();
}
void traverse() {
Node *temp = head->next;
while (temp->next) {
cout << temp->key << "\n";
temp = temp->next;
}
cout << "\n";
}
};
#include <iostream>
using namespace std;
class Node {
public:
string key, payload;
Node *next;
Node *prev;
Node(string key, string val) {
this->key = key;
this->payload = val;
this->next = this->prev = NULL;
}
};
213050001 KAKADIYA KAMAL KHODABHAI
213050026 VISHAL VISHNU SAHA
213050028 MAYANK MANOJ KAKAD
To build :-
1. create bulid folder in project directory
2. open terminal in build folder
3. run command "cmake .." in build folder
4. then, run command "make" in build folder
To run :-
For server :-
- run command in build folder "./server"
For client :-
- run command in build folder "./client"
\ No newline at end of file
#include <vector>
#include <bits/stdc++.h>
#include <grpcpp/grpcpp.h>
#include <string>
#include "keyvaluestore.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using keyvaluestore::Key;
using keyvaluestore::Value;
using keyvaluestore::KeyValue;
using keyvaluestore::ReqStatus;
using keyvaluestore::KeyValueServices;
std::map<std::string, std::string> params;
std::string config_filename = "../config";
void getConfig() {
std::string line;
std::ifstream config(config_filename);
while (getline(config, line)) {
char temp[line.length()];
strcpy(temp, line.c_str());
char *token1 = strtok(temp, "=");
char *token2 = strtok(NULL, "-");
params[token1] = token2;
}
config.close();
}
class KeyValueServicesClient {
public:
KeyValueServicesClient(std::shared_ptr<Channel> channel)
: stub_(KeyValueServices::NewStub(channel)) {}
// Assembles client payload, sends it to the server, and returns its response
std::string GET(std::string key) {
// Data to be sent to server
Key request;
request.set_key(key);
// Container for server response
Value reply;
// Context can be used to send meta data to server or modify RPC behaviour
ClientContext context;
// Actual Remote Procedure Call
Status status = stub_->GET(&context, request, &reply);
// Returns results based on RPC status
if (status.ok()) {
if(reply.value()=="Key not found")
std::cout<<"Status : 400"<<std::endl;
else
std::cout<<"Status : 200"<<std::endl;
return reply.value();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return "RPC Failed";
}
}
int PUT(std::string key, std::string value) {
// Data to be sent to server
KeyValue request;
request.set_key(key);
request.set_value(value);
// Container for server response
ReqStatus reply;
// Context can be used to send meta data to server or modify RPC behaviour
ClientContext context;
// Actual Remote Procedure Call
Status status = stub_->PUT(&context, request, &reply);
// Returns results based on RPC status
if (status.ok()) {
if(reply.status()==400)
std::cout<<"Error: "<<reply.error()<<std::endl;
return reply.status();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return -1;
}
}
int DEL(std::string key) {
// Data to be sent to server
Key request;
request.set_key(key);
// Container for server response
ReqStatus reply;
// Context can be used to send meta data to server or modify RPC behaviour
ClientContext context;
// Actual Remote Procedure Call
Status status = stub_->DEL(&context, request, &reply);
// Returns results based on RPC status
if (status.ok()) {
if(reply.status()==400)
std::cout<<"Error: "<<reply.error()<<std::endl;
return reply.status();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return -1;
}
}
private:
std::unique_ptr<KeyValueServices::Stub> stub_;
};
void parse(std::string& str, std::string& cmd, std::string& key, std::string& value) {
int i = 0;
cmd = key = value = "";
while(i < str.length() && str[i] == ' ')
i++;
for( ; i < str.length(); i++) {
if(str[i] != ' ')
cmd += str[i];
else break;
}
while(i < str.length() && str[i] == ' ')
i++;
for( ; i < str.length(); i++) {
if(str[i] != ' ')
key += str[i];
else break;
}
while(i < str.length() && str[i] == ' ')
i++;
for( ; i < str.length(); i++) {
if(str[i] != ' ')
value += str[i];
else break;
}
}
void RunClient() {
std::string target_address("0.0.0.0:"+params.find("LISTENING_PORT")->second);
// Instantiates the client
KeyValueServicesClient client(
// Channel from which RPCs are made - endpoint is the target_address
grpc::CreateChannel(target_address,
// Indicate when channel is not authenticated
grpc::InsecureChannelCredentials()));
// std::string response;
// std::string key = "KAMAL";
// std::string value = "+91 98254 00172";
// // RPC is created and response is stored
// client.PUT(key, value);
// response = client.GET(key);
// // Prints results
// std::cout << "Value string: " << response << std::endl;
std::string cmd, key, value;
/*std::cout<<"GET: "<<client.GET("a")<<std::endl;
std::cout<<"PUT: "<<client.PUT("a","b")<<std::endl;
std::cout<<"DEL: "<<client.DEL("a")<<std::endl;*/
while(1) {
int choice;
std::cout<<std::endl<<"1. Batch Mode"<<std::endl<<"2. Interactive Mode"<<std::endl<<"3. exit"<<std::endl;
std::cout<<std::endl<<"Enter Your choice : ";
std::cin>>choice;
if(choice == 1) {
std::string file_name;
std::cout<<"Enter the file name : ";
std::cin>>file_name;
std::ifstream file(file_name);
std::string str;
double minTime = INT_MAX, maxTime = 0, avgTime = 0, lineCount = 0;
clock_t begin, end;
while (std::getline(file, str)) {
parse(str, cmd, key, value);
if(cmd == "GET") {
begin = clock();
std::string response = client.GET(key);
end = clock();
std::cout<<"Response from Server: "<<response<<std::endl;
} else if(cmd == "DEL") {
begin = clock();
int response = client.DEL(key);
std::cout<<"Status : "<<response<<std::endl;
end = clock();
} else if(cmd == "PUT") {
begin = clock();
int response = client.PUT(key, value);
std::cout<<"Status : "<<response<<std::endl;
end = clock();
}
lineCount++;
minTime = std::min((double(end - begin) / CLOCKS_PER_SEC*1000), minTime);
maxTime = std::max((double(end - begin) / CLOCKS_PER_SEC*1000), maxTime);
avgTime += (double(end - begin) / CLOCKS_PER_SEC*1000);
}
if(lineCount == 0)
minTime = avgTime = maxTime = 0;
else
avgTime /= lineCount;
std::cout<<std::endl<<"Average Response In: "<<avgTime<<" ms"<<std::endl;
std::cout<<"Minimum Response In: "<<minTime<<" ms"<<std::endl;
std::cout<<"Maximum Response In: "<<maxTime<<" ms"<<std::endl;
} else if(choice == 2) {
std::cout<<"Enter the command : ";
std::cin>>cmd;
if(cmd == "GET") {
std::cout<<"Enter the key : ";
std::cin>>key;
// std::cout<<cmd<<"-"<<key<<"-"<<std::endl;
clock_t begin = clock();
std::string response = client.GET(key);
clock_t end = clock();
std::cout<<"Response from Server : "<<response<<std::endl;
std::cout<<"Response In: "<<double(end - begin) / CLOCKS_PER_SEC*1000<<" ms"<<std::endl;
} else if(cmd == "DEL") {
std::cout<<"Enter the key : ";
std::cin>>key;
clock_t begin = clock();
int response = client.DEL(key);
std::cout<<"Status : "<<response<<std::endl;
clock_t end = clock();
std::cout<<"Response In: "<<double(end - begin) / CLOCKS_PER_SEC*1000<<" ms"<<std::endl;
} else if(cmd == "PUT") {
std::cout<<"Enter the key : ";
std::cin>>key;
std::cout<<"Enter the value : ";
std::cin>>value;
// std::cout<<cmd<<"-"<<key<<"-"<<value<<std::endl;
clock_t begin = clock();
int response = client.PUT(key, value);
std::cout<<"Status : "<<response<<std::endl;
clock_t end = clock();
std::cout<<"Response In: "<<double(end - begin) / CLOCKS_PER_SEC*1000<<" ms"<<std::endl;
}
} else if(choice == 3) {
exit(-1);
}
else {
}
}
}
int main(int argc, char* argv[]) {
getConfig();
RunClient();
return 0;
}
#include <vector>
#include <bits/stdc++.h>
#include <grpcpp/grpcpp.h>
#include <string>
#include "keyvaluestore.grpc.pb.h"
using namespace std;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using keyvaluestore::Key;
using keyvaluestore::KeyValue;
using keyvaluestore::KeyValueServices;
using keyvaluestore::ReqStatus;
using keyvaluestore::Value;
std::map<std::string, std::string> params;
std::string config_filename = "../config";
int requests;
void getConfig() {
std::string line;
std::ifstream config(config_filename);
while (getline(config, line)) {
char temp[line.length()];
strcpy(temp, line.c_str());
char *token1 = strtok(temp, "=");
char *token2 = strtok(NULL, "-");
params[token1] = token2;
}
config.close();
}
class KeyValueServicesClient {
public:
KeyValueServicesClient(std::shared_ptr<Channel> channel)
: stub_(KeyValueServices::NewStub(channel)) {}
// Assembles client payload, sends it to the server, and returns its response
std::string GET(std::string key)
{
// Data to be sent to server
Key request;
request.set_key(key);
// Container for server response
Value reply;
// Context can be used to send meta data to server or modify RPC behaviour
ClientContext context;
// Actual Remote Procedure Call
Status status = stub_->GET(&context, request, &reply);
// Returns results based on RPC status
if (status.ok())
{
return reply.value();
}
else
{
return "RPC Failed";
}
}
int PUT(std::string key, std::string value)
{
// Data to be sent to server
KeyValue request;
request.set_key(key);
request.set_value(value);
// Container for server response
ReqStatus reply;
// Context can be used to send meta data to server or modify RPC behaviour
ClientContext context;
// Actual Remote Procedure Call
Status status = stub_->PUT(&context, request, &reply);
// Returns results based on RPC status
if (status.ok())
{
if (reply.status() == 400)
// std::cout<<"Error: "<<reply.error()<<std::endl;
return reply.status();
}
else
{
return -1;
}
// std::cout<<"200"<<std::endl;
return 0;
}
int DEL(std::string key)
{
// Data to be sent to server
Key request;
request.set_key(key);
// Container for server response
ReqStatus reply;
// Context can be used to send meta data to server or modify RPC behaviour
ClientContext context;
// Actual Remote Procedure Call
Status status = stub_->DEL(&context, request, &reply);
// Returns results based on RPC status
if (status.ok())
{
if (reply.status() == 400)
return reply.status();
}
else
{
return -1;
}
}
private:
std::unique_ptr<KeyValueServices::Stub> stub_;
};
std::string gen_random(const int len)
{
std::string tmp_s;
string alphanum =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
tmp_s.reserve(len);
for (int i = 0; i < len; ++i)
tmp_s += alphanum[rand() % alphanum.size()];
return tmp_s;
}
int main(int argc, char *argv[])
{
srand((unsigned)time(NULL) * getpid());
requests = stoi(argv[1]);
vector<string> keys(requests);
double total = 0;
std::string target_address("0.0.0.0:50051");
KeyValueServicesClient client(
grpc::CreateChannel(target_address,
grpc::InsecureChannelCredentials()));
for (int i = 0; i < requests; i++)
{
std::string key = gen_random(30);
keys[i] = key;
std::string value = "Kamal Vishal Mayank kvm are the author";
clock_t begin = clock();
client.PUT(key, value);
clock_t end = clock();
total += (double(end - begin) / CLOCKS_PER_SEC * 1000);
}
cout << total / requests << " ";
total = 0;
for (string key : keys)
{
clock_t begin = clock();
client.GET(key);
clock_t end = clock();
total += (double(end - begin) / CLOCKS_PER_SEC * 1000);
}
cout << total / requests << "\n";
return 0;
}
LISTENING_PORT=50051
CACHE_REPLACEMENT_TYPE=LRU
CACHE_SIZE=256
NUM_SERVER_THREADS=4
syntax = "proto3";
package keyvaluestore;
// The key value store service definition.
service KeyValueServices {
// Function invoked to send the request
rpc GET(Key) returns (Value) {}
rpc PUT(KeyValue) returns (ReqStatus) {}
rpc DEL(Key) returns (ReqStatus) {}
}
message Key {
string key = 1;
}
message Value {
string value = 1;
int32 status = 2;
string error = 3;
}
message KeyValue {
string key = 1;
string value = 2;
}
message ReqStatus {
int32 status = 1;
string error = 2;
}
File added
#include <bits/stdc++.h>
#include <pthread.h>
#include <grpcpp/grpcpp.h>
#include "keyvaluestore.grpc.pb.h"
#include "../Backend.h"
using namespace std;
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using keyvaluestore::Key;
using keyvaluestore::KeyValue;
using keyvaluestore::KeyValueServices;
using keyvaluestore::ReqStatus;
using keyvaluestore::Value;
pthread_mutex_t _masterLock;
enum RequestType {
GET,
PUT,
DEL
};
map<string, string> params;
string config_filename = "../config";
string log_file = "../log";
ServerBuilder builder;
KeyValueServices::AsyncService service;
std::unique_ptr<Server> server;
pthread_t *workers;
int *worker_id;
pthread_cond_t startRpcs;
pthread_mutex_t myLock;
bool start;
memoryManagement *memManager;
void getConfig() {
string line;
ifstream config(config_filename);
while (getline(config, line)) {
char temp[line.length()];
strcpy(temp, line.c_str());
char *token1 = strtok(temp, "=");
char *token2 = strtok(NULL, "-");
params[token1] = token2;
}
config.close();
}
class CallData {
public:
CallData(KeyValueServices::AsyncService *service, ServerCompletionQueue *cq, RequestType reqType) : service(service), cq(cq), getResponder(&context), putResponder(&context), delResponder(&context), status(CREATE), reqType(reqType) {
Proceed();
}
void Proceed() {
if (status == CREATE) {
status = PROCESS;
if (reqType == GET)
service->RequestGET(&context, &key, &getResponder, cq, cq, this);
else if (reqType == PUT)
service->RequestPUT(&context, &keyvalue, &putResponder, cq, cq, this);
else
service->RequestDEL(&context, &key, &delResponder, cq, cq, this);
} else if (status == PROCESS) {
new CallData(service, cq, reqType);
if (reqType == GET) {
cout << "SERVER SERVES A GET REQUEST WITH PARAMETER KEY : " << key.key();
int status = 200;
pthread_mutex_lock(&_masterLock);
string v = memManager->get(&status, key.key());
pthread_mutex_unlock(&_masterLock);
value.set_value(v);
if (status == 200)
value.set_status(200);
else {
value.set_status(400);
value.set_error(v);
}
cout << " RETURN VALUE : " << value.value() << endl;
getResponder.Finish(value, Status::OK, this);
} else if (reqType == PUT) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
pthread_mutex_lock(&_masterLock);
memManager->put(keyvalue.key(), keyvalue.value());
pthread_mutex_unlock(&_masterLock);
stat.set_status(200);
putResponder.Finish(stat, Status::OK, this);
} else {
cout << "SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : " << key.key() << endl;
int status = 200;
pthread_mutex_lock(&_masterLock);
memManager->del(&status, key.key());
pthread_mutex_unlock(&_masterLock);
if (status == 200)
stat.set_status(200);
else {
stat.set_status(400);
stat.set_error("KEY NOT EXIST");
}
delResponder.Finish(stat, Status::OK, this);
}
/* --------------------------------CONTENT OF CACHE ONLY KEY-------------------------------- */
memManager->traverse();
status = FINISH;
} else {
GPR_ASSERT(status == FINISH);
delete this;
}
}
private:
KeyValueServices::AsyncService *service;
ServerCompletionQueue *cq;
ServerContext context;
Key key;
Value value;
KeyValue keyvalue;
ReqStatus stat;
ServerAsyncResponseWriter<Value> getResponder;
ServerAsyncResponseWriter<ReqStatus> putResponder;
ServerAsyncResponseWriter<ReqStatus> delResponder;
enum CallStatus {
CREATE,
PROCESS,
FINISH
};
CallStatus status;
RequestType reqType;
};
void setupServer(string server_address) {
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
}
void *handleRpcs(void *thread_id) {
unique_ptr<ServerCompletionQueue> comp_queue = builder.AddCompletionQueue();
pthread_mutex_lock(&myLock);
while (!start)
pthread_cond_wait(&startRpcs, &myLock);
pthread_mutex_unlock(&myLock);
new CallData(&service, comp_queue.get(), GET);
new CallData(&service, comp_queue.get(), PUT);
new CallData(&service, comp_queue.get(), DEL);
void *tag;
bool ok;
while (true) {
GPR_ASSERT(comp_queue->Next(&tag, &ok));
GPR_ASSERT(ok);
// cout << "Thread id:\t" << (*(int *)thread_id) << "\n";
static_cast<CallData *>(tag)->Proceed();
}
return 0;
}
void assignThreads(int num_threads) {
workers = (pthread_t *)malloc(sizeof(pthread_t) * num_threads);
worker_id = (int *)malloc(sizeof(int) * num_threads);
for (int i = 0; i < num_threads; i++) {
worker_id[i] = i;
pthread_create(&workers[i], NULL, handleRpcs, (void *)&worker_id[i]);
}
}
void signalHandler(int signum) {
memManager->pushAll();
cout << "SERVER SHUTDOWN" << endl;
server->Shutdown();
exit(0);
}
int main(int agrc, char **argv) {
pthread_mutex_init(&_masterLock, NULL);
start = false;
getConfig();
int num_threads = stoi(params.find("NUM_SERVER_THREADS")->second);
int port = stoi(params.find("LISTENING_PORT")->second);
string server_address("0.0.0.0:" + to_string(port));
string cache_type = params.find("CACHE_REPLACEMENT_TYPE")->second;
pthread_cond_init(&startRpcs, NULL);
pthread_mutex_init(&myLock, NULL);
if (cache_type.compare("LFU") == 0)
memManager = new storageLFU(stoi(params.find("CACHE_SIZE")->second));
else
memManager = new storageLRU(stoi(params.find("CACHE_SIZE")->second));
setupServer(server_address);
assignThreads(num_threads);
sleep(1);
signal(SIGINT, signalHandler);
server = builder.BuildAndStart();
start = true;
cout << "SERVER COMES UP SUCCESSFULLY" << endl;
pthread_cond_broadcast(&startRpcs);
for (int i = 0; i < num_threads; i++)
pthread_join(workers[i], NULL);
free(workers);
free(worker_id);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment