Commit 8066bddd authored by Nihal's avatar Nihal

initial commit

parents
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# cmake build file for C++ kv example.
# Assumes protobuf and gRPC have been installed using cmake.
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build
# that automatically builds all the dependencies before building kv.
cmake_minimum_required(VERSION 3.5.1)
project(HelloWorld C CXX)
include(/home/sherrinford/grpc/examples/cpp/cmake/common.cmake)
# Proto file
get_filename_component(hw_proto "kv.proto" ABSOLUTE)
get_filename_component(hw_proto_path "${hw_proto}" PATH)
# Generated sources
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/kv.pb.cc")
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/kv.pb.h")
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/kv.grpc.pb.cc")
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/kv.grpc.pb.h")
add_custom_command(
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${hw_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${hw_proto}"
DEPENDS "${hw_proto}")
# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# hw_grpc_proto
add_library(hw_grpc_proto
${hw_grpc_srcs}
${hw_grpc_hdrs}
${hw_proto_srcs}
${hw_proto_hdrs})
target_link_libraries(hw_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
# Targets greeter_[async_](client|server)
foreach(_target
client server)
add_executable(${_target} "${_target}.cc")
target_link_libraries(${_target}
hw_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()
File added
1)To run to project do the following :
$ mkdir -p cmake/build
$ pushd cmake/build
$ cmake -DCMAKE_PREFIX_PATH=$MY_INSTALL_DIR ../..
$ make -j
2)server.config is to placed in cmake/build in the following manner (or Copy server.config to cmake/build) :
50051 //LISTENING_PORT
LRU //CACHE_REPLACEMENT_TYPE
15 //CACHE_SIZE
10 //THREAD_POOL_SIZE
For example :
50051
LRU
15
10
3)For using Batch Mode Place batch.txt file in cmake/build , It may contain Command in the following (case sensitive Manner):
Get key
Put key value
Del key
4)For using Iterative mode , type the commands in case sensetive manner :
Get key
Put key value
Del key
https://github.com/raviprakashgiri/Distributed-Service-using-GRPC
https://github.com/Mityuha/grpc_async
https://github.com/mtrebi/thread-pool/blob/master/README.md#thread-pool
https://grpc.io/docs/languages/cpp/async/
\ No newline at end of file
#include <pthread.h>
#include<bits/stdc++.h>
#include "storage.h"
using namespace std;
class cache
{
public:
struct entry{
string key;
string value;
bool valid;
bool modified;
int last_used=-1;
int frequency=0;
pthread_mutex_t lock;
};
const int KEY_SIZE= 256;
const int VAL_SIZE =256;
map<string,entry*>cache;
long long int current_capacity;
long long unsigned int counter;
// bool delete_key(string);
// void find_new_slot();
string get_entry(string key){
if(cache.find(key)!=cache.end()){
return cache[key]->value;
}
return "KEY NOT EXIST";
}
void initialize_cache(int size_){
storage_init();
current_capacity=size_;
counter=0;
}
bool put_into_cache(string key1,string value2,int code){
if(key1.length()>256 || value2.length()>256 || key1.length()==0 ||value2.length()==0 ){
return 0;
}
struct entry * temp=(struct entry*)malloc(sizeof(struct entry));
int k;
pthread_mutex_lock(&(temp->lock));
temp->key=key1;
temp->value =value2;
temp->valid=1;
temp->modified=0;
if(code=1)
temp->last_used=counter++;
else
temp->frequency++;
if(current_capacity<=0){
find_new_slot(code);
current_capacity++;
}
cache[key1]=temp;
current_capacity--;
counter++;
file_put(toCharArray(key1),toCharArray(value2));
pthread_mutex_unlock(&(temp->lock));
return 1;
}
void find_new_slot(int code){
cout<<"Problem here"<<endl;
string lowest;
int last_mod=INT_MAX;
for(map<string,entry*>::const_iterator it=cache.begin();it!=cache.end();++it){
if(code==1){
if(it->second->last_used<last_mod){
lowest=it->first;
last_mod=it->second->last_used;
}
}
else{
if(it->second->frequency<last_mod){
lowest=it->first;
last_mod=it->second->frequency;
}
}
}
entry* temp = cache[lowest];
cout<<"DELETEING ENTRY KEY : "<<lowest<<endl;
file_del(toCharArray(lowest));
cache.erase(lowest);
// free(temp);
}
void print_cache(){
for(map<string,entry*>::const_iterator it=cache.begin();it!=cache.end();++it)
cout<<it->first<<" "<<it->second->value<<endl;
}
bool delete_key(string key){
// cout<<"DELETEING ENTRY KEY : "<<key<<endl;
auto it = cache.find(key);
if(it==cache.end())
return false;
pthread_mutex_lock(&(it->second->lock));
file_del(toCharArray(it->first));
cache.erase(it->first);
pthread_mutex_unlock(&(it->second->lock));
// free(it->second);
return true;
}
};
#include <pthread.h>
#include<bits/stdc++.h>
using namespace std;
class cache
{
public:
struct entry{
string key;
string value;
bool valid;
bool modified;
int frequency=0;
pthread_mutex_t lock;
};
const int KEY_SIZE= 256;
const int VAL_SIZE =256;
const int max_size =3;
map<string,entry*>cache;
long long int current_capacity;
long long unsigned int counter;
// bool delete_key(string);
// void find_new_slot();
string get_entry(string key){
if(cache.find(key)!=cache.end()){
cache[key]->frequency++;
return cache[key]->value;
}
return "Entry Not Found";
}
void initialize_cache(){
current_capacity=max_size;
counter=0;
}
bool put_into_cache(string key1,string value2){
if(key1.length()>256 || value2.length()>256){
return false;
}
struct entry * temp=(struct entry*)malloc(sizeof(struct entry));
int k;
pthread_mutex_lock(&(temp->lock));
temp->key=key1;
temp->value =value2;
temp->valid=1;
temp->modified=0;
temp->frequency++;
if(current_capacity<=0){
find_new_slot();
current_capacity++;
}
cache[key1]=temp;
current_capacity--;
counter++;
pthread_mutex_unlock(&(temp->lock));
return true;
}
void find_new_slot(){
string lowest;
int last_mod=INT_MAX;
for(map<string,entry*>::const_iterator it=cache.begin();it!=cache.end();++it){
if(it->second->frequency<last_mod){
lowest=it->first;
last_mod=it->second->frequency;
}
}
pthread_mutex_lock(&(cache[lowest]->lock));
entry* temp = cache[lowest];
cout<<"DELETEING ENTRY KEY : "<<lowest<<endl;
cache.erase(lowest);
pthread_mutex_unlock(&(cache[lowest]->lock));
// free(temp);
}
void print_cache(){
for(map<string,entry*>::const_iterator it=cache.begin();it!=cache.end();++it)
cout<<it->first<<" "<<it->second->value<<endl;
}
bool delete_key(string key){
cout<<"DELETEING ENTRY KEY : "<<key<<endl;
auto it = cache.find(key);
if(it==cache.end())
return false;
pthread_mutex_lock(&(it->second->lock));
cache.erase(it->first);
pthread_mutex_unlock(&(it->second->lock));
// free(it->second);
return true;
}
};
#ifndef METEO_GRPC_CLIENT_H
#define METEO_GRPC_CLIENT_H
#include <functional>
#include <stdexcept>
#include <bits/stdc++.h>
#include <memory>
#include <fstream>
#include <iostream>
#include <cmath>
#include "assert.h"
#include <grpc++/grpc++.h>
#include <thread>
#include "kv.grpc.pb.h"
using grpc::Channel;
using grpc::ChannelArguments;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::ClientAsyncReader;
using grpc::ClientAsyncWriter;
using grpc::ClientAsyncReaderWriter;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::GetRequest;
using helloworld::GetReply;
using helloworld::PutRequest;
using helloworld::PutReply;
using helloworld::DelRequest;
using helloworld::DelReply;
using helloworld::KeyValueStore;
#include <chrono>
using namespace std::chrono;
using namespace std;
time_t start;
time_t end;
class AbstractAsyncClientCall
{
public:
enum CallStatus { PROCESS, FINISH, DESTROY };
explicit AbstractAsyncClientCall():callStatus(PROCESS){}
virtual ~AbstractAsyncClientCall(){}
ClientContext context;
Status status;
CallStatus callStatus ;
virtual void Proceed(bool = true) = 0;
};
class AsyncClientCall: public AbstractAsyncClientCall
{
std::unique_ptr< ClientAsyncResponseReader<GetReply> > responder;
public:
GetReply reply;
AsyncClientCall(const GetRequest& request, CompletionQueue& cq_, std::unique_ptr<KeyValueStore::Stub>& stub_):AbstractAsyncClientCall()
{
// std::cout << "Get Key" << std::endl;
responder = stub_->AsyncGetValue(&context, request, &cq_);
responder->Finish(&reply, &status, (void*)this);
callStatus = PROCESS ;
}
virtual void Proceed(bool ok = true) override
{
if(callStatus == PROCESS)
{
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if(status.ok())
{
std::cout << "Value received: " <<reply.value()<<"Status:"<<reply.status() << std::endl;
// time(&end);
}
delete this;
}
}
};
class AsyncClientCall1M : public AbstractAsyncClientCall
{
std::unique_ptr< ClientAsyncResponseReader<PutReply> > responder;
public:
PutReply reply;
AsyncClientCall1M(const PutRequest& request, CompletionQueue& cq_, std::unique_ptr<KeyValueStore::Stub>& stub_):AbstractAsyncClientCall()
{
// std::cout << "Put Key" << std::endl;
responder = stub_->AsyncPutKValue(&context, request, &cq_);
responder->Finish(&reply, &status, (void*)this);
callStatus = PROCESS ;
}
virtual void Proceed(bool ok = true) override
{
if(callStatus == PROCESS)
{
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if(status.ok())
{
std::cout << "Message received: " <<reply.message() <<"Status:"<<reply.status()<< std::endl;
// time(&end);
}
delete this;
}
}
};
class AsyncClientCallMM : public AbstractAsyncClientCall
{
std::unique_ptr< ClientAsyncResponseReader<DelReply> > responder;
public:
DelReply reply;
AsyncClientCallMM(const DelRequest& request, CompletionQueue& cq_, std::unique_ptr<KeyValueStore::Stub>& stub_):AbstractAsyncClientCall()
{
// std::cout << "Delete Key" << std::endl;
responder = stub_->AsyncDelKValue(&context, request, &cq_);
responder->Finish(&reply, &status, (void*)this);
callStatus = PROCESS ;
}
virtual void Proceed(bool ok = true) override
{
if(callStatus == PROCESS)
{
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if(status.ok())
{
std::cout << "Message received: " <<reply.message()<<"Status:"<<reply.status() << std::endl;
// time(&end);
}
delete this;
}
}
};
class KVClient
{
public:
explicit KVClient(std::shared_ptr<Channel> channel)
:stub_(KeyValueStore::NewStub(channel))
{}
void GetValue(const std::string& key) {
// Data we are sending to the server.
GetRequest request;
request.set_key(key); // Assembles the client's payload and sends it to the server.
new AsyncClientCall(request, cq_, stub_);
}
void PutKValue(const std::string& key,const std::string& value)
{
// Data we are sending to the server.
PutRequest request;
request.set_key(key); // Assembles the client's payload and sends it to the server.
request.set_value(value);
new AsyncClientCall1M(request, cq_, stub_);
}
void DelKValue(const std::string& key)
{
// Data we are sending to the server.
DelRequest request;
request.set_key(key); // Assembles the client's payload and sends it to the server.
new AsyncClientCallMM(request, cq_, stub_);
}
void AsyncCompleteRpc()
{
void* got_tag;
bool ok = false;
while(cq_.Next(&got_tag, &ok))
{
AbstractAsyncClientCall* call = static_cast<AbstractAsyncClientCall*>(got_tag);
call->Proceed(ok);
}
std::cout << "Completion queue is shutting down." << std::endl;
}
private:
// Out of the passed in Channel comes the stub, stored here, our view of the
// server's exposed services.
std::unique_ptr<KeyValueStore::Stub> stub_;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq_;
};
// for string delimiter
vector<string> split (string s, string delimiter) {
size_t pos_start = 0, pos_end, delim_len = delimiter.length();
string token;
vector<string> res;
while ((pos_end = s.find (delimiter, pos_start)) != string::npos) {
token = s.substr (pos_start, pos_end - pos_start);
pos_start = pos_end + delim_len;
res.push_back (token);
}
res.push_back (s.substr (pos_start));
return res;
}
int main(int argc, char* argv[])
{
ChannelArguments args;
// Set the load balancing policy for the channel.
args.SetLoadBalancingPolicyName("round_robin");
KVClient client(grpc::CreateCustomChannel("localhost:50051", grpc::InsecureChannelCredentials(),args));
std::thread thread_ = std::thread(&KVClient::AsyncCompleteRpc, &client);
std::string userInput;
int mode;
cout<<"1.Batch Mode 2.Interactive mode"<<endl;
cin>>mode;
if(mode==1)
{
fstream newfile;
newfile.open("batch.txt",ios::in); //open a file to perform read operation using file object
if (newfile.is_open())
{ //checking whether the file is open
string tp;
// cout<<"Reached here"<<endl;
while(getline(newfile, tp))
{ //read data from file object and put it into string.
std::string delimiter = " ";
std::vector<std::string> v = split (tp, delimiter);
string func=v.front();
if(!func.compare("Get"))
{
v.erase(v.begin());
string key=v.front();
time(&start);
// unsync the I/O of C and C++.
ios_base::sync_with_stdio(false);
client.GetValue(key);
}
else if(!func.compare("Put"))
{
v.erase(v.begin());
string key=v.front();
v.erase(v.begin());
string value=v.front();
time(&start);
// unsync the I/O of C and C++.
ios_base::sync_with_stdio(false);
client.PutKValue(key,value);
}
else if(!func.compare("Del"))
{
v.erase(v.begin());
string key=v.front();
time(&start);
// unsync the I/O of C and C++.
ios_base::sync_with_stdio(false);
client.DelKValue(key);
}
}
newfile.close(); //close the file object.
}
}
else if(mode==2)
{
std::cout<<"1.Get Key 2.Put Key Value 3.Del Key"<<std::endl;
std::cout << "Press control-c to quit" << std::endl << std::endl;
while(1)
{
getline(std::cin,userInput);
std::string delimiter = " ";
std::vector<std::string> v = split (userInput, delimiter);
string func=v.front();
if(!func.compare("Get"))
{
v.erase(v.begin());
string key=v.front();
time(&start);
// unsync the I/O of C and C++.
ios_base::sync_with_stdio(false);
client.GetValue(key);
}
else if(!func.compare("Put"))
{
v.erase(v.begin());
string key=v.front();
v.erase(v.begin());
string value=v.front();
time(&start);
// unsync the I/O of C and C++.
ios_base::sync_with_stdio(false);
client.PutKValue(key,value);
}
else if(!func.compare("Del"))
{
v.erase(v.begin());
string key=v.front();
time(&start);
// unsync the I/O of C and C++.
ios_base::sync_with_stdio(false);
client.DelKValue(key);
} }
}
thread_.join();
}
#endif
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service KeyValueStore {
// Sends a greeting
rpc GetValue (GetRequest) returns (GetReply) {}
rpc PutKValue (PutRequest) returns (PutReply) {}
rpc DelKValue (DelRequest) returns (DelReply) {}
}
// The request message containing the key.
message GetRequest {
string key = 1;
}
// The response message containing the value of corresponding key
message GetReply {
string value = 1;
int32 status=2;
}
// The request message containing the key and value.
message PutRequest {
string key = 1;
string value = 2;
}
// The response message containing message after adding key-value pair
message PutReply {
string message = 1;
int32 status=2;
}
// The request message containing the key and value.
message DelRequest {
string key = 1;
}
// The response message containing message after removing key-value pair
message DelReply {
string message = 1;
int32 status=2;
}
#include<stdlib.h>
#include<semaphore.h>
#include <stdint.h>
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>
#include<iostream>
#include<fcntl.h>
#include<sys/stat.h>
#include <limits.h>
#include <sys/types.h>
#include<string.h>
#include<unistd.h>
int *fds ,setSize=4,*readCounters,readercount = 0;;
sem_t *mutex,*readingLock,x,y;
pthread_t tid,writerthreads[100],readerthreads[100];
unsigned modulusIndex( char *num, size_t size, unsigned divisor) {
unsigned rem = 0,temp = 0, i;
while(1)
{
if(i>=divisor)
break;
temp=num[i]%divisor;
rem += temp;
i++;
}
unsigned ans = rem%divisor;
return ans;
}
int file_search(char *key, char *value, int index) {
// Setting pointer to the start of file
lseek(fds[index], 0, SEEK_SET);
char fkey[256],fval[256];
int offset = 0,size,temp;
do {
// Reading key
size = read(fds[index], fkey, sizeof(fkey));
if (size<=0) break;
else temp=1;
// Reading value
size=read(fds[index], fval, sizeof(fval));
if(size<=0) break;
else temp=1;
// Comparing keys if found same we will update the key on offset and copy the value in value incase of get call
if(strcmp(key,fkey) ==0 ) {
if(value!=0)
{
// copy key's value
memcpy(value, fval, sizeof(fval));
}
else{
}
return offset;
}
offset += sizeof(fval)+sizeof(fkey);
} while (1);
temp=1;
return -1;
}
int file_get(char *key, char *value)
{
int index=modulusIndex(key,256,setSize),temp;
int flag =0;
sem_wait(&readingLock[index]);
// Reader started reading
readCounters[index]+=1;
// Readers are there lock the mutex
if(readCounters[index]==1)
sem_wait(&mutex[index]);
sem_post(&readingLock[index]);
if(file_search(key, value, index)>=0) {
flag = 1;
}
sem_wait(&readingLock[index]);
readCounters[index]-=1;
if(readCounters[index]==0)
sem_post(&mutex[index]);
else
temp=1;
sem_post(&readingLock[index]);
return flag;
}
int file_del( char *key)
{
// flag variable
long long int flag=0,l=10;
// finding index of the key
int offset,index=modulusIndex(key,256,setSize);
char fkey[256] ,fval[256];
// Locking the mutex
sem_wait(&mutex[index]);
//looking for offset
offset = file_search(key, NULL, index);
if(!(offset<0)) {
lseek(fds[index], offset, SEEK_SET);
// setting value to all zeros
char data[256];
read(fds[index], data, sizeof(fkey));
lseek(fds[index], offset, SEEK_SET);
memset(fkey, 0, 256);
flag = 1;
lseek(fds[index], offset, SEEK_SET);
read(fds[index], data, sizeof(fkey));
memset(fval, 0, 256);
while(l>0)
l--;
lseek(fds[index], offset, SEEK_SET);
write(fds[index], fkey, 256);
lseek(fds[index], offset, SEEK_SET);
write(fds[index], fval, 256);
flag = 1;
}
sem_post(&mutex[index]);
return flag;
}
int storage_init()
{
readCounters=(int *) malloc(sizeof(int)*setSize);
// Creating Array of File Descriptors
fds=(int*)malloc(sizeof(int)*setSize);
// Creating array of read locks
readingLock=(sem_t *) malloc(sizeof(sem_t)*setSize);
mutex=(sem_t *)malloc(sizeof(sem_t)*setSize);
char nameOfFile[22+setSize];
long long int i=0;
while(i<setSize)
{
int temp=0;
readCounters[i]=0;
snprintf(nameOfFile,sizeof(nameOfFile),"data%d.txt",i);
if(temp)
temp=1;
else
temp=0;
temp++;
fds[i]=open(nameOfFile, O_CREAT|O_RDWR,S_IRWXU);
if(fds[i]<0)std::cout<<("\n[Error : Cannot open File%d.txt]\n",i);
else
temp=1;
//Initializing semaphores
sem_init(&readingLock[i],0,1);
//Initializing mutexes
sem_init(&mutex[i],0,1);
i=i+1;
}
return 0;
}
void file_put(char *key,char *value) {
/*
if found then ask the offset where it is present and if the value noy matches with the present value ,update the given line
if not present then search for empty line and insert there!
*/
int offset,temp=0;
int index=modulusIndex(key,256,setSize);
temp=0;
sem_wait(&mutex[index]);
offset = file_search(key, NULL, index);
// if we got nothing in the existing set
if(offset < 0) {
//adding value at the end of file
lseek(fds[index], 0, SEEK_END);
write(fds[index], key, 256);
temp=1;
write(fds[index], value, 256);
} else {
//adding value at returned offset
lseek(fds[index], offset, SEEK_SET);
write(fds[index], key, 256);
temp=1;
write(fds[index], value, 256);
}
sem_post(&mutex[index]);
}
#ifndef METEO_GRPC_SERVER_H
#define METEO_GRPC_SERVER_H
#include "threadpool.h"
#include <functional>
#include <stdexcept>
#include <fstream>
#include <algorithm>
#include <memory>
#include <iostream>
#include <cmath>
#include <string>
#include "assert.h"
#include "cache.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <error.h>
#include <pthread.h>
#include <grpc++/grpc++.h>
#include "kv.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerAsyncWriter;
using grpc::ServerAsyncReader;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using helloworld::GetRequest;
using helloworld::GetReply;
using helloworld::PutRequest;
using helloworld::PutReply;
using helloworld::DelRequest;
using helloworld::DelReply;
using helloworld::KeyValueStore;
using namespace std;
pthread_mutex_t lock1;
int tech=0;
string port="2021";
ofstream MyFile("server.logs");
struct configuration
{
int cacheSize, threadPSize,listeningPort;
std::string cacheRType;
} settings;
cache c;
std::string getFromMap(std::string key)
{
// Key is not present
MyFile <<"Get "<<key;
if(!(c.get_entry(key).empty()))
{
string ans= c.get_entry(key);
MyFile<<" Succesfull"<<endl;
return ans;
}
else{
char val[256];
char *temp=toCharArray(key);
if(file_get(temp,val)){
free(temp);
MyFile<<" Succesfull"<<endl;
return toString(val);
}
MyFile<<" Failed"<<endl;
return "KEY NOT EXIST";
}
}
std::string putIntoMap(std::string key ,std::string value){
MyFile <<"Put "<<key;
if(c.put_into_cache(key,value,tech)){
MyFile<<" Succesfull"<<endl;
return "Key: "+key+" Added Successfully";
}
MyFile<<" Failed"<<endl;
return "Error Occured While Adding Key : "+key;
}
std::string deleteFromMap(std::string key)
{
MyFile <<"Delete"<<key;
if(c.delete_key(key)){
MyFile<<" Succesfull"<<endl;
return "Deleted Key : "+key;
}
MyFile<<" Failed"<<endl;
return "KEY NOT EXIST";
}
class CommonCallData
{
public:
// The means of communication with the gRPC runtime for an asynchronous
// server.
KeyValueStore::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
ServerContext ctx_;
// What we get from the client.
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // The current serving state.
public:
explicit CommonCallData(KeyValueStore::AsyncService* service, ServerCompletionQueue* cq):
service_(service), cq_(cq),status_(CREATE)
{}
virtual ~CommonCallData()
{
// std::cout << "CommonCallData destructor" << std::endl;
}
virtual void Proceed(bool = true) = 0;
};
class CallData: public CommonCallData
{
ServerAsyncResponseWriter<GetReply> responder_;
public:
GetRequest request_;
GetReply reply_;
CallData(KeyValueStore::AsyncService* service, ServerCompletionQueue* cq):
CommonCallData(service, cq), responder_(&ctx_){Proceed();}
virtual void Proceed(bool = true) override
{
if (status_ == CREATE)
{
// std::cout << "GetKey Value Function " << std::endl;
status_ = PROCESS;
service_->RequestGetValue(&ctx_, &request_, &responder_, cq_, cq_, this);
}
else if (status_ == PROCESS)
{
new CallData(service_, cq_);
// std::cout << "key = " << request_.key() << std::endl;
reply_.set_value(getFromMap(request_.key()));
if(!getFromMap(request_.key()).compare("KEY NOT EXIST"))
{
reply_.set_status(400);
}
else
{
reply_.set_status(200);
}
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
}
else
{
GPR_ASSERT(status_ == FINISH);
// std::cout << " Get Function Done" << std::endl;
delete this;
}
}
};
class CallData1M: public CommonCallData
{
ServerAsyncResponseWriter<PutReply> responder_;
public:
PutRequest request_;
PutReply reply_;
CallData1M(KeyValueStore::AsyncService* service, ServerCompletionQueue* cq):
CommonCallData(service, cq), responder_(&ctx_){Proceed();}
virtual void Proceed(bool = true) override
{
if(status_ == CREATE)
{
// std::cout << "PutKeyValue Function" << std::endl;
service_->RequestPutKValue(&ctx_, &request_, &responder_, cq_, cq_, this);
status_ = PROCESS ;
}
else if(status_ == PROCESS)
{
new CallData1M(service_, cq_);
// std::cout << "Key:" << request_.key()<<"Value: "<<request_.value() << std::endl;
reply_.set_message(putIntoMap(request_.key(),request_.value()));
if(!putIntoMap(request_.key(),request_.value()).compare("Error Occured While Adding Key"))
{
reply_.set_status(400);
}
else
{
reply_.set_status(200);
}
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
}
else
{
GPR_ASSERT(status_ == FINISH);
//s std::cout << "PutKeyValue Function Done" << std::endl;
delete this;
}
}
};
class CallDataMM: public CommonCallData
{
ServerAsyncResponseWriter<DelReply> responder_;
public:
DelRequest request_;
DelReply reply_;
CallDataMM(KeyValueStore::AsyncService* service, ServerCompletionQueue* cq):
CommonCallData(service, cq), responder_(&ctx_){Proceed();}
virtual void Proceed(bool = true) override
{
if(status_ == CREATE)
{
// std::cout << "DelKeyValue Function" << std::endl;
service_->RequestDelKValue(&ctx_, &request_, &responder_, cq_, cq_, this);
status_ = PROCESS ;
}
else if(status_ == PROCESS)
{
new CallDataMM(service_, cq_);
// std::cout << "Key:" << request_.key()<< std::endl;
reply_.set_message(deleteFromMap(request_.key()));
if(!deleteFromMap(request_.key()).compare("Could Not Delete Key"))
{
reply_.set_status(400);
}
else
{
reply_.set_status(200);
}
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
}
else
{
GPR_ASSERT(status_ == FINISH);
// std::cout << "DelKeyValue Function Done" << std::endl;
delete this;
}
}
};
class ServerImpl
{
public:
~ServerImpl()
{
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
void Run(threadpool *pool)
{
std::string server_address("0.0.0.0:"+port);
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
new CallData1M(&service_, cq_.get());
new CallDataMM(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while(true)
{
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
CommonCallData* calldata = static_cast<CommonCallData*>(tag);
pool->enqueue([calldata](){calldata->Proceed();});
}
}
private:
std::unique_ptr<ServerCompletionQueue> cq_;
KeyValueStore::AsyncService service_;
std::unique_ptr<Server> server_;
};
int readFile()
{
FILE *fp;
fp = fopen("client.cc", "r");
if (fp == NULL)
{
cout<<"Error in reading file:"<< errno << std::endl;
return -1;
}
fclose(fp);
return 0;
/*
FILE * fp;
char * line = NULL;
size_t len = 0;
ssize_t read;
char *param;
char *value;
int i;
fp = fopen("settings.conf", "r");
if (fp == NULL)
{
cout<<"Error in reading file:"<< errno << std::endl;
return -1;
}
while ((read = getline(&line, &len, fp)) != -1) {
i = 0;
if(line[0]=='#') {
break;
} else {
param=strtok(line,"=");
value=strtok(NULL,"=");
if(strcmp(param,"LISTENING_PORT")==0) {
settings.listeningPort = atoi(value);
}
else if (strcmp(param,"CACHE_REPLACEMENT_TYPE")==0) {
settings.cacheRType = atoi(value);
}
else if (strcmp(param,"CACHE_SIZE")==0) {
settings.cacheSize = atoi(value);
} else if (strcmp(param,"THREAD_POOL_SIZE")==0) {
settings.threadPSize = atoi(value);
} else {
}
}
}
fclose(fp);
*/
}
int main(int argc, char* argv[])
{
ifstream file("server.config");
string str1,str2,str3;
int k=0;
getline(file, str1);
port = str1;
getline(file, str2);
if(!str2.compare("LRU")){
tech=1;
}
getline(file, str2);
int cache_size=stoi(str2);
getline(file, str2);
int num_threads=stoi(str2);
ServerImpl server;
//readFile();
c.initialize_cache(cache_size);
threadpool pool{num_threads};
server.Run(&pool);
MyFile.close();
}
#endif
50051
LSU
30
300
\ No newline at end of file
LISTENING_PORT=50051
CACHE_REPLACEMENT_TYPE=LRU
CACHE_SIZE=15
THREAD_POOL_SIZE=10
#include<stdlib.h>
#include<semaphore.h>
#include <stdint.h>
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>
#include<fcntl.h>
#include<sys/stat.h>
#include <limits.h>
#include <sys/types.h>
#include<string.h>
#include<unistd.h>
#include<iostream>
using namespace std;
int *fds;
int setSize=4;
int *readCounters;
sem_t *mutex1;
sem_t *readerLocks;
sem_t x,y;
pthread_t tid;
pthread_t writerthreads[100],readerthreads[100];
int readercount = 0;
unsigned modulus1( char *num, size_t size, unsigned divisor) {
unsigned rem = 0,temp = 0;
int i=0;
while(1)
{
if(i>=divisor)
break;
temp=num[i]%divisor;
rem += temp;
i++;
}
unsigned ans = rem%divisor;
return ans;
}
int file_search(char *key, char *value, int index) {
// Setting pointer to the start of file
lseek(fds[index], 0, SEEK_SET);
char fkey[256],fval[256];
int offset = 0;
int size;
do {
// Reading key
size = read(fds[index], fkey, sizeof(fkey));
if (size<=0) break;
// Reading value
size=read(fds[index], fval, sizeof(fval));
if(size<=0) break;
// Comparing keys if found same we will update the key on offset and copy the value in value incase of get call
if(strcmp(key,fkey) ==0 ) {
if(value!=0)
{
// copy key's value
memcpy(value, fval, sizeof(fval));
}
return offset;
}
offset += sizeof(fkey) + sizeof(fval);
} while (1);
return -1;
}
char *toCharArray(string surname)
{
char *temp = new char[265];
for(int i=0;i<=surname.length();i++)
{
temp[i]=surname[i];
}
return temp;
}
string toString(char* a)
{
int i;
string s = "";
for (i = 0; i < 256; i++) {
s = s + a[i];
}
return s;
}
int file_del( char *key)
{
int offset;
// finding index of the key
int index=modulus1(key,256,setSize);
char fkey[256] ,fval[256];
// Locking the mutex1
sem_wait(&mutex1[index]);
// flag variable
int flag=0;
//looking for offset
offset = file_search(key, NULL, index);
if(offset >= 0) {
lseek(fds[index], offset, SEEK_SET);
// setting value to all zeros
memset(fkey, 0, 256);
memset(fval, 0, 256);
write(fds[index], fkey, 256);
write(fds[index], fval, 256);
flag = 1;
}
sem_post(&mutex1[index]);
return flag;
}
int file_get(char *key, char *value)
{
/* Gets the value stored at offset */
/* Does not depend on key argument */
int index=modulus1(key,256,setSize);
int flag =0;
sem_wait(&readerLocks[index]);
// Reader started reading
readCounters[index]+=1;
// Readers are there lock the mutex1
if(readCounters[index]==1)
sem_wait(&mutex1[index]);
sem_post(&readerLocks[index]);
if(file_search(key, value, index)>=0) {
flag = 1;
}
sem_wait(&readerLocks[index]);
readCounters[index]-=1;
if(readCounters[index]==0)
{
sem_post(&mutex1[index]);
}
sem_post(&readerLocks[index]);
return flag;
}
void file_put(char *key,char *value) {
/*
if found then ask the offset where it is present and if the value noy matches with the present value ,update the given line
if not present then search for empty line and insert there!
*/
int offset;
int index=modulus1(key,256,setSize);
sem_wait(&mutex1[index]);
offset = file_search(key, NULL, index);
// if we got nothing in the existing set
if(offset < 0) {
//adding value at the end of file
lseek(fds[index], 0, SEEK_END);
write(fds[index], key, 256);
write(fds[index], value, 256);
} else {
//adding value at returned offset
lseek(fds[index], offset, SEEK_SET);
write(fds[index], key, 256);
write(fds[index], value, 256);
}
sem_post(&mutex1[index]);
}
int storage_init()
{
// Creating Array of File Descriptors
fds=(int *)malloc(sizeof(int)*setSize);
int i=0;
/*
define the array of file descriptors depending on the prefix
define the array of readCount as well as the semaphore (read x and write y) for the same
PUT,DEL would use write lock
GET would use read lock
each write should return the line number
*/
// Creating array of read locks
readerLocks=(sem_t *) malloc(sizeof(sem_t)*setSize);
// Creating array of readcounters
readCounters=(int *) malloc(sizeof(int)*setSize);
mutex1=(sem_t *)malloc(sizeof(sem_t)*setSize);
char fileName[22+setSize];
for(i=0;i<setSize;i++)
{
snprintf(fileName,sizeof(fileName),"File%d.txt",i);
fds[i]=open(fileName, O_CREAT|O_RDWR,S_IRWXU);
if(fds[i]<0)
{
printf("\n[Error : Cannot open File%d.txt]\n",i);
}
//Initializing semaphores
sem_init(&readerLocks[i],0,1);
//Initializing mutex1es
sem_init(&mutex1[i],0,1);
readCounters[i]=0;
}
return 0;
}
// Reference: Thread Pool Tutorial - How-To (Youtube), https://github.com/mtrebi/thread-pool
#pragma once
#include <iostream>
#include <functional>
#include <thread>
#include <atomic>
#include <vector>
#include <memory>
#include <exception>
#include <future>
#include <mutex>
#include <unistd.h>
#include <queue>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
class threadpool {
public:
using job = std::function<void()>;
explicit threadpool(std::size_t nthreads){
num_threads = nthreads;
start(num_threads);
}
template<typename F, typename...Args>
auto enqueue(F&& new_job, Args&&... args) -> std::future<decltype(new_job(args...))> {
{
std::unique_lock<std::mutex> lock(event_lock);
std::function<decltype(new_job(args...))()> func = std::bind(std::forward<F>(new_job), std::forward<Args>(args)...);
auto task_ptr = std::make_shared<std::packaged_task<decltype(new_job(args...))()>>(func);
std::function<void()> wrapper_func = [task_ptr]() {
(*task_ptr)();
};
jobs.emplace(wrapper_func);
}
event_notifier.notify_one();
}
std::size_t num_threads;
private:
std::mutex event_lock;
bool threadstop = false;
std::vector<std::thread> threads;
std::condition_variable event_notifier;
std::queue<job> jobs;
void start(std::size_t num_threads){
for (int i = 0; i < num_threads; i++) {
threads.emplace_back([&] {
job job_to_be_executed;
while(true) {
// get lock as critical section scope entered
{
std::unique_lock<std::mutex> lock(event_lock);
event_notifier.wait(lock);
if (threadstop) {
break;
}
if (!jobs.empty()) {
job_to_be_executed = std::move(jobs.front());
jobs.pop();
}
}
// Release lock as critical section scope is over
job_to_be_executed();
}
});
}
}
};
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment