Commit 0834a3d7 authored by ajinkyatanksale's avatar ajinkyatanksale

Initial Commit of PA4 code

parent 7d269e06
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cmake build file for C++ helloworld example.
# Assumes protobuf and gRPC have been installed using cmake.
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build
# that automatically builds all the dependencies before building helloworld.
cmake_minimum_required(VERSION 3.5.1)
project(keyvaluestore C CXX)
include(../cmake/common.cmake)
# Proto file
get_filename_component(hw_proto "keyvaluestore.proto" ABSOLUTE)
get_filename_component(hw_proto_path "${hw_proto}" PATH)
# Generated sources
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.pb.cc")
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.pb.h")
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.grpc.pb.cc")
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/keyvaluestore.grpc.pb.h")
add_custom_command(
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${hw_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${hw_proto}"
DEPENDS "${hw_proto}")
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# hw_grpc_proto
add_library(hw_grpc_proto
${hw_grpc_srcs}
${hw_grpc_hdrs}
${hw_proto_srcs}
${hw_proto_hdrs})
target_link_libraries(hw_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
# route_guide_helper
add_library(MyCache_
"MyCache.h"
"MyCache.cpp")
add_library(configReader_
"configReader.h"
"configReader.cc")
add_library(storage_
"storage.h"
"storage.cc")
# route_guide_helper
# add_library(LFUCache
# "LFUCache.h"
# "LFUCache.cpp")
# Targets greeter_[async_](client|server)
foreach(_target
server client)
add_executable(${_target} "${_target}.cc")
target_link_libraries(${_target}
hw_grpc_proto
MyCache_
storage_
configReader_
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()
#include <iostream>
#include <vector>
#include <fstream>
#include <bits/stdc++.h>
#include <pthread.h>
#include "MyCache.h"
using namespace std;
int CACHE_SIZE=3;
MyCache::MyCache(string key, string value, int freq, int isUpdated) {
this->key = key;
this->value = value;
this->freq = freq;
this->isUpdated = isUpdated;
}
MyCache::MyCache(string key, string value, int isUpdated) {
this->key = key;
this->value = value;
this->freq = 0;
this->isUpdated = isUpdated;
}
string MyCache::get_key() {
return key;
}
string MyCache::get_value() {
return value;
}
int MyCache::get_freq() {
return freq;
}
int MyCache::get_isUpdated() {
return isUpdated;
}
void MyCache::set_value(string value) {
this->value = value;
}
void MyCache::set_freq(int freq) {
this->freq = freq;
}
void MyCache::set_isUpdated(int isUpdated) {
this->isUpdated = isUpdated;
}
std::vector<MyCache*> key_value_cache{};
std::vector<pthread_mutex_t> locks(100);
std::vector<std::string> key_queue{};
pthread_mutex_t KVlock;
pthread_mutex_t KQlock;
pthread_mutex_t KVClock;
// pthread_mutex_t temp;
int get_min() {
int min = INT32_MAX, index = 0;
for (int j=0; j<key_value_cache.size(); j++) {
if (key_value_cache[j]->get_freq() < min) {
min = key_value_cache[j]->get_freq();
index = j;
}
}
return index;
}
std::vector<std::string> parser(string s1) {
std::vector<std::string> str{};
for (int i=0; i<s1.size(); i++) {
if (s1[i] == ',') {
str.push_back(s1.substr(0, i));
str.push_back(s1.substr(i+1, s1.size()-i-1));
break;
}
}
return str;
}
string getCache(string key,int cacheType,Persistant *obj) {
if(cacheType==Cache_LFU){
auto it = key_value_cache.begin(); int index;
do {
pthread_mutex_lock(&KVlock);
it = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
index = it - key_value_cache.begin();
pthread_mutex_unlock(&KVlock);
} while(pthread_mutex_lock(&locks[index]) == -1);
if (it != key_value_cache.end()) {
key_value_cache[index]->set_freq(key_value_cache[index]->get_freq()+1);
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
std::string val = key_value_cache[index]->get_value();
pthread_mutex_unlock(&locks[index]);
return val;
}
else {
pthread_mutex_unlock(&locks[index]);
std::string value = obj->get_value(key); // getValFromStorage(key);
if (value.compare("ERROR") == 0) {
return ErrorMessage;
}
else {
auto it1 = key_value_cache.begin();
do {
pthread_mutex_lock(&KVlock);
it1 = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
pthread_mutex_unlock(&KVlock);
} while (pthread_mutex_lock(&locks[it1 - key_value_cache.begin()]) == -1);
if (it1 != key_value_cache.end()) {
if (key_value_cache[it1-key_value_cache.begin()]->get_isUpdated() == 2) {
pthread_mutex_unlock(&locks[it1 - key_value_cache.begin()]);
return ErrorMessage;
}
else if (key_value_cache[it1-key_value_cache.begin()]->get_isUpdated() == 1) {
std::string val = key_value_cache[it1-key_value_cache.begin()]->get_value();
pthread_mutex_unlock(&locks[it1 - key_value_cache.begin()]);
return val;
}
else {
pthread_mutex_unlock(&locks[it1 - key_value_cache.begin()]);
return value;
}
}
else {
pthread_mutex_unlock(&locks[it1 - key_value_cache.begin()]);
MyCache *newKV = new MyCache(key, value, 1, 0);
pthread_mutex_lock(&KVlock);
if (key_value_cache.size() == CACHE_SIZE) {
pthread_mutex_unlock(&KVlock);
int index;
do {
pthread_mutex_lock(&KVlock);
index = get_min();
pthread_mutex_unlock(&KVlock);
} while (pthread_mutex_lock(&locks[index]));
MyCache* temp = key_value_cache[index];
key_value_cache.erase(key_value_cache.begin()+index);
key_value_cache.insert(key_value_cache.begin()+index, newKV);
pthread_mutex_unlock(&locks[index]);
if (temp->get_isUpdated() == 1) {
obj->put_value(temp->get_key(),temp->get_value());
//putIntoStorage(temp->get_key(), temp->get_value());
}
else if (temp->get_isUpdated() == 2) {
obj->delete_value(temp->get_key());
//deleteFromStorage(temp->get_key());
}
}
else {
key_value_cache.push_back(newKV);
pthread_mutex_unlock(&KVlock);
}
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
return value;
}
}
}
}
else if(cacheType==Cache_LRU){
auto it1 = key_value_cache.begin();
int index2;
do {
pthread_mutex_lock(&KQlock);
it1 = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
index2 = it1 - key_value_cache.begin();
pthread_mutex_unlock(&KQlock);
} while (pthread_mutex_lock(&locks[index2]) == -1);
// pthread_mutex_lock(&locks[index2]);
if (it1 != key_value_cache.end()) {
if (key_value_cache[index2]->get_isUpdated() == 2) {
pthread_mutex_unlock(&locks[index2]);
return ErrorMessage;
}
else {
std::string value = key_value_cache[index2]->get_value();
pthread_mutex_lock(&KQlock);
auto it = std::find_if(key_queue.begin(), key_queue.end(), [key](std::string obj)->bool {return obj == key;});
int index1 = it - key_queue.begin();
std::string temp_key = key_queue[index1];
key_queue.erase(it);
key_queue.push_back(temp_key);
pthread_mutex_unlock(&KQlock);
pthread_mutex_unlock(&locks[index2]);
return value;
}
}
else {
pthread_mutex_unlock(&locks[index2]);
std::string value = obj->get_value(key); // getValFromStorage(key);
if (value.compare(ErrorMessage) == 0) {
return ErrorMessage;
}
else {
auto it3 = key_value_cache.begin();
do {
pthread_mutex_lock(&KQlock);
it3 = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
pthread_mutex_unlock(&KQlock);
} while (pthread_mutex_lock(&locks[it3-key_value_cache.begin()]) == -1);
// pthread_mutex_lock(&locks[it3-key_value_cache.begin()]);
if (it3 != key_value_cache.end()) {
if (key_value_cache[it3-key_value_cache.begin()]->get_isUpdated() == 2) {
pthread_mutex_unlock(&locks[it3-key_value_cache.begin()]);
return ErrorMessage;
}
else if (key_value_cache[it3-key_value_cache.begin()]->get_isUpdated() == 1) {
std::string val = key_value_cache[it3-key_value_cache.begin()]->get_value();
pthread_mutex_unlock(&locks[it3-key_value_cache.begin()]);
return val;
}
pthread_mutex_unlock(&locks[it3-key_value_cache.begin()]);
return value;
}
else {
pthread_mutex_unlock(&locks[it3-key_value_cache.begin()]);
MyCache *newKV = new MyCache(key, value, 0);
std::string temp_key;
pthread_mutex_lock(&KQlock);
if (key_queue.size()==CACHE_SIZE) {
temp_key = key_queue[0];
key_queue.erase(key_queue.begin());
key_queue.push_back(key);
pthread_mutex_unlock(&KQlock);
do {
pthread_mutex_lock(&KQlock);
it3 = std::find_if(key_value_cache.begin(), key_value_cache.end(), [temp_key](MyCache* obj)->bool {return obj->get_key() == temp_key;});
pthread_mutex_unlock(&KQlock);
} while (pthread_mutex_lock(&locks[it3-key_value_cache.begin()]) == -1);
// pthread_mutex_lock(&locks[it3-key_value_cache.begin()]);
MyCache *temp = key_value_cache[it3-key_value_cache.begin()];
key_value_cache.erase(it3);
key_value_cache.insert(it3, newKV);
pthread_mutex_unlock(&locks[it3-key_value_cache.begin()]);
if (temp->get_isUpdated() == 1) {
obj->put_value(temp->get_key(), temp->get_value());
// putIntoStorage(temp->get_key(), temp->get_value());
}
else if (temp->get_isUpdated() == 2) {
obj->delete_value(temp->get_key());
// deleteFromStorage(temp->get_key());
}
}
else {
key_queue.push_back(key);
pthread_mutex_unlock(&KQlock);
pthread_mutex_lock(&KQlock);
key_value_cache.push_back(newKV);
pthread_mutex_unlock(&KQlock);
}
return value;
}
}
}
}
return ErrorMessage;
}
void putCache(string key, string value,int cacheType,Persistant *obj) {
if(cacheType == Cache_LFU){
auto it = key_value_cache.begin();
int index;
do {
pthread_mutex_lock(&KVlock);
it = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
index = it - key_value_cache.begin();
pthread_mutex_unlock(&KVlock);
} while (pthread_mutex_lock(&locks[index]) == -1);
if (it != key_value_cache.end()) {
key_value_cache[index]->set_freq(key_value_cache[index]->get_freq()+1);
key_value_cache[index]->set_value(value);
key_value_cache[index]->set_isUpdated(1);
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
pthread_mutex_unlock(&locks[index]);
return;
}
else {
pthread_mutex_unlock(&locks[index]);
pthread_mutex_lock(&KVlock);
if (key_value_cache.size() == CACHE_SIZE) {
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
pthread_mutex_unlock(&KVlock);
int index;
do {
pthread_mutex_lock(&KVlock);
index = get_min();
pthread_mutex_unlock(&KVlock);
} while (pthread_mutex_lock(&locks[index]) == -1);
MyCache* temp = key_value_cache[index];
key_value_cache.erase(key_value_cache.begin()+index);
MyCache *newKV = new MyCache(key, value, 1, 1);
key_value_cache.insert(key_value_cache.begin()+index, newKV);
pthread_mutex_unlock(&locks[index]);
if (temp->get_isUpdated() == 1) {
obj->put_value(temp->get_key(), temp->get_value());
//putIntoStorage(temp->get_key(), temp->get_value());
}
else if (temp->get_isUpdated() == 2) {
obj->delete_value(temp->get_key());
//deleteFromStorage(temp->get_key());
}
}
else {
// pthread_mutex_unlock(&KVlock);
// pthread_mutex_lock(&KVlock);
MyCache *newKV = new MyCache(key, value, 1, 1);
key_value_cache.push_back(newKV);
pthread_mutex_unlock(&KVlock);
}
pthread_mutex_lock(&KVlock);
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
pthread_mutex_unlock(&KVlock);
return;
}
}
else if(cacheType == Cache_LRU){
auto it2 = key_value_cache.begin();
int index2;
do {
pthread_mutex_lock(&KQlock);
it2 = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
index2 = it2 - key_value_cache.begin();
pthread_mutex_unlock(&KQlock);
} while (pthread_mutex_lock(&locks[index2]) == -1);
// pthread_mutex_lock(&locks[index2]);
if (it2 != key_value_cache.end()) {
key_value_cache[index2]->set_value(value);
key_value_cache[index2]->set_isUpdated(1);
pthread_mutex_lock(&KQlock);
auto it = std::find_if(key_queue.begin(), key_queue.end(), [key](std::string obj)->bool {return obj == key;});
int index1 = it - key_queue.begin();
std::string temp_key = key_queue[index1];
key_queue.erase(it);
key_queue.push_back(temp_key);
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
pthread_mutex_unlock(&KQlock);
pthread_mutex_unlock(&locks[index2]);
return;
}
else {
pthread_mutex_unlock(&locks[index2]);
std::string temp_key;
pthread_mutex_lock(&KQlock);
if (key_queue.size()==CACHE_SIZE) {
temp_key = key_queue[0];
key_queue.erase(key_queue.begin());
key_queue.push_back(key);
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
pthread_mutex_unlock(&KQlock);
auto it2 = key_value_cache.begin();
do {
pthread_mutex_lock(&KQlock);
it2 = std::find_if(key_value_cache.begin(), key_value_cache.end(), [temp_key](MyCache* obj)->bool {return obj->get_key() == temp_key;});
pthread_mutex_unlock(&KQlock);
} while (pthread_mutex_lock(&locks[it2-key_value_cache.begin()]) == -1);
// pthread_mutex_lock(&locks[it2-key_value_cache.begin()]);
MyCache *temp = key_value_cache[it2-key_value_cache.begin()];
key_value_cache.erase(it2);
MyCache *newKV = new MyCache(key, value, 1);
key_value_cache.insert(it2, newKV);
pthread_mutex_unlock(&locks[it2-key_value_cache.begin()]);
if (temp->get_isUpdated() == 1) {
obj->put_value(temp->get_key(), temp->get_value());
//putIntoStorage(temp->get_key(), temp->get_value());
}
else if (temp->get_isUpdated() == 2) {
obj->delete_value(temp->get_key());
//deleteFromStorage(temp->get_key());
}
}
else {
key_queue.push_back(key);
MyCache *newKV = new MyCache(key, value, 1);
key_value_cache.push_back(newKV);
// for (int j=0; j<key_value_cache.size(); j++) {
// cout << key_value_cache[j]->get_key() << " " << key_value_cache[j]->get_freq() << " ";
// }
// cout << "\n";
pthread_mutex_unlock(&KQlock);
}
return;
}
}
}
string delCache(string key,int cacheType,Persistant *obj) {
if(cacheType == Cache_LFU){
auto it = key_value_cache.begin();
int index;
do {
pthread_mutex_lock(&KVlock);
it = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
index = it - key_value_cache.begin();
pthread_mutex_unlock(&KVlock);
} while (pthread_mutex_lock(&locks[index]) == -1);
if (it != key_value_cache.end()) {
if (key_value_cache[it-key_value_cache.begin()]->get_isUpdated() == 2) {
pthread_mutex_unlock(&locks[index]);
return ErrorMessage;
}
else {
key_value_cache[it-key_value_cache.begin()]->set_isUpdated(2);
pthread_mutex_unlock(&locks[index]);
return SuccessMessage;
}
}
else {
pthread_mutex_unlock(&locks[index]);
// deleteKeyFromStorage();
return obj->delete_value(key);
}
}
else if(cacheType == Cache_LRU){
auto it1 = key_value_cache.begin();
do {
pthread_mutex_lock(&KQlock);
it1 = std::find_if(key_value_cache.begin(), key_value_cache.end(), [key](MyCache* obj)->bool {return obj->get_key() == key;});
pthread_mutex_unlock(&KQlock);
} while (pthread_mutex_lock(&locks[it1-key_value_cache.begin()]) == -1);
// pthread_mutex_lock(&locks[it1-key_value_cache.begin()]);
if (it1 != key_value_cache.end()) {
if (key_value_cache[it1-key_value_cache.begin()]->get_isUpdated() == 2) {
pthread_mutex_unlock(&locks[it1-key_value_cache.begin()]);
return ErrorMessage;
}
else {
key_value_cache[it1-key_value_cache.begin()]->set_isUpdated(2);
pthread_mutex_unlock(&locks[it1-key_value_cache.begin()]);
return SuccessMessage;
}
}
else {
pthread_mutex_unlock(&locks[it1-key_value_cache.begin()]);
//deleteFromStorage(key);
return obj->delete_value(key);
}
}
return ErrorMessage;
}
void cacheFill(){
Persistant obj;
for(int i=0;i<key_value_cache.size();i++)
{
if(key_value_cache[i]->get_isUpdated()==1)
obj.put_value(key_value_cache[i]->get_key(),key_value_cache[i]->get_value());
else if (key_value_cache[i]->get_isUpdated()==2)
obj.delete_value(key_value_cache[i]->get_key());
}
}
void setCacheSize(int size){
CACHE_SIZE=size;
}
void init_cache(){
for (int i=0; i<=CACHE_SIZE; i++) {
pthread_mutex_init(&locks[i], NULL);
}
pthread_mutex_init(&KVlock, NULL);
pthread_mutex_init(&KQlock, NULL);
pthread_mutex_init(&KVClock, NULL);
}
// void* fun(void *arg) {
// Persistant obj;
// /*
// PUT k1 v1
// PUT k2 v2
// PUT k3 v3
// PUT k4 v4
// GET k3
// GET k5
// DEL k3
// DEL k3
// PUT k34 v1
// */
// putCache("k1", "v1",1,&obj);
// putCache("k2", "v2",1,&obj);
// putCache("k3", "v3",1,&obj);
// putCache("k4", "v4",1,&obj);
// std::cout << getCache("k3",1,&obj) << "\n";
// std::cout << getCache("k5",1,&obj) << "\n";
// std::cout << delCache("k3",1,&obj) << "\n";
// std::cout << delCache("k3",1,&obj) << "\n";
// putCache("k34", "v1",1,&obj);
// putCache("k15", "v1",1,&obj);
// putCache("k25", "v2",1,&obj);
// putCache("k35", "v3",1,&obj);
// putCache("k45", "v4",1,&obj);
// putCache("k15", "v1",1,&obj);
// putCache("k255", "v2",1,&obj);
// putCache("k53", "v3",1,&obj);
// putCache("k45", "v4",1,&obj);
// return 0;
// }
// #define tc 1
// int main() {
// init_storage();
// // string myText;
// // ifstream MyReadFile("keyValue.txt");
// // int count = 0;
// // while (getline (MyReadFile, myText) && count<8) {
// // std::vector<string> tempkey = parser(myText);
// // MyCache *newKV = new MyCache(tempkey[0], tempkey[1], 1, 0);
// // key_value_cache.push_back(newKV);
// // count++;
// // }
// setCacheSize(10);
// init_cache();
// pthread_t ths[tc];
// int tid[10];
// for (int i=0; i<tc; i++) {
// tid[i] = i;
// }
// for (int i=0; i<tc; i++) {
// pthread_create(&(ths[i]), NULL, &fun, (void *)&tid[i]);
// }
// for (int i=0; i<tc; i++) {
// pthread_join(ths[i], NULL);
// }
// cacheFill();
// // std::cout << get("key1") << "\n";
// // std::cout << get("key7") << "\n";
// // std::cout << get("key11") << "\n";
// // std::cout << get("key20") << "\n";
// // std::cout << get("key21") << "\n";
// // std::cout << get("key15") << "\n";
// // std::cout << get("key8") << "\n";
// // put("key21", "value21");
// // put("key15", "value22");
// // put("key17", "value23");
// // put("key1", "value24");
// // put("key16", "value16");
// // std::cout << del("key21") << "\n";
// // std::cout << del("key11") << "\n";
// // std::cout << del("key1") << "\n";
// // std::cout << del("key15") << "\n";
// return 0;
// }
#include <iostream>
#include <string>
#include "configReader.h"
#include "storage.h"
#define Cache_LFU 1
#define Cache_LRU 2
class MyCache {
std::string key;
std::string value;
int freq;
int isUpdated;
public:
MyCache(std::string, std::string, int, int);
MyCache(std::string, std::string, int);
std::string get_key();
std::string get_value();
int get_freq();
int get_isUpdated();
void set_value(std::string);
void set_freq(int);
void set_isUpdated(int);
};
void setCacheSize(int size);
std::string getCache(std::string key,int cacheType,Persistant * obj);
void putCache(std::string key, std::string value,int cacheType,Persistant *obj);
std::string delCache(std::string key,int cacheType,Persistant*obj);
void init_cache();
void cacheFill();
\ No newline at end of file
PUT k1 v1
PUT k2 v2
PUT k3 v3
PUT k4 v4
PUT k34 v1
PUT k12 v1
PUT k23 v2
PUT k31 v3
PUT k43 v4
PUT k34 v1
PUT k13 v1
PUT k24 v2
PUT k35 v3
PUT k46 v4
PUT k34 v1
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include <fstream>
#include <bits/stdc++.h>
#include <grpcpp/grpcpp.h>
#include <vector>
#include "configReader.h"
#ifdef BAZEL_BUILD
#include "examples/protos/keyvaluestore.grpc.pb.h"
#else
#include "keyvaluestore.grpc.pb.h"
#endif
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientAsyncResponseReader;
using grpc::CompletionQueue;
using grpc::Status;
using keyvaluestore::storeManager;
using keyvaluestore::requestKey;
using keyvaluestore::requestKeyValue;
using keyvaluestore::responseStatus;
using keyvaluestore::responseValue;
// using namespace std::chrono;
std::vector<std::string> parser(std::string request) {
std::vector<std::string> tokens{};
std::stringstream stream1(request);
std::string intermediate;
while(getline(stream1, intermediate, ' ')) {
tokens.push_back(intermediate);
}
return tokens;
}
class KeyValueStoreClient {
public:
KeyValueStoreClient(std::shared_ptr<Channel> channel)
: stub_(storeManager::NewStub(channel)) {
}
void BatchMode(const std::string filename) {
std::string myText;
std::ifstream MyReadFile(filename);
std::vector<std::string> result_set{};
std::vector<double> time_required{};
while (getline(MyReadFile, myText)) {
std::string result;
keyvaluestore::statusValue cmdStatus;
std::vector<std::string> tokens = parser(myText);
if (tokens[0].compare("GET") == 0) {
if (tokens[1].size()<1 || tokens[1].size()>256) {
result_set.push_back("Wrong key");
continue;
}
else {
auto start = std::chrono::high_resolution_clock::now();
cmdStatus = GET(tokens[1],&result);
auto stop = std::chrono::high_resolution_clock::now();
auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop-start);
time_required.push_back(time.count() * 1e-9);
}
}
else if (tokens[0].compare("PUT") == 0) {
if ((tokens[1].size()<1 || tokens[1].size()>256) && (tokens[2].size()<1 || tokens[2].size()>256)) {
result_set.push_back("Wrong key or value");
continue;
}
else {
auto start = std::chrono::high_resolution_clock::now();
cmdStatus = PUT(tokens[1],tokens[2],&result);
auto stop = std::chrono::high_resolution_clock::now();
auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop-start);
time_required.push_back(time.count() * 1e-9);
}
}
else if (tokens[0].compare("DEL") == 0) {
if (tokens[1].size()<1 || tokens[1].size()>256) {
result_set.push_back("Wrong key");
continue;
}
else {
auto start = std::chrono::high_resolution_clock::now();
cmdStatus = DEL(tokens[1],&result);
auto stop = std::chrono::high_resolution_clock::now();
auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop-start);
time_required.push_back(time.count() * 1e-9);
}
}
else {
result_set.push_back("Wrong command");
continue;
}
//"Command Status :"<<cmdStatus<<" | Result :"<<result
result_set.push_back("Return Status :"+std::to_string(cmdStatus)+" | Result :"+result);
}
for (int i=0; i<result_set.size(); i++) {
std::cout << result_set[i] << "\n";
}
double min=INT32_MAX, max=0, sum=0;
for (int i=0; i<time_required.size(); i++) {
if (time_required[i] < min) {
min = time_required[i];
}
if (time_required[i] > max) {
max = time_required[i];
}
sum = sum + time_required[i];
}
double avg = sum/time_required.size();
std::cout << "Min Time Required: " << min << "\n";
std::cout << "Max Time Required: " << max << "\n";
std::cout << "Avg Time Required: " << avg << "\n";
MyReadFile.close();
}
keyvaluestore::statusValue GET(std::string key, std::string *value) {
ClientContext context;
CompletionQueue cq;
requestKey request;
Status status;
responseValue response;
request.set_key(key);
std::unique_ptr<ClientAsyncResponseReader<responseValue> > rpc(stub_->PrepareAsyncGET(&context, request, &cq));
rpc->StartCall();
rpc->Finish(&response, &status, (void*)1);
void* got_tag;
bool ok = false;
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(got_tag == (void*)1);
GPR_ASSERT(ok);
*value = response.value();
if (!status.ok()) {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
std::cout << "RPC failed";
}
return response.status();
}
keyvaluestore::statusValue DEL(std::string key , std::string * strStatus) {
ClientContext context;
CompletionQueue cq;
requestKey request;
Status status;
responseValue response;
std::string value;
request.set_key(key);
std::unique_ptr<ClientAsyncResponseReader<responseValue> > rpc(stub_->PrepareAsyncDEL(&context, request, &cq));
rpc->StartCall();
rpc->Finish(&response, &status, (void*)1);
void* got_tag;
bool ok = false;
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(got_tag == (void*)1);
GPR_ASSERT(ok);
// std::cout<<response.status()<<std::endl;
*strStatus = response.value();
if (!status.ok()) {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
std::cout << "RPC failed";
}
return response.status();
}
keyvaluestore::statusValue PUT(std::string key, std::string value, std::string *strStatus) {
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
CompletionQueue cq;
requestKeyValue request;
Status status;
responseStatus response;
request.set_key(key);
request.set_value(value);
std::unique_ptr<ClientAsyncResponseReader<responseStatus> > rpc(stub_->PrepareAsyncPUT(&context, request, &cq));
rpc->StartCall();
rpc->Finish(&response, &status, (void*)1);
void* got_tag;
bool ok = false;
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(got_tag == (void*)1);
GPR_ASSERT(ok);
*strStatus = "Inserted";
if (!status.ok()) {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
std::cout << "RPC failed";
}
return response.status();
}
private:
std::unique_ptr<storeManager::Stub> stub_;
};
int main(int argc, char** argv) {
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint specified by
// the argument "--target=" which is the only expected argument.
// We indicate that the channel isn't authenticated (use of
// InsecureChannelCredentials()).
std::string target_str;
std::string arg_str("--target");
if (argc > 1) {
std::string arg_val = argv[1];
size_t start_pos = arg_val.find(arg_str);
if (start_pos != std::string::npos) {
start_pos += arg_str.size();
if (arg_val[start_pos] == '=') {
target_str = arg_val.substr(start_pos + 1);
} else {
std::cout << "The only correct argument syntax is --target="
<< std::endl;
return 0;
}
} else {
std::cout << "The only acceptable argument is --target=" << std::endl;
return 0;
}
} else {
target_str = "localhost:50051";
}
configReader config;
if(config.readConfigFile("../../config.in")==-1){
std::cout<<"Cannot Read config.in file\n";
return 0;
}
target_str = "localhost:"+std::to_string(config.getListeningPort());
KeyValueStoreClient client(grpc::CreateChannel(target_str, grpc::InsecureChannelCredentials()));
std::string mode;
while(1){
std::cout << "Select the method of requests";
std::cout << "Choose one of the following. \n1 For Batch Mode\n2 For Interactive Mode\n3 To Exit Client\n Enter mode: ";
std::cin >> mode;
if(mode.compare("3")==0)
break;
if (mode.compare("1")==0) {
std::string filepath;
std::cout << "Enter a file path: ";
std::cin >> filepath;
client.BatchMode(filepath);
}
else if (mode.compare("2")==0) {
while(1) {
std::string command, key, value,result;
keyvaluestore::statusValue cmdStatus;
cmdStatus=keyvaluestore::statusValue::FAILURE;
result="";
std::cout << "Write command in UpperCase only(\nGET, \nPUT, \nDEL,\nEXIT(To Exit Interactive Mode)): ";
std::cin >> command;
if(command.compare("EXIT") ==0 ){
std::cout<<"Exiting Interactive Mode..\n";
break;
}
std::cout << "Write key: ";
std::cin >> key;
if (key.size() < 1 || key.size() > 256) {
std::cout << "Wrong key\n";
continue;
}
if (command.compare("PUT") == 0) {
std::cout << "Write value: ";
std::cin >> value;
if (value.size() < 1 || value.size() > 256) {
std::cout << "Wrong value\n";
continue;
}
auto start = std::chrono::high_resolution_clock::now();
cmdStatus = client.PUT(key, value,&result);
auto stop = std::chrono::high_resolution_clock::now();
auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop-start);
std::cout << "The time taken by the PUT operation: " << time.count() * 1e-9 << "\n";
}
else if (command.compare("GET") == 0) {
auto start = std::chrono::high_resolution_clock::now();
cmdStatus = client.GET(key, &result);
auto stop = std::chrono::high_resolution_clock::now();
auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop-start);
std::cout << "The time taken by the GET operation: " << time.count() * 1e-9 << "\n";
}
else if (command.compare("DEL") == 0) {
auto start = std::chrono::high_resolution_clock::now();
cmdStatus = client.DEL(key,&result);
auto stop = std::chrono::high_resolution_clock::now();
auto time = std::chrono::duration_cast<std::chrono::nanoseconds>(stop-start);
std::cout << "The time taken by the DEL operation: " << time.count() * 1e-9 << "\n";
}
else {
std::cout << "Wrong command!!\n";
continue;
}
std::cout << "Return Status :"<<cmdStatus<<" | Result :"<<result<<std::endl;
}
}
else {
std::cout << "Wrong Mode";
// std::cin.clear();
}
}
return 0;
}
LISTENING_PORT=50051
CACHE_REPLACEMENT_TYPE=LRU
CACHE_SIZE=10
THREAD_POOL_SIZE=10
\ No newline at end of file
#include <iostream>
#include <memory>
#include <fstream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include "MyCache.h"
std::vector<std::string> parser_1(std::string request) {
std::vector<std::string> tokens{};
std::stringstream stream1(request);
std::string intermediate;
while(getline(stream1, intermediate, '=')) {
tokens.push_back(intermediate);
}
return tokens;
}
configReader::configReader(){
LISTENING_PORT = 50051;
CACHE_REPLACEMENT_TYPE ="LRU";
CACHE_SIZE=10;
THREAD_POOL_SIZE=10;
}
int configReader::readConfigFile(std::string filename){
std::string myText;
std::ifstream MyReadFile;
MyReadFile.open(filename);
if(MyReadFile.is_open())
{
while (getline(MyReadFile, myText)) {
std::vector<std::string> tokens = parser_1(myText);
if(tokens[0]=="LISTENING_PORT")
LISTENING_PORT=atoi(tokens[1].c_str());
else if(tokens[0]=="CACHE_REPLACEMENT_TYPE")
CACHE_REPLACEMENT_TYPE=tokens[1];
else if (tokens[0]=="CACHE_SIZE"){
CACHE_SIZE = atoi(tokens[1].c_str());
// if(CACHE_SIZE > 100)
// CACHE_SIZE = 100;
}
else if(tokens[0]=="THREAD_POOL_SIZE"){
THREAD_POOL_SIZE= atoi(tokens[1].c_str());
// if(THREAD_POOL_SIZE > 50){
// THREAD_POOL_SIZE = 50;
// }
}
}
}
else{
return -1;
}
return 0;
}
int configReader::getListeningPort(){
return this->LISTENING_PORT;
}
int configReader::getCacheReplacementType(){
if(CACHE_REPLACEMENT_TYPE.compare("LFU")==0)
return Cache_LFU;
if(CACHE_REPLACEMENT_TYPE.compare("LRU")==0)
return Cache_LRU;
return -1;
}
int configReader::getCacheSize(){
return this->CACHE_SIZE;
}
size_t configReader::getThreadPoolSize(){
return this->THREAD_POOL_SIZE;
}
\ No newline at end of file
#include <iostream>
#include <string>
class configReader{
private:
int LISTENING_PORT;
std::string CACHE_REPLACEMENT_TYPE;
int CACHE_SIZE;
size_t THREAD_POOL_SIZE;
public:
configReader();
int readConfigFile(std::string filename);
int getListeningPort();
int getCacheReplacementType();
int getCacheSize();
size_t getThreadPoolSize();
};
\ No newline at end of file
syntax = "proto3";
package keyvaluestore;
// A simple key-value storage service
service storeManager {
// Provides a value for each key request
rpc GET ( requestKey) returns ( responseValue) {}
rpc DEL ( requestKey) returns ( responseValue) {}
rpc PUT ( requestKeyValue) returns ( responseStatus) {}
}
enum statusValue{
DEAULT = 0;
UPDATED = 100;
SUCCESS = 200;
FAILURE = 400;
}
// The request message containing the key
message requestKey {
string key = 1;
}
// The response message containing the value associated with the key
message responseValue {
statusValue status = 1;
string value = 2;
}
message requestKeyValue {
string key = 1;
string value = 2;
}
message responseStatus{
statusValue status = 1;
}
\ No newline at end of file
Team Information:
Arnav Mishra : 213059002
Ajinkya Tanksale : 213050034
Vedant Singh : 213050038
Steps to build and run the server and client
1) Put the folder in grpc/examples/cpp
2) cd KeyValueStore
3) mkdir -p cmake/build
4) pushd cmake/build
5) cmake -DCMAKE_PREFIX_PATH=$MY_INSTALL_DIR ../..
6) make
7) Open two terminals and write commands
./server in first and ./client in second
8) For exiting server press (clt+c) 2 time.
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <fstream>
#include <chrono>
#include <ctime>
#include <sys/stat.h>
#include <vector>
#include <condition_variable>
#include <mutex>
#include <csignal>
#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include "MyCache.h"
// #include "configReader.h"
#ifdef BAZEL_BUILD
#include "examples/protos/keyvaluestore.grpc.pb.h"
#else
#include "keyvaluestore.grpc.pb.h"
#endif
//#define DebugFlag 1
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using keyvaluestore::storeManager;
using keyvaluestore::responseValue;
using keyvaluestore::requestKeyValue;
using keyvaluestore::requestKey;
using keyvaluestore::responseStatus;
int cacheType;
struct thread_arg{
int tid;
storeManager::AsyncService *service_;
std::unique_ptr<ServerCompletionQueue> cq_;
}arg[50];
struct kv_pair {
std::string key;
std::string value;
};
std::mutex log_mutex;
std::condition_variable log_cv;
static std::vector<struct kv_pair> kvs_map;
//thread function declaration
void *HandleRpcs_thread(void *);
void printKvsMap();
std::vector<std::string> log_strings;
void write_log(std::string data)
{
auto end = std::chrono::system_clock::now();
std::time_t end_time = std::chrono::system_clock::to_time_t(end);
data = data + "\tAT\t"+std::ctime(&end_time)+"\n";
std::unique_lock<std::mutex> lk(log_mutex);
log_strings.push_back(data);
lk.unlock();
log_cv.notify_one();
}
void * log_writer(void*)
{
mkdir("server_logs",S_IRWXU);
std::string log_file = "server_logs/server.log";
while(true){
std::string data="";
std::unique_lock<std::mutex> lk(log_mutex);
while(log_strings.size()==0){
log_cv.wait(lk);
}
for(size_t i=0;i<log_strings.size();i++)
data += log_strings[i];
log_strings.clear();
lk.unlock();
std::fstream w;
w.open(log_file,std::ios::app);
w << data;
w.close();
}
}
class baseClass{
public:
virtual void Proceed()=0;
};
class GetMethod: public baseClass{
private:
storeManager::AsyncService *service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
requestKey request_;
responseValue reply_;
ServerAsyncResponseWriter<responseValue> responder_;
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
int tid;
Persistant *obj;
keyvaluestore::statusValue get_value_from_map(std::string key, std::string & value) {
// const std::lock_guard<std::mutex> lock(kvs_map_mutex);
// for (size_t i = 0; i < kvs_map.size(); ++i) {
// if (key.compare(kvs_map[i].key) == 0) {
// value = kvs_map[i].value;
// return keyvaluestore::statusValue::SUCCESS;
// }
// }
// value=ErrorMessage;
// return keyvaluestore::statusValue::FAILURE;
value = getCache(key,cacheType,obj);
if(value.compare(ErrorMessage)==0)
return keyvaluestore::statusValue::FAILURE;
return keyvaluestore::statusValue::SUCCESS;
}
public :
GetMethod(storeManager::AsyncService* service, ServerCompletionQueue* cq,int thread_id,Persistant *obj)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE), tid(thread_id),obj(obj) {
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
status_ = PROCESS;
service_->RequestGET(&ctx_, &request_, &responder_, cq_, cq_,this);
}
else if (status_ == PROCESS) {
new GetMethod(service_, cq_,this->tid,obj);
write_log("GET request :key-"+request_.key());
std::string value;
reply_.set_status(get_value_from_map(request_.key(),value));
reply_.set_value(value);
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
write_log("GET response :status-"+std::to_string(reply_.status())+(std::string)" value-"+reply_.value());
}
else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
};
class PutMethod: public baseClass{
private:
storeManager::AsyncService *service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
requestKeyValue request_;
responseStatus reply_;
ServerAsyncResponseWriter<responseStatus> responder_;
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
int tid;
Persistant *obj;
keyvaluestore::statusValue put_key_value_into_map(std::string key, std::string value) {
// size_t i = 0;
// const std::lock_guard<std::mutex> lock(kvs_map_mutex);
// for (size_t i = 0; i < kvs_map.size(); ++i) {
// if (key.compare(kvs_map[i].key) == 0) {
// kvs_map[i].value=value;
// return keyvaluestore::statusValue::UPDATED;
// }
// }
// struct kv_pair t;
// t.key=key;
// t.value=value;
// kvs_map.push_back(t);
putCache(key,value,cacheType,obj);
return keyvaluestore::statusValue::SUCCESS;
}
public :
PutMethod(storeManager::AsyncService* service, ServerCompletionQueue* cq,int thread_id,Persistant *obj)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE), tid(thread_id),obj(obj) {
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
status_ = PROCESS;
service_->RequestPUT(&ctx_, &request_, &responder_, cq_, cq_,this);
}
else if (status_ == PROCESS) {
new PutMethod(service_, cq_,this->tid,obj);
write_log("PUT request :key-"+request_.key()+" value-"+request_.value());
reply_.set_status(put_key_value_into_map(request_.key(),request_.value()));
#ifdef DebugFlag
std::cout<<"After PUT"<<std::endl;printKvsMap();std::cout<<std::endl;
#endif
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
write_log("PUT response :status-"+std::to_string(reply_.status()));
}
else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
};
class DeleteMethod: public baseClass{
private:
storeManager::AsyncService *service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
requestKey request_;
responseValue reply_;
ServerAsyncResponseWriter<responseValue> responder_;
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
int tid;
Persistant *obj;
keyvaluestore::statusValue del_key_value_from_map(std::string key,std::string & value) {
// const std::lock_guard<std::mutex> lock(kvs_map_mutex);
// for (size_t i = 0; i < kvs_map.size(); ++i) {
// if (key.compare(kvs_map[i].key) == 0) {
// kvs_map.erase(kvs_map.begin()+i);
// value=SuccessMessage;
// return keyvaluestore::statusValue::SUCCESS;
// }
// }
// value = ErrorMessage;
// return keyvaluestore::statusValue::FAILURE;
value = delCache(key,cacheType,obj);
if(value.compare(ErrorMessage)==0)
return keyvaluestore::statusValue::FAILURE;
return keyvaluestore::statusValue::SUCCESS;
}
public :
DeleteMethod(storeManager::AsyncService* service, ServerCompletionQueue* cq,int thread_id,Persistant *obj)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE), tid(thread_id),obj(obj) {
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
status_ = PROCESS;
service_->RequestDEL(&ctx_, &request_, &responder_, cq_, cq_,this);
}
else if (status_ == PROCESS) {
new DeleteMethod(service_, cq_,this->tid,obj);
std::string value;
write_log("DEL request :key-"+request_.key());
reply_.set_status(del_key_value_from_map(request_.key(),value));
reply_.set_value(value);
#ifdef DebugFlag
std::cout<<"After DEL"<<std::endl;printKvsMap();std::cout<<std::endl;
#endif
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
write_log("DEL response :status-"+std::to_string(reply_.status())+(std::string)" value-"+reply_.value());
}
else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
};
class ServerImpl final {
private:
storeManager::AsyncService service_;
std::unique_ptr<Server> server_;
int thread_count;
bool running_flag;
public:
ServerImpl(){
running_flag=false;
}
~ServerImpl() {
if(running_flag==true){
server_->Shutdown();
for(int i=0;i<thread_count;i++) {
arg[i].cq_->Shutdown();
}
}
}
int Run(int thread_count) {
configReader config;
if(config.readConfigFile("../../config.in")==-1){
return -1;
}
else{
running_flag=true;
cacheType=config.getCacheReplacementType();
init_cache();
// std::cout<<cacheType<<std::endl;
setCacheSize(config.getCacheSize());
init_cache();
this->thread_count=config.getThreadPoolSize();
std::string server_address("0.0.0.0:"+std::to_string(config.getListeningPort()));
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
// struct thread_arg *arg = (struct thread_arg *)malloc(sizeof(struct thread_arg)*thread_count);
pthread_t *threads = (pthread_t*)malloc(sizeof(pthread_t)*thread_count);
for(int i=0;i<thread_count;i++){
arg[i].tid=i+1;
arg[i].cq_ = builder.AddCompletionQueue();
arg[i].service_=&service_;
}
server_ = builder.BuildAndStart();
for(int i=0;i<thread_count;i++){
pthread_create(&threads[i],NULL,HandleRpcs_thread,&arg[i]);
}
std::cout << "Server listening on " << server_address << std::endl;
std::string log = "\n-----------------------------------------------------------------------------------------------------------\n";
log = log+"Server Started";
write_log(log);
for(int i=0;i<thread_count;i++){
pthread_join(threads[i],NULL);
}
}
return 0;
}
};
#ifdef DebugFlag
void initializeKvsMap(int size){
for(size_t i=0;i<size;i++){
struct kv_pair t;
t.key="key"+std::to_string(i+1);
t.value="value"+std::to_string(i+1);
kvs_map.push_back(t);
}
}
void printKvsMap(){
const std::lock_guard<std::mutex> lock(kvs_map_mutex);
for (size_t i = 0; i < kvs_map.size(); i++){
std::cout<<kvs_map[i].key<<" "<<kvs_map[i].value<<std::endl;
}
}
#endif
void signal_handler( int signal_num ) {
cacheFill();
exit(signal_num);
}
int main(int argc, char** argv) {
signal(SIGINT, signal_handler);
init_storage();
ServerImpl server;
int size = 10;
pthread_t log_thread;
int log_thread_id=1;
pthread_create(&log_thread,NULL,log_writer,&log_thread_id);
if(server.Run(10)==-1)
std::cout<<"Cannot read Config File Server Exiting...\n";
pthread_join(log_thread,NULL);
// cacheFill();
return 0;
}
void * HandleRpcs_thread(void * thread_arg){
Persistant obj;
struct thread_arg *arg= (struct thread_arg *)thread_arg;
new DeleteMethod((arg->service_), arg->cq_.get(),arg->tid,&obj);
new PutMethod((arg->service_), arg->cq_.get(),arg->tid,&obj);
new GetMethod((arg->service_), arg->cq_.get(),arg->tid,&obj);
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
GPR_ASSERT(arg->cq_->Next(&tag, &ok));
// 0- shutdown 2-timeout 1-gotresponse
GPR_ASSERT(ok);
static_cast<baseClass*>(tag)->Proceed();
}
}
\ No newline at end of file
#include <iostream>
#include <vector>
#include <fstream>
#include <string>
#include <bits/stdc++.h>
#include <utility>
#include <pthread.h>
#include <sys/stat.h>
#include <map>
#include "storage.h"
using namespace std;
struct file {
pthread_mutex_t mu;
pthread_cond_t cond_read;
pthread_cond_t cond_write;
map<string, std::iostream::pos_type> key_dict_read;
map<string, std::iostream::pos_type> key_dict_write;
int AR = 0, AW = 0, WR = 0, WW = 0;
} files[NUM_FILES];
void init_storage()
{
mkdir("storage",S_IRWXU);
for (int i=0; i<NUM_FILES; i++) {
pthread_mutex_init(&files[i].mu, NULL);
pthread_cond_init(&files[i].cond_read, NULL);
pthread_cond_init(&files[i].cond_write, NULL);
}
}
Persistant:: Persistant() {
for (int i = 0; i != NUM_FILES; i++) {
desc[i].open("storage/"+to_string(i) + ".txt", ios::app);
}
}
Persistant::~Persistant() {
for (int i = 0; i != NUM_FILES; i++) {
desc[i].close();
}
}
string Persistant::get_padded_value(int init_size , string value) {
char padded[(MAX_SIZE*2)-1-init_size+2];
memset(padded, '#', sizeof(padded));
strncpy(padded, value.c_str(), value.size());
padded[(MAX_SIZE*2)-init_size]= '\n';
padded[(MAX_SIZE*2)-init_size+1]='\0';
return padded;
}
std::vector<std::string> Persistant::parser(std::string line) {
std::vector<std::string> tokens{};
std::stringstream stream1(line);
std::string intermediate;
while(getline(stream1, intermediate, '#')) {
tokens.push_back(intermediate);
}
return tokens;
}
string Persistant::get_value(string key) {
string out;
int file_no = key.size() / NUM_CHAR;
string path = "storage/"+to_string(file_no) + ".txt";
pthread_mutex_t *mutex_lock = &files[file_no].mu;
pthread_cond_t *cv_read = &files[file_no].cond_read;
pthread_cond_t *cv_write = &files[file_no].cond_write;
string myText;
pthread_mutex_lock(mutex_lock);
files[file_no].WR++;
while ((files[file_no].AW + files[file_no].WW) > 0) {
pthread_cond_wait(cv_read, mutex_lock);
}
files[file_no].WR--;
files[file_no].AR++;
pthread_mutex_unlock(mutex_lock);
pthread_cond_broadcast(cv_read);
//----------------------
ifstream read_fd;
auto itr = files[file_no].key_dict_read.find(key);
desc[file_no].close();
desc[file_no].open("storage/"+to_string(file_no) + ".txt",ios::out | ios::in );
if (itr == files[file_no].key_dict_read.end()) {
read_fd.open(path.c_str());
string myText;
int flag = 0;
while (getline(read_fd, myText)) {
std::vector<string> tokens = parser(myText);
if (tokens[0].compare(key) == 0) {
flag = 1;
files[file_no].key_dict_read.insert(make_pair(key, (iostream::pos_type)((int)read_fd.tellg()-2*MAX_SIZE+key.size())));
desc[file_no].seekp((iostream::pos_type)((int)read_fd.tellg()-2*MAX_SIZE+key.size()));
files[file_no].key_dict_write.insert(make_pair(key, desc[file_no].tellp()));
out = tokens[1];
break;
}
}
if (flag == 0) {
out = ErrorMessage;
}
read_fd.close();
}
else {
iostream::pos_type value_location = (itr)->second;
read_fd.open(path.c_str());
if (!read_fd.is_open()){
pthread_mutex_lock(mutex_lock);
files[file_no].AR--;
if (files[file_no].AR == 0 && files[file_no].WW > 0)
pthread_cond_signal(cv_write);
pthread_mutex_unlock(mutex_lock);
out = ErrorMessage;
}
else{
char letter;
read_fd.seekg(value_location,ios::beg);
read_fd >> letter;
int count=0;
while(letter != '#' && count<MAX_SIZE){
out += letter;
read_fd >> letter;
count++;
}
read_fd.close();
}
}
//----------------------
pthread_mutex_lock(mutex_lock);
if ((--files[file_no].AR) == 0)
pthread_cond_signal(cv_write);
pthread_mutex_unlock(mutex_lock);
return out;
}
string Persistant::put_value(string key, string value) {
int file_no = key.size() / NUM_CHAR;
pthread_mutex_t *mutex_lock = &files[file_no].mu;
pthread_cond_t *cv_read = &files[file_no].cond_read;
pthread_cond_t *cv_write = &files[file_no].cond_write;
string line = get_padded_value(key.size()+1,value);
pthread_mutex_lock(mutex_lock);
files[file_no].WW++;
if ((files[file_no].AW + files[file_no].AR) > 0) {
// std::cout<<"LOCKED "<<(files[file_no].AW + files[file_no].AR)<<std::endl;
pthread_cond_wait(cv_write, mutex_lock);
// std::cout<<"FREED"<<std::endl;
}
files[file_no].WW--;
files[file_no].AW++;
pthread_mutex_unlock(mutex_lock);
// std::cout<<"ERROR"<<std::endl;
//----------------------
string path = "storage/"+to_string(file_no) + ".txt";
desc[file_no].close();
desc[file_no].open(path.c_str(),ios::out | ios::in );
string result;
auto itr = files[file_no].key_dict_write.find(key);
if (itr == files[file_no].key_dict_write.end()){
ifstream read_fd;
read_fd.open(path.c_str());
string myText;
int flag = 0;
while (getline(read_fd, myText)) {
std::vector<string> tokens = parser(myText);
if (tokens[0].compare(key) == 0) {
flag = 1;
files[file_no].key_dict_read.insert(make_pair(key, (iostream::pos_type)((int)read_fd.tellg()-2*MAX_SIZE+key.size())));
desc[file_no].seekp((iostream::pos_type)((int)(read_fd.tellg())-2*MAX_SIZE-1));
files[file_no].key_dict_write.insert(make_pair(key, (iostream::pos_type)((int)desc[file_no].tellp()+key.size()+1)));
desc[file_no] << key << '#' << line;
result = UpdateMessage;//"updated to file "+std::to_string(file_no)+"\n";
break;
}
}
if (flag == 0) {
desc[file_no].seekp(0,ios_base::end);
desc[file_no] << key << '#';
files[file_no].key_dict_read.insert(make_pair(key, desc[file_no].tellg()));
files[file_no].key_dict_write.insert(make_pair(key,desc[file_no].tellp()));
desc[file_no] << line;
result = PutMessage;//"inserted to file "+std::to_string(file_no)+"\n";
}
read_fd.close();
}
else {
iostream::pos_type value_location = (itr)->second;
desc[file_no].seekp((iostream::pos_type)((int)value_location - key.size() -1));
desc[file_no] << key << "#" << line; // overwrite the line1
result = UpdateMessage;//"updated to file "+std::to_string(file_no)+"\n";
}
desc[file_no].close();
//-----------------------
pthread_mutex_lock(mutex_lock);
files[file_no].AW--;
// if(files[file_no].WR > 0)
pthread_cond_broadcast(cv_read);
// else
pthread_cond_signal(cv_write);
pthread_mutex_unlock(mutex_lock);
return result;
}
string Persistant::delete_value(string key) {
int file_no = key.size() / NUM_CHAR;
pthread_mutex_t *mutex_lock = &files[file_no].mu;
pthread_cond_t *cv_read = &files[file_no].cond_read;
pthread_cond_t *cv_write = &files[file_no].cond_write;
pthread_mutex_lock(mutex_lock);
while ((files[file_no].AW + files[file_no].AR) > 0) {
files[file_no].WW++;
pthread_cond_wait(cv_write, mutex_lock);
files[file_no].WW--;
}
files[file_no].AW++;
pthread_mutex_unlock(mutex_lock);
string result;
string path = "storage/"+to_string(file_no) + ".txt";
desc[file_no].close();
desc[file_no].open(path.c_str(), ios::out | ios::in);
//-------------------------------------
auto itr = files[file_no].key_dict_write.find(key);
if (itr == files[file_no].key_dict_write.end()) {
ifstream read_fd;
read_fd.open(path.c_str());
string myText;
int flag = 0;
while (getline(read_fd, myText)) {
std::vector<string> tokens = parser(myText);
if (tokens[0].compare(key) == 0) {
flag = 1;
ofstream write_fd;
desc[file_no].seekp((iostream::pos_type)((int)(read_fd.tellg())-2*MAX_SIZE -1));
for (int i = 0; i < MAX_SIZE*2; i++) {
desc[file_no] << "#";
}
result = SuccessMessage;
break;
}
}
if (flag == 0) {
result = ErrorMessage;
}
read_fd.close();
}
else {
iostream::pos_type value_location = (itr)->second;
desc[file_no].seekp((iostream::pos_type)((int)value_location - key.size() - 1));
files[file_no].key_dict_read.erase(key);
files[file_no].key_dict_write.erase(key);
for (int i =0; i < MAX_SIZE*2; i++) {
desc[file_no] << "#";
}
result = SuccessMessage;
}
desc[file_no].close();
//-------------------------------------
pthread_mutex_lock(mutex_lock);
files[file_no].AW--;
// if (files[file_no].WR > 0)
pthread_cond_signal(cv_read);
// else
pthread_cond_signal(cv_write);
pthread_mutex_unlock(mutex_lock);
return result;
}
// void * func (void *args){
// Persistant p1 ;
// cout<< p1.get_value("key1") << "\n";
// cout<< p1.put_value("key1", "value1");
// cout<< p1.put_value("key2", "value2");
// cout<< p1.put_value("key1", "value5");
// cout<< p1.put_value("key3", "value21");
// cout<< p1.put_value("key2", "value21");
// cout << "key2" << p1.delete_value("key2") << "\n";
// cout<< p1.get_value("key3") << "\n";
// cout<< p1.put_value("key4", "value21");
// cout<< p1.get_value("key1") << "\n";
// cout<< p1.put_value("key2", "value4");
// cout<< p1.put_value("key3", "value5");
// cout << "key2" << p1.delete_value("key2") << "\n";
// cout << p1.get_value("key2") << "\n";
// cout << p1.get_value("key1") << "\n";
// cout << p1.get_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey1") << "\n";
// cout << p1.get_value("key1") << "\n";
// cout << p1.get_value("key3") << "\n";
// cout << "key4" << p1.delete_value("key4") << "\n";
// cout<< p1.put_value("keykeykeykeykey1", "value2");
// cout<< p1.put_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey1", "value1");
// cout<< p1.put_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey2", "value2");
// cout<< p1.put_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey1", "value67");
// cout<< p1.put_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey3", "value3");
// cout << p1.get_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey1") << "\n";
// cout << "keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey3" << p1.delete_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey3") << "\n";
// cout << p1.get_value("keykeykeykeykeykeykeykeykeykeykeykeykeykeykeykeykey3") << "\n";
// return 0;
// }
// int main () {
// int thread_count=20;
// pthread_t tid[thread_count];
// for (int i=0; i<NUM_FILES; i++) {
// pthread_mutex_init(&files[i].mu, NULL);
// pthread_cond_init(&files[i].cond_read, NULL);
// pthread_cond_init(&files[i].cond_write, NULL);
// }
// for (int i=0; i<thread_count; i++) {
// pthread_create(&(tid[i]), NULL, &func, (void *)&tid[i]);
// }
// for (int i=0; i<thread_count; i++) {
// pthread_join(tid[i], NULL);
// }
// }
#include <iostream>
#include <string>
#include <vector>
#define NUM_CHAR 8
#define MAX_SIZE 256
#define NUM_FILES MAX_SIZE / NUM_CHAR
static std::string SuccessMessage = "KEY DELETED";
static std::string ErrorMessage = "KEY NOT EXIST";
static std::string PutMessage = "KEY INSERTED";
static std::string UpdateMessage = "KEY UPDATED";
class Persistant {
private:
std::fstream desc[NUM_FILES];
public:
Persistant();
~Persistant();
std::string get_padded_value(int init_size , std::string value);
std::vector<std::string> parser(std::string line);
std::string get_value(std::string key);
std::string put_value(std::string key, std::string value);
std::string delete_value(std::string key);
};
void init_storage();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment