Commit 622c0a03 authored by Vishal Saha's avatar Vishal Saha

implementation of sharing keys on server join

parent 5e582cf1
...@@ -77,7 +77,7 @@ public: ...@@ -77,7 +77,7 @@ public:
string getKeyValuePairs(int id) string getKeyValuePairs(int id)
{ {
return "keyvaluepairs"; return mycache.getKeyValuePairs(id);
} }
}; };
...@@ -123,7 +123,9 @@ public: ...@@ -123,7 +123,9 @@ public:
mycache.pushAll(); mycache.pushAll();
} }
// key1;key2;key3;;value1;value2;value3;
string getKeyValuePairs(int id) string getKeyValuePairs(int id)
{ {
return mycache.getKeyValuePairs(id);
} }
}; };
\ No newline at end of file
...@@ -10,7 +10,8 @@ ...@@ -10,7 +10,8 @@
using namespace std; using namespace std;
class LFUCache { class LFUCache
{
private: private:
int capacity; int capacity;
LFUNode **cacheHeap; LFUNode **cacheHeap;
...@@ -18,29 +19,34 @@ private: ...@@ -18,29 +19,34 @@ private:
std::mutex mtx; std::mutex mtx;
string deleted; string deleted;
int parent(int i) { int parent(int i)
{
if (i % 2 == 0) if (i % 2 == 0)
return (i / 2) - 1; return (i / 2) - 1;
else else
return (i - 1) / 2; return (i - 1) / 2;
} }
int left_child(int i) { int left_child(int i)
{
return (2 * i) + 1; return (2 * i) + 1;
} }
int right_child(int i) { int right_child(int i)
{
return (2 * i) + 2; return (2 * i) + 2;
} }
int exists(string key) { int exists(string key)
{
for (int i = curr_pos; i >= 0; i--) for (int i = curr_pos; i >= 0; i--)
if (key.compare(cacheHeap[i]->key) == 0) if (key.compare(cacheHeap[i]->key) == 0)
return i; return i;
return -1; return -1;
} }
void swap(int i, int j) { void swap(int i, int j)
{
cacheHeap[i]->frequency = cacheHeap[i]->frequency + cacheHeap[j]->frequency; cacheHeap[i]->frequency = cacheHeap[i]->frequency + cacheHeap[j]->frequency;
cacheHeap[j]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency; cacheHeap[j]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency;
cacheHeap[i]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency; cacheHeap[i]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency;
...@@ -52,8 +58,10 @@ private: ...@@ -52,8 +58,10 @@ private:
cacheHeap[j]->value = temp; cacheHeap[j]->value = temp;
} }
void heapify_up(int i) { void heapify_up(int i)
while (true) { {
while (true)
{
if (i == 0) if (i == 0)
break; break;
if (cacheHeap[parent(i)]->frequency <= cacheHeap[i]->frequency) if (cacheHeap[parent(i)]->frequency <= cacheHeap[i]->frequency)
...@@ -64,17 +72,21 @@ private: ...@@ -64,17 +72,21 @@ private:
} }
} }
void heapify_down(int i) { void heapify_down(int i)
while (true) { {
while (true)
{
if (left_child(i) > curr_pos) if (left_child(i) > curr_pos)
break; break;
if (right_child(i) > curr_pos && cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency) if (right_child(i) > curr_pos && cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency)
break; break;
if (right_child(i) <= curr_pos) { if (right_child(i) <= curr_pos)
{
if (cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency && cacheHeap[right_child(i)]->frequency >= cacheHeap[i]->frequency) if (cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency && cacheHeap[right_child(i)]->frequency >= cacheHeap[i]->frequency)
break; break;
} }
if (right_child(i) <= curr_pos) { if (right_child(i) <= curr_pos)
{
if (cacheHeap[left_child(i)]->frequency < cacheHeap[right_child(i)]->frequency) if (cacheHeap[left_child(i)]->frequency < cacheHeap[right_child(i)]->frequency)
swap(i, left_child(i)); swap(i, left_child(i));
else else
...@@ -85,16 +97,19 @@ private: ...@@ -85,16 +97,19 @@ private:
} }
} }
string insert(string key, string value) { string insert(string key, string value)
{
int i = exists(key); int i = exists(key);
if (i != -1) { if (i != -1)
{
cacheHeap[i]->frequency++; cacheHeap[i]->frequency++;
cacheHeap[i]->value = value; cacheHeap[i]->value = value;
heapify_down(i); heapify_down(i);
return INSERT_SUCCESS; return INSERT_SUCCESS;
} }
curr_pos++; curr_pos++;
if (curr_pos == capacity) { if (curr_pos == capacity)
{
curr_pos--; curr_pos--;
delete_min(true); delete_min(true);
insert(key, value); insert(key, value);
...@@ -107,14 +122,16 @@ private: ...@@ -107,14 +122,16 @@ private:
return INSERT_SUCCESS; return INSERT_SUCCESS;
} }
string delete_min(bool keep) { string delete_min(bool keep)
{
if (curr_pos == -1) if (curr_pos == -1)
return CACHE_EMPTY; return CACHE_EMPTY;
swap(0, curr_pos); swap(0, curr_pos);
curr_pos--; curr_pos--;
if (curr_pos >= 0) if (curr_pos >= 0)
heapify_down(0); heapify_down(0);
if (keep) { if (keep)
{
string filename = getFilename(cacheHeap[curr_pos + 1]->key); string filename = getFilename(cacheHeap[curr_pos + 1]->key);
ofstream fout; ofstream fout;
fout.open(filename, ios::app); fout.open(filename, ios::app);
...@@ -125,7 +142,8 @@ private: ...@@ -125,7 +142,8 @@ private:
return DELETE_SUCCESS; return DELETE_SUCCESS;
} }
void insertAll(unordered_map<string, string> flush) { void insertAll(unordered_map<string, string> flush)
{
int to_empty = flush.size() - capacity + curr_pos + 1; int to_empty = flush.size() - capacity + curr_pos + 1;
for (int i = 0; i < to_empty; i++) for (int i = 0; i < to_empty; i++)
delete_min(true); delete_min(true);
...@@ -134,12 +152,14 @@ private: ...@@ -134,12 +152,14 @@ private:
insert(itr->first, itr->second); insert(itr->first, itr->second);
} }
string getNodeByKey(string key) { string getNodeByKey(string key)
{
string value; string value;
int i = exists(key); int i = exists(key);
if (i == -1) if (i == -1)
return KEY_NOT_FOUND; return KEY_NOT_FOUND;
else { else
{
cacheHeap[i]->frequency++; cacheHeap[i]->frequency++;
value = cacheHeap[i]->value; value = cacheHeap[i]->value;
heapify_down(i); heapify_down(i);
...@@ -147,11 +167,13 @@ private: ...@@ -147,11 +167,13 @@ private:
} }
} }
string deleteNodeByKey(string key) { string deleteNodeByKey(string key)
{
int i = exists(key); int i = exists(key);
if (i == -1) if (i == -1)
return KEY_NOT_FOUND; return KEY_NOT_FOUND;
else { else
{
cacheHeap[i]->frequency = 0; cacheHeap[i]->frequency = 0;
heapify_up(i); heapify_up(i);
delete_min(false); delete_min(false);
...@@ -165,7 +187,8 @@ private: ...@@ -165,7 +187,8 @@ private:
} }
} }
string getFilename(string key) { string getFilename(string key)
{
string filename = "."; string filename = ".";
int length = key.size(); int length = key.size();
if (length == 1) if (length == 1)
...@@ -177,34 +200,40 @@ private: ...@@ -177,34 +200,40 @@ private:
return filename; return filename;
} }
bool fileExists(string filename) { bool fileExists(string filename)
{
ifstream f(filename.c_str()); ifstream f(filename.c_str());
return f.good(); return f.good();
} }
public: public:
LFUCache() { LFUCache()
{
curr_pos = -1; curr_pos = -1;
deleted = ""; deleted = "";
deleted += char(0); deleted += char(0);
} }
void setCap(int capacity) { void setCap(int capacity)
{
this->capacity = capacity; this->capacity = capacity;
cacheHeap = (LFUNode **)malloc(sizeof(LFUNode *) * capacity); cacheHeap = (LFUNode **)malloc(sizeof(LFUNode *) * capacity);
for(int i = 0; i < capacity; i++) for (int i = 0; i < capacity; i++)
cacheHeap[i] = new LFUNode(deleted, deleted); cacheHeap[i] = new LFUNode(deleted, deleted);
} }
string GET(string key, int *status) { string GET(string key, int *status)
{
//std::lock_guard<std::mutex> guard(mtx); //std::lock_guard<std::mutex> guard(mtx);
string value = getNodeByKey(key); string value = getNodeByKey(key);
if (value.compare(KEY_NOT_FOUND) != 0) { if (value.compare(KEY_NOT_FOUND) != 0)
{
*status = 200; *status = 200;
return value; return value;
} }
string filename = getFilename(key); string filename = getFilename(key);
if (!fileExists(filename)) { if (!fileExists(filename))
{
*status = 400; *status = 400;
return KEY_NOT_FOUND; return KEY_NOT_FOUND;
} }
...@@ -212,7 +241,8 @@ public: ...@@ -212,7 +241,8 @@ public:
ifstream fin; ifstream fin;
unordered_map<string, string> flush; unordered_map<string, string> flush;
fin.open(filename); fin.open(filename);
do { do
{
getline(fin, key1); getline(fin, key1);
if (key1.size() == 0) if (key1.size() == 0)
break; break;
...@@ -237,33 +267,39 @@ public: ...@@ -237,33 +267,39 @@ public:
return value; return value;
} }
void PUT(string key, string value) { void PUT(string key, string value)
{
//mtx.lock(); //mtx.lock();
insert(key, value); insert(key, value);
//mtx.unlock(); //mtx.unlock();
} }
void DEL(string key, int *status) { void DEL(string key, int *status)
{
int status1; int status1;
string value = GET(key, &status1); string value = GET(key, &status1);
if (status1 == 400) { if (status1 == 400)
{
*status = 400; *status = 400;
return; return;
} }
deleteNodeByKey(key); deleteNodeByKey(key);
} }
void pushAll() { void pushAll()
{
while (curr_pos >= 0) while (curr_pos >= 0)
delete_min(true); delete_min(true);
} }
void reformat(string filename) { void reformat(string filename)
{
ifstream fin; ifstream fin;
fin.open(filename); fin.open(filename);
unordered_map<string, string> flush; unordered_map<string, string> flush;
string key, value; string key, value;
do { do
{
getline(fin, key); getline(fin, key);
if (key.size() == 0) if (key.size() == 0)
break; break;
...@@ -279,15 +315,87 @@ public: ...@@ -279,15 +315,87 @@ public:
ofstream fout; ofstream fout;
fout.open(filename); fout.open(filename);
unordered_map<string, string>::iterator itr; unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++) { for (itr = flush.begin(); itr != flush.end(); itr++)
{
fout << itr->first << "\n"; fout << itr->first << "\n";
fout << itr->second << "\n"; fout << itr->second << "\n";
} }
} }
void traverse() { void traverse()
{
for (int i = 0; i <= curr_pos; i++) for (int i = 0; i <= curr_pos; i++)
cout << cacheHeap[i]->key << endl; cout << cacheHeap[i]->key << endl;
cout << endl; cout << endl;
} }
string getKeyValuePairs(int id)
{
unordered_map<string, string> flush;
string keyValPairs = "";
DIR *dirFile = opendir(".");
if (dirFile)
{
struct dirent *hFile;
while ((hFile = readdir(dirFile)) != NULL)
{
string fName(hFile->d_name);
string fileName(hFile->d_name);
int n;
if ((n = fileName.size()) < 8)
continue;
if (fileName[n - 3] == 'k' && fileName[n - 2] == 'v' && fileName[n - 1] == 'm')
{
fileName.erase(n - 4);
fileName = fileName.substr(2);
if (hash(fileName) <= id)
{
ifstream fin;
fin.open(fName);
string _key, val;
do
{
getline(fin, _key);
if (_key.size() == 0)
break;
getline(fin, val);
if (val != deleted)
flush[_key] = val;
else
flush.erase(_key);
if (fin.eof())
break;
} while (fin);
fin.close();
const char *c = fName.c_str();
remove(c);
}
}
}
closedir(dirFile);
}
Node *temp = head->next;
while (temp->next)
{
if (hash(temp->key) <= id)
{
flush[temp->key] = temp->payload;
}
temp = temp->next;
}
unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->first + ";";
keyValPairs += ";";
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->second + ";";
return keyValPairs;
}
}; };
...@@ -6,15 +6,18 @@ ...@@ -6,15 +6,18 @@
using namespace std; using namespace std;
inline bool fileExists(const std::string &name) { inline bool fileExists(const std::string &name)
{
struct stat buffer; struct stat buffer;
return (stat(name.c_str(), &buffer) == 0); return (stat(name.c_str(), &buffer) == 0);
} }
string getFilename(string key) { string getFilename(string key)
{
string filename = "."; string filename = ".";
int length = key.size(); int length = key.size();
if (length == 1) { if (length == 1)
{
return "_.kvm"; return "_.kvm";
} }
filename += char(length - 1); filename += char(length - 1);
...@@ -24,7 +27,8 @@ string getFilename(string key) { ...@@ -24,7 +27,8 @@ string getFilename(string key) {
return filename; return filename;
} }
class LRUcache { class LRUcache
{
private: private:
int capacity; int capacity;
string deleted; string deleted;
...@@ -34,7 +38,8 @@ private: ...@@ -34,7 +38,8 @@ private:
unordered_map<string, Node *> cache; unordered_map<string, Node *> cache;
public: public:
LRUcache() { LRUcache()
{
head = new Node("HEAD", "HEAD"); head = new Node("HEAD", "HEAD");
tail = new Node("TAIL", "TAIL"); tail = new Node("TAIL", "TAIL");
head->next = tail; head->next = tail;
...@@ -43,11 +48,13 @@ public: ...@@ -43,11 +48,13 @@ public:
deleted += char(0); deleted += char(0);
} }
void setCap(int capacity) { void setCap(int capacity)
{
this->capacity = capacity; this->capacity = capacity;
} }
LRUcache(int capacity) { LRUcache(int capacity)
{
this->capacity = capacity; this->capacity = capacity;
head = new Node("HEAD", "HEAD"); head = new Node("HEAD", "HEAD");
tail = new Node("TAIL", "TAIL"); tail = new Node("TAIL", "TAIL");
...@@ -57,7 +64,8 @@ public: ...@@ -57,7 +64,8 @@ public:
deleted += char(0); deleted += char(0);
} }
Node *getUtil(string key) { Node *getUtil(string key)
{
if (cache.find(key) == cache.end()) if (cache.find(key) == cache.end())
return NULL; return NULL;
Node *x = cache[key]; Node *x = cache[key];
...@@ -73,11 +81,14 @@ public: ...@@ -73,11 +81,14 @@ public:
return x; return x;
} }
string get(string key, int *status) { string get(string key, int *status)
{
//std::lock_guard<std::mutex> guard(mtx); //std::lock_guard<std::mutex> guard(mtx);
Node *x = getUtil(key); Node *x = getUtil(key);
if (x) { if (x)
if (x->payload == deleted) { {
if (x->payload == deleted)
{
*status = 400; *status = 400;
return NOT_EXIST; return NOT_EXIST;
} }
...@@ -86,7 +97,8 @@ public: ...@@ -86,7 +97,8 @@ public:
string fileName = getFilename(key); string fileName = getFilename(key);
if (!fileExists(fileName)) { if (!fileExists(fileName))
{
*status = 400; *status = 400;
return NOT_EXIST; return NOT_EXIST;
} }
...@@ -97,7 +109,8 @@ public: ...@@ -97,7 +109,8 @@ public:
string _key, val; string _key, val;
unordered_map<string, string> flush; unordered_map<string, string> flush;
do { do
{
getline(fin, _key); getline(fin, _key);
if (_key.size() == 0) if (_key.size() == 0)
break; break;
...@@ -113,12 +126,14 @@ public: ...@@ -113,12 +126,14 @@ public:
fin.close(); fin.close();
unordered_map<string, string>::iterator itr; unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end() && cache.size() < capacity - 1; itr++) { for (itr = flush.begin(); itr != flush.end() && cache.size() < capacity - 1; itr++)
{
this->put(itr->first, itr->second); this->put(itr->first, itr->second);
flush[itr->first] = deleted; flush[itr->first] = deleted;
} }
if (flush.find(key) != flush.end()) { if (flush.find(key) != flush.end())
{
this->put(key, flush[key]); this->put(key, flush[key]);
flush[key] = deleted; flush[key] = deleted;
} }
...@@ -128,7 +143,8 @@ public: ...@@ -128,7 +143,8 @@ public:
ofstream fout; ofstream fout;
fout.open(fileName); fout.open(fileName);
for (itr = flush.begin(); itr != flush.end(); itr++) { for (itr = flush.begin(); itr != flush.end(); itr++)
{
if (itr->second == deleted) if (itr->second == deleted)
continue; continue;
fout << itr->first << "\n"; fout << itr->first << "\n";
...@@ -136,7 +152,8 @@ public: ...@@ -136,7 +152,8 @@ public:
} }
fout.close(); fout.close();
x = getUtil(key); x = getUtil(key);
if (x) { if (x)
{
return x->payload; return x->payload;
} }
*status = 400; *status = 400;
...@@ -144,12 +161,14 @@ public: ...@@ -144,12 +161,14 @@ public:
return NOT_EXIST; return NOT_EXIST;
} }
void del(string key, int *status) { void del(string key, int *status)
{
int status2 = 200; int status2 = 200;
string result = get(key, &status2); string result = get(key, &status2);
// mtx.lock(); // mtx.lock();
if (status2 == 400) { if (status2 == 400)
{
*status = 400; *status = 400;
// mtx.unlock(); // mtx.unlock();
return; return;
...@@ -159,9 +178,11 @@ public: ...@@ -159,9 +178,11 @@ public:
// mtx.unlock(); // mtx.unlock();
} }
void pushAll() { void pushAll()
{
unordered_map<string, Node *>::iterator itr; unordered_map<string, Node *>::iterator itr;
for (itr = cache.begin(); itr != cache.end(); itr++) { for (itr = cache.begin(); itr != cache.end(); itr++)
{
string fileName = getFilename(itr->first); string fileName = getFilename(itr->first);
ofstream fout; ofstream fout;
...@@ -177,12 +198,15 @@ public: ...@@ -177,12 +198,15 @@ public:
tail->prev = head; tail->prev = head;
} }
void put(string key, string payload) { void put(string key, string payload)
{
//mtx.lock(); //mtx.lock();
if (getUtil(key) != NULL) if (getUtil(key) != NULL)
cache[key]->payload = payload; cache[key]->payload = payload;
else { else
if (cache.size() == capacity) { {
if (cache.size() == capacity)
{
Node *x = tail->prev; Node *x = tail->prev;
string keyToBeFlushed = x->key; string keyToBeFlushed = x->key;
cache.erase(keyToBeFlushed); cache.erase(keyToBeFlushed);
...@@ -206,7 +230,9 @@ public: ...@@ -206,7 +230,9 @@ public:
head->next = x; head->next = x;
x->prev = head; x->prev = head;
} else { }
else
{
Node *x = new Node(key, payload); Node *x = new Node(key, payload);
cache[key] = x; cache[key] = x;
x->next = head->next; x->next = head->next;
...@@ -219,12 +245,90 @@ public: ...@@ -219,12 +245,90 @@ public:
//mtx.unlock(); //mtx.unlock();
} }
void traverse() { void traverse()
{
Node *temp = head->next; Node *temp = head->next;
while (temp->next) { while (temp->next)
{
cout << temp->key << "\n"; cout << temp->key << "\n";
temp = temp->next; temp = temp->next;
} }
cout << "\n"; cout << "\n";
} }
int hash(string s)
{
return (((int)s.at(0)) << 8) + ((int)s.at(1));
}
// key1;key2;key3;;value1;value2;value3;
string getKeyValuePairs(int id)
{
unordered_map<string, string> flush;
string keyValPairs = "";
DIR *dirFile = opendir(".");
if (dirFile)
{
struct dirent *hFile;
while ((hFile = readdir(dirFile)) != NULL)
{
string fName(hFile->d_name);
string fileName(hFile->d_name);
int n;
if ((n = fileName.size()) < 8)
continue;
if (fileName[n - 3] == 'k' && fileName[n - 2] == 'v' && fileName[n - 1] == 'm')
{
fileName.erase(n - 4);
fileName = fileName.substr(2);
if (hash(fileName) <= id)
{
ifstream fin;
fin.open(fName);
string _key, val;
do
{
getline(fin, _key);
if (_key.size() == 0)
break;
getline(fin, val);
if (val != deleted)
flush[_key] = val;
else
flush.erase(_key);
if (fin.eof())
break;
} while (fin);
fin.close();
const char *c = fName.c_str();
remove(c);
}
}
}
closedir(dirFile);
}
Node *temp = head->next;
while (temp->next)
{
if (hash(temp->key) <= id)
{
flush[temp->key] = temp->payload;
}
temp = temp->next;
}
unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->first + ";";
keyValPairs += ";";
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->second + ";";
return keyValPairs;
}
}; };
LISTENING_PORT=50051 LISTENING_PORT=50055
CACHE_REPLACEMENT_TYPE=LRU CACHE_REPLACEMENT_TYPE=LRU
CACHE_SIZE=256 CACHE_SIZE=256
NUM_SERVER_THREADS=4 NUM_SERVER_THREADS=4
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment