Commit 36cd6757 authored by Mayank Manoj's avatar Mayank Manoj

Merge branch 'mayank' into 'master'

Mayank

See merge request !3
parents e94cec16 2cf6ddb2
...@@ -3,36 +3,46 @@ ...@@ -3,36 +3,46 @@
#include "LFU.h" #include "LFU.h"
using namespace std; using namespace std;
class memoryManagement { class memoryManagement
{
public: public:
virtual string get(int *a, string s) { virtual string get(int *a, string s)
{
return "This will never run"; return "This will never run";
} }
virtual void put(string a, string b) { virtual void put(string a, string b)
{
return; return;
} }
virtual void del(int *a, string s) { virtual void del(int *a, string s)
{
return; return;
} }
virtual void traverse() { virtual void traverse()
{
return; return;
} }
virtual void pushAll() { virtual void pushAll()
{
return; return;
} }
virtual string getKeyValuePairs(int id) { virtual string getKeyValuePairs(int id)
{
return "This will never run"; return "This will never run";
} }
}; };
class storageLRU : public memoryManagement { class storageLRU : public memoryManagement
{
public: public:
LRUcache mycache; LRUcache mycache;
storageLRU(int capacity) { storageLRU(int capacity)
{
mycache.setCap(capacity); mycache.setCap(capacity);
} }
string get(int *status, string key) { string get(int *status, string key)
{
string result = ""; string result = "";
int status2 = 200; int status2 = 200;
...@@ -42,37 +52,45 @@ public: ...@@ -42,37 +52,45 @@ public:
return result; return result;
} }
void put(string key, string val) { void put(string key, string val)
{
mycache.put(key, val); mycache.put(key, val);
} }
void del(int *status, string key) { void del(int *status, string key)
{
int status2 = 200; int status2 = 200;
mycache.del(key, &status2); mycache.del(key, &status2);
*status = status2; *status = status2;
} }
void traverse() { void traverse()
{
mycache.traverse(); mycache.traverse();
} }
void pushAll() { void pushAll()
{
mycache.pushAll(); mycache.pushAll();
} }
string getKeyValuePairs(int id) { string getKeyValuePairs(int id)
return "keyvaluepairs"; {
return mycache.getKeyValuePairs(id);
} }
}; };
class storageLFU : public memoryManagement { class storageLFU : public memoryManagement
{
public: public:
LFUCache mycache; LFUCache mycache;
storageLFU(int capacity) { storageLFU(int capacity)
{
mycache.setCap(capacity); mycache.setCap(capacity);
} }
string get(int *status, string key) { string get(int *status, string key)
{
string result = ""; string result = "";
int status2 = 200; int status2 = 200;
...@@ -82,26 +100,32 @@ public: ...@@ -82,26 +100,32 @@ public:
return result; return result;
} }
void put(string key, string val) { void put(string key, string val)
{
mycache.PUT(key, val); mycache.PUT(key, val);
} }
void del(int *status, string key) { void del(int *status, string key)
{
int status2 = 200; int status2 = 200;
mycache.DEL(key, &status2); mycache.DEL(key, &status2);
*status = status2; *status = status2;
} }
void traverse() { void traverse()
{
mycache.traverse(); mycache.traverse();
} }
void pushAll() { void pushAll()
{
mycache.pushAll(); mycache.pushAll();
} }
string getKeyValuePairs(int id) { // key1;key2;key3;;value1;value2;value3;
return "keyvaluepairs"; string getKeyValuePairs(int id)
{
return mycache.getKeyValuePairs(id);
} }
}; };
\ No newline at end of file
#include <bits/stdc++.h> #include <bits/stdc++.h>
#include <dirent.h>
#include <fstream> #include <fstream>
#include "LFUNode.h" #include "LFUNode.h"
#define CACHE_FULL "Cache Full" #define CACHE_FULL "Cache Full"
...@@ -10,7 +11,8 @@ ...@@ -10,7 +11,8 @@
using namespace std; using namespace std;
class LFUCache { class LFUCache
{
private: private:
int capacity; int capacity;
LFUNode **cacheHeap; LFUNode **cacheHeap;
...@@ -18,29 +20,34 @@ private: ...@@ -18,29 +20,34 @@ private:
std::mutex mtx; std::mutex mtx;
string deleted; string deleted;
int parent(int i) { int parent(int i)
{
if (i % 2 == 0) if (i % 2 == 0)
return (i / 2) - 1; return (i / 2) - 1;
else else
return (i - 1) / 2; return (i - 1) / 2;
} }
int left_child(int i) { int left_child(int i)
{
return (2 * i) + 1; return (2 * i) + 1;
} }
int right_child(int i) { int right_child(int i)
{
return (2 * i) + 2; return (2 * i) + 2;
} }
int exists(string key) { int exists(string key)
{
for (int i = curr_pos; i >= 0; i--) for (int i = curr_pos; i >= 0; i--)
if (key.compare(cacheHeap[i]->key) == 0) if (key.compare(cacheHeap[i]->key) == 0)
return i; return i;
return -1; return -1;
} }
void swap(int i, int j) { void swap(int i, int j)
{
cacheHeap[i]->frequency = cacheHeap[i]->frequency + cacheHeap[j]->frequency; cacheHeap[i]->frequency = cacheHeap[i]->frequency + cacheHeap[j]->frequency;
cacheHeap[j]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency; cacheHeap[j]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency;
cacheHeap[i]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency; cacheHeap[i]->frequency = cacheHeap[i]->frequency - cacheHeap[j]->frequency;
...@@ -52,8 +59,10 @@ private: ...@@ -52,8 +59,10 @@ private:
cacheHeap[j]->value = temp; cacheHeap[j]->value = temp;
} }
void heapify_up(int i) { void heapify_up(int i)
while (true) { {
while (true)
{
if (i == 0) if (i == 0)
break; break;
if (cacheHeap[parent(i)]->frequency <= cacheHeap[i]->frequency) if (cacheHeap[parent(i)]->frequency <= cacheHeap[i]->frequency)
...@@ -64,17 +73,21 @@ private: ...@@ -64,17 +73,21 @@ private:
} }
} }
void heapify_down(int i) { void heapify_down(int i)
while (true) { {
while (true)
{
if (left_child(i) > curr_pos) if (left_child(i) > curr_pos)
break; break;
if (right_child(i) > curr_pos && cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency) if (right_child(i) > curr_pos && cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency)
break; break;
if (right_child(i) <= curr_pos) { if (right_child(i) <= curr_pos)
{
if (cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency && cacheHeap[right_child(i)]->frequency >= cacheHeap[i]->frequency) if (cacheHeap[left_child(i)]->frequency >= cacheHeap[i]->frequency && cacheHeap[right_child(i)]->frequency >= cacheHeap[i]->frequency)
break; break;
} }
if (right_child(i) <= curr_pos) { if (right_child(i) <= curr_pos)
{
if (cacheHeap[left_child(i)]->frequency < cacheHeap[right_child(i)]->frequency) if (cacheHeap[left_child(i)]->frequency < cacheHeap[right_child(i)]->frequency)
swap(i, left_child(i)); swap(i, left_child(i));
else else
...@@ -85,16 +98,19 @@ private: ...@@ -85,16 +98,19 @@ private:
} }
} }
string insert(string key, string value) { string insert(string key, string value)
{
int i = exists(key); int i = exists(key);
if (i != -1) { if (i != -1)
{
cacheHeap[i]->frequency++; cacheHeap[i]->frequency++;
cacheHeap[i]->value = value; cacheHeap[i]->value = value;
heapify_down(i); heapify_down(i);
return INSERT_SUCCESS; return INSERT_SUCCESS;
} }
curr_pos++; curr_pos++;
if (curr_pos == capacity) { if (curr_pos == capacity)
{
curr_pos--; curr_pos--;
delete_min(true); delete_min(true);
insert(key, value); insert(key, value);
...@@ -107,14 +123,16 @@ private: ...@@ -107,14 +123,16 @@ private:
return INSERT_SUCCESS; return INSERT_SUCCESS;
} }
string delete_min(bool keep) { string delete_min(bool keep)
{
if (curr_pos == -1) if (curr_pos == -1)
return CACHE_EMPTY; return CACHE_EMPTY;
swap(0, curr_pos); swap(0, curr_pos);
curr_pos--; curr_pos--;
if (curr_pos >= 0) if (curr_pos >= 0)
heapify_down(0); heapify_down(0);
if (keep) { if (keep)
{
string filename = getFilename(cacheHeap[curr_pos + 1]->key); string filename = getFilename(cacheHeap[curr_pos + 1]->key);
ofstream fout; ofstream fout;
fout.open(filename, ios::app); fout.open(filename, ios::app);
...@@ -125,7 +143,8 @@ private: ...@@ -125,7 +143,8 @@ private:
return DELETE_SUCCESS; return DELETE_SUCCESS;
} }
void insertAll(unordered_map<string, string> flush) { void insertAll(unordered_map<string, string> flush)
{
int to_empty = flush.size() - capacity + curr_pos + 1; int to_empty = flush.size() - capacity + curr_pos + 1;
for (int i = 0; i < to_empty; i++) for (int i = 0; i < to_empty; i++)
delete_min(true); delete_min(true);
...@@ -134,12 +153,14 @@ private: ...@@ -134,12 +153,14 @@ private:
insert(itr->first, itr->second); insert(itr->first, itr->second);
} }
string getNodeByKey(string key) { string getNodeByKey(string key)
{
string value; string value;
int i = exists(key); int i = exists(key);
if (i == -1) if (i == -1)
return KEY_NOT_FOUND; return KEY_NOT_FOUND;
else { else
{
cacheHeap[i]->frequency++; cacheHeap[i]->frequency++;
value = cacheHeap[i]->value; value = cacheHeap[i]->value;
heapify_down(i); heapify_down(i);
...@@ -147,11 +168,13 @@ private: ...@@ -147,11 +168,13 @@ private:
} }
} }
string deleteNodeByKey(string key) { string deleteNodeByKey(string key)
{
int i = exists(key); int i = exists(key);
if (i == -1) if (i == -1)
return KEY_NOT_FOUND; return KEY_NOT_FOUND;
else { else
{
cacheHeap[i]->frequency = 0; cacheHeap[i]->frequency = 0;
heapify_up(i); heapify_up(i);
delete_min(false); delete_min(false);
...@@ -165,7 +188,8 @@ private: ...@@ -165,7 +188,8 @@ private:
} }
} }
string getFilename(string key) { string getFilename(string key)
{
string filename = "."; string filename = ".";
int length = key.size(); int length = key.size();
if (length == 1) if (length == 1)
...@@ -177,34 +201,40 @@ private: ...@@ -177,34 +201,40 @@ private:
return filename; return filename;
} }
bool fileExists(string filename) { bool fileExists(string filename)
{
ifstream f(filename.c_str()); ifstream f(filename.c_str());
return f.good(); return f.good();
} }
public: public:
LFUCache() { LFUCache()
{
curr_pos = -1; curr_pos = -1;
deleted = ""; deleted = "";
deleted += char(0); deleted += char(0);
} }
void setCap(int capacity) { void setCap(int capacity)
{
this->capacity = capacity; this->capacity = capacity;
cacheHeap = (LFUNode **)malloc(sizeof(LFUNode *) * capacity); cacheHeap = (LFUNode **)malloc(sizeof(LFUNode *) * capacity);
for(int i = 0; i < capacity; i++) for (int i = 0; i < capacity; i++)
cacheHeap[i] = new LFUNode(deleted, deleted); cacheHeap[i] = new LFUNode(deleted, deleted);
} }
string GET(string key, int *status) { string GET(string key, int *status)
{
//std::lock_guard<std::mutex> guard(mtx); //std::lock_guard<std::mutex> guard(mtx);
string value = getNodeByKey(key); string value = getNodeByKey(key);
if (value.compare(KEY_NOT_FOUND) != 0) { if (value.compare(KEY_NOT_FOUND) != 0)
{
*status = 200; *status = 200;
return value; return value;
} }
string filename = getFilename(key); string filename = getFilename(key);
if (!fileExists(filename)) { if (!fileExists(filename))
{
*status = 400; *status = 400;
return KEY_NOT_FOUND; return KEY_NOT_FOUND;
} }
...@@ -212,7 +242,8 @@ public: ...@@ -212,7 +242,8 @@ public:
ifstream fin; ifstream fin;
unordered_map<string, string> flush; unordered_map<string, string> flush;
fin.open(filename); fin.open(filename);
do { do
{
getline(fin, key1); getline(fin, key1);
if (key1.size() == 0) if (key1.size() == 0)
break; break;
...@@ -237,33 +268,39 @@ public: ...@@ -237,33 +268,39 @@ public:
return value; return value;
} }
void PUT(string key, string value) { void PUT(string key, string value)
{
//mtx.lock(); //mtx.lock();
insert(key, value); insert(key, value);
//mtx.unlock(); //mtx.unlock();
} }
void DEL(string key, int *status) { void DEL(string key, int *status)
{
int status1; int status1;
string value = GET(key, &status1); string value = GET(key, &status1);
if (status1 == 400) { if (status1 == 400)
{
*status = 400; *status = 400;
return; return;
} }
deleteNodeByKey(key); deleteNodeByKey(key);
} }
void pushAll() { void pushAll()
{
while (curr_pos >= 0) while (curr_pos >= 0)
delete_min(true); delete_min(true);
} }
void reformat(string filename) { void reformat(string filename)
{
ifstream fin; ifstream fin;
fin.open(filename); fin.open(filename);
unordered_map<string, string> flush; unordered_map<string, string> flush;
string key, value; string key, value;
do { do
{
getline(fin, key); getline(fin, key);
if (key.size() == 0) if (key.size() == 0)
break; break;
...@@ -279,15 +316,86 @@ public: ...@@ -279,15 +316,86 @@ public:
ofstream fout; ofstream fout;
fout.open(filename); fout.open(filename);
unordered_map<string, string>::iterator itr; unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++) { for (itr = flush.begin(); itr != flush.end(); itr++)
{
fout << itr->first << "\n"; fout << itr->first << "\n";
fout << itr->second << "\n"; fout << itr->second << "\n";
} }
} }
void traverse() { void traverse()
{
for (int i = 0; i <= curr_pos; i++) for (int i = 0; i <= curr_pos; i++)
cout << cacheHeap[i]->key << endl; cout << cacheHeap[i]->key << endl;
cout << endl; cout << endl;
} }
int hash(string s)
{
return (((int)s.at(0)) << 8) + ((int)s.at(1));
}
string getKeyValuePairs(int id)
{
unordered_map<string, string> flush;
string keyValPairs = "";
DIR *dirFile = opendir(".");
if (dirFile)
{
struct dirent *hFile;
while ((hFile = readdir(dirFile)) != NULL)
{
string fName(hFile->d_name);
string fileName(hFile->d_name);
int n;
if ((n = fileName.size()) < 8)
continue;
if (fileName[n - 3] == 'k' && fileName[n - 2] == 'v' && fileName[n - 1] == 'm')
{
fileName.erase(n - 4);
fileName = fileName.substr(2);
if (hash(fileName) <= id)
{
ifstream fin;
fin.open(fName);
string _key, val;
do
{
getline(fin, _key);
if (_key.size() == 0)
break;
getline(fin, val);
if (val != deleted)
flush[_key] = val;
else
flush.erase(_key);
if (fin.eof())
break;
} while (fin);
fin.close();
const char *c = fName.c_str();
remove(c);
}
}
}
closedir(dirFile);
}
for(int i=0;i<=curr_pos;i++)
if(hash(cacheHeap[i]->key)<=id)
flush[cacheHeap[i]->key]=cacheHeap[i]->value;
unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->first + ";";
keyValPairs += ";";
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->second + ";";
return keyValPairs;
}
}; };
#include <bits/stdc++.h> #include <bits/stdc++.h>
#include "LRUNode.h" #include "LRUNode.h"
#include <dirent.h>
#include <sys/stat.h> #include <sys/stat.h>
#define NOT_EXIST "KEY NOT EXIST" #define NOT_EXIST "KEY NOT EXIST"
using namespace std; using namespace std;
inline bool fileExists(const std::string &name) { inline bool fileExists(const std::string &name)
{
struct stat buffer; struct stat buffer;
return (stat(name.c_str(), &buffer) == 0); return (stat(name.c_str(), &buffer) == 0);
} }
string getFilename(string key) { string getFilename(string key)
{
string filename = "."; string filename = ".";
int length = key.size(); int length = key.size();
if (length == 1) { if (length == 1)
{
return "_.kvm"; return "_.kvm";
} }
filename += char(length - 1); filename += char(length - 1);
...@@ -24,7 +28,8 @@ string getFilename(string key) { ...@@ -24,7 +28,8 @@ string getFilename(string key) {
return filename; return filename;
} }
class LRUcache { class LRUcache
{
private: private:
int capacity; int capacity;
string deleted; string deleted;
...@@ -34,7 +39,8 @@ private: ...@@ -34,7 +39,8 @@ private:
unordered_map<string, Node *> cache; unordered_map<string, Node *> cache;
public: public:
LRUcache() { LRUcache()
{
head = new Node("HEAD", "HEAD"); head = new Node("HEAD", "HEAD");
tail = new Node("TAIL", "TAIL"); tail = new Node("TAIL", "TAIL");
head->next = tail; head->next = tail;
...@@ -43,11 +49,13 @@ public: ...@@ -43,11 +49,13 @@ public:
deleted += char(0); deleted += char(0);
} }
void setCap(int capacity) { void setCap(int capacity)
{
this->capacity = capacity; this->capacity = capacity;
} }
LRUcache(int capacity) { LRUcache(int capacity)
{
this->capacity = capacity; this->capacity = capacity;
head = new Node("HEAD", "HEAD"); head = new Node("HEAD", "HEAD");
tail = new Node("TAIL", "TAIL"); tail = new Node("TAIL", "TAIL");
...@@ -57,7 +65,8 @@ public: ...@@ -57,7 +65,8 @@ public:
deleted += char(0); deleted += char(0);
} }
Node *getUtil(string key) { Node *getUtil(string key)
{
if (cache.find(key) == cache.end()) if (cache.find(key) == cache.end())
return NULL; return NULL;
Node *x = cache[key]; Node *x = cache[key];
...@@ -73,11 +82,14 @@ public: ...@@ -73,11 +82,14 @@ public:
return x; return x;
} }
string get(string key, int *status) { string get(string key, int *status)
{
//std::lock_guard<std::mutex> guard(mtx); //std::lock_guard<std::mutex> guard(mtx);
Node *x = getUtil(key); Node *x = getUtil(key);
if (x) { if (x)
if (x->payload == deleted) { {
if (x->payload == deleted)
{
*status = 400; *status = 400;
return NOT_EXIST; return NOT_EXIST;
} }
...@@ -86,7 +98,8 @@ public: ...@@ -86,7 +98,8 @@ public:
string fileName = getFilename(key); string fileName = getFilename(key);
if (!fileExists(fileName)) { if (!fileExists(fileName))
{
*status = 400; *status = 400;
return NOT_EXIST; return NOT_EXIST;
} }
...@@ -97,7 +110,8 @@ public: ...@@ -97,7 +110,8 @@ public:
string _key, val; string _key, val;
unordered_map<string, string> flush; unordered_map<string, string> flush;
do { do
{
getline(fin, _key); getline(fin, _key);
if (_key.size() == 0) if (_key.size() == 0)
break; break;
...@@ -113,12 +127,14 @@ public: ...@@ -113,12 +127,14 @@ public:
fin.close(); fin.close();
unordered_map<string, string>::iterator itr; unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end() && cache.size() < capacity - 1; itr++) { for (itr = flush.begin(); itr != flush.end() && cache.size() < capacity - 1; itr++)
{
this->put(itr->first, itr->second); this->put(itr->first, itr->second);
flush[itr->first] = deleted; flush[itr->first] = deleted;
} }
if (flush.find(key) != flush.end()) { if (flush.find(key) != flush.end())
{
this->put(key, flush[key]); this->put(key, flush[key]);
flush[key] = deleted; flush[key] = deleted;
} }
...@@ -128,7 +144,8 @@ public: ...@@ -128,7 +144,8 @@ public:
ofstream fout; ofstream fout;
fout.open(fileName); fout.open(fileName);
for (itr = flush.begin(); itr != flush.end(); itr++) { for (itr = flush.begin(); itr != flush.end(); itr++)
{
if (itr->second == deleted) if (itr->second == deleted)
continue; continue;
fout << itr->first << "\n"; fout << itr->first << "\n";
...@@ -136,7 +153,8 @@ public: ...@@ -136,7 +153,8 @@ public:
} }
fout.close(); fout.close();
x = getUtil(key); x = getUtil(key);
if (x) { if (x)
{
return x->payload; return x->payload;
} }
*status = 400; *status = 400;
...@@ -144,12 +162,14 @@ public: ...@@ -144,12 +162,14 @@ public:
return NOT_EXIST; return NOT_EXIST;
} }
void del(string key, int *status) { void del(string key, int *status)
{
int status2 = 200; int status2 = 200;
string result = get(key, &status2); string result = get(key, &status2);
// mtx.lock(); // mtx.lock();
if (status2 == 400) { if (status2 == 400)
{
*status = 400; *status = 400;
// mtx.unlock(); // mtx.unlock();
return; return;
...@@ -159,9 +179,11 @@ public: ...@@ -159,9 +179,11 @@ public:
// mtx.unlock(); // mtx.unlock();
} }
void pushAll() { void pushAll()
{
unordered_map<string, Node *>::iterator itr; unordered_map<string, Node *>::iterator itr;
for (itr = cache.begin(); itr != cache.end(); itr++) { for (itr = cache.begin(); itr != cache.end(); itr++)
{
string fileName = getFilename(itr->first); string fileName = getFilename(itr->first);
ofstream fout; ofstream fout;
...@@ -177,12 +199,15 @@ public: ...@@ -177,12 +199,15 @@ public:
tail->prev = head; tail->prev = head;
} }
void put(string key, string payload) { void put(string key, string payload)
{
//mtx.lock(); //mtx.lock();
if (getUtil(key) != NULL) if (getUtil(key) != NULL)
cache[key]->payload = payload; cache[key]->payload = payload;
else { else
if (cache.size() == capacity) { {
if (cache.size() == capacity)
{
Node *x = tail->prev; Node *x = tail->prev;
string keyToBeFlushed = x->key; string keyToBeFlushed = x->key;
cache.erase(keyToBeFlushed); cache.erase(keyToBeFlushed);
...@@ -206,7 +231,9 @@ public: ...@@ -206,7 +231,9 @@ public:
head->next = x; head->next = x;
x->prev = head; x->prev = head;
} else { }
else
{
Node *x = new Node(key, payload); Node *x = new Node(key, payload);
cache[key] = x; cache[key] = x;
x->next = head->next; x->next = head->next;
...@@ -219,12 +246,90 @@ public: ...@@ -219,12 +246,90 @@ public:
//mtx.unlock(); //mtx.unlock();
} }
void traverse() { void traverse()
{
Node *temp = head->next; Node *temp = head->next;
while (temp->next) { while (temp->next)
{
cout << temp->key << "\n"; cout << temp->key << "\n";
temp = temp->next; temp = temp->next;
} }
cout << "\n"; cout << "\n";
} }
int hash(string s)
{
return (((int)s.at(0)) << 8) + ((int)s.at(1));
}
// key1;key2;key3;;value1;value2;value3;
string getKeyValuePairs(int id)
{
unordered_map<string, string> flush;
string keyValPairs = "";
DIR *dirFile = opendir(".");
if (dirFile)
{
struct dirent *hFile;
while ((hFile = readdir(dirFile)) != NULL)
{
string fName(hFile->d_name);
string fileName(hFile->d_name);
int n;
if ((n = fileName.size()) < 8)
continue;
if (fileName[n - 3] == 'k' && fileName[n - 2] == 'v' && fileName[n - 1] == 'm')
{
fileName.erase(n - 4);
fileName = fileName.substr(2);
if (hash(fileName) <= id)
{
ifstream fin;
fin.open(fName);
string _key, val;
do
{
getline(fin, _key);
if (_key.size() == 0)
break;
getline(fin, val);
if (val != deleted)
flush[_key] = val;
else
flush.erase(_key);
if (fin.eof())
break;
} while (fin);
fin.close();
const char *c = fName.c_str();
remove(c);
}
}
}
closedir(dirFile);
}
Node *temp = head->next;
while (temp->next)
{
if (hash(temp->key) <= id)
{
flush[temp->key] = temp->payload;
}
temp = temp->next;
}
unordered_map<string, string>::iterator itr;
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->first + ";";
keyValPairs += ";";
for (itr = flush.begin(); itr != flush.end(); itr++)
keyValPairs += itr->second + ";";
return keyValPairs;
}
}; };
...@@ -160,6 +160,7 @@ void RunClient() { ...@@ -160,6 +160,7 @@ void RunClient() {
Info info; Info info;
ClientContext context; ClientContext context;
Status status=stub->GETADDRESS(&context,null,&info); Status status=stub->GETADDRESS(&context,null,&info);
std::cout<<"Server: "<<info.address()<<std::endl;
std::string target_address(info.address()); std::string target_address(info.address());
// Instantiates the client // Instantiates the client
KeyValueServicesClient client( KeyValueServicesClient client(
......
LISTENING_PORT=50051 LISTENING_PORT=50055
CACHE_REPLACEMENT_TYPE=LRU CACHE_REPLACEMENT_TYPE=LRU
CACHE_SIZE=256 CACHE_SIZE=256
NUM_SERVER_THREADS=4 NUM_SERVER_THREADS=4
...@@ -186,7 +186,13 @@ private: ...@@ -186,7 +186,13 @@ private:
RequestType reqType; RequestType reqType;
}; };
void signalHandler(int signum) {
remove(SERVERS);
exit(0);
}
int main(int argc,char **argv) { int main(int argc,char **argv) {
signal(SIGINT, signalHandler);
srand(time(0)); srand(time(0));
string server_address("0.0.0.0:1234"); string server_address("0.0.0.0:1234");
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
......
...@@ -104,13 +104,13 @@ public: ...@@ -104,13 +104,13 @@ public:
else if (status == PROCESS) { else if (status == PROCESS) {
new ServerData(service, cq, reqType); new ServerData(service, cq, reqType);
if (reqType == NEW) { if (reqType == NEW) {
//cout<<"New Server to join:"<<info.address()<<endl; cout<<"New Server to join:"<<info.address()<<endl;
//calculate id of node, return it's successor and predecessor //calculate id of node, return it's successor and predecessor
string address=info.address(); string address=info.address();
int id=stoi(address.substr(address.find(':')+1)); int id=stoi(address.substr(address.find(':')+1));
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
do { do {
...@@ -123,6 +123,7 @@ public: ...@@ -123,6 +123,7 @@ public:
fin.close(); fin.close();
int node=-1; int node=-1;
int next=-1; int next=-1;
bool fl=false;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<id&&my_id>=id) { if(nums>0&&fingers[nums-1]<id&&my_id>=id) {
node=fingers[nums-1]; node=fingers[nums-1];
...@@ -139,12 +140,13 @@ public: ...@@ -139,12 +140,13 @@ public:
next=fingers[i]; next=fingers[i];
break; break;
} }
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>id) { else if(i>0&&fingers[i]<fingers[i-1]) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
break; break;
} }
else if(i==nums-1) { else if(i==nums-1) {
fl=true;
node=fingers[i]; node=fingers[i];
next=my_id; next=my_id;
break; break;
...@@ -160,66 +162,185 @@ public: ...@@ -160,66 +162,185 @@ public:
Id x; Id x;
x.set_id(next); x.set_id(next);
Id y; Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl; cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred; int mypred,nextpred;
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
if(next!=my_id) { if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y); stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id(); nextpred=y.id();
} }
else { if(fl==false&&next!=my_id&&nextpred==node) {
fin.open(NEIGHBOURS); cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
string temp; cout<<"Successor: "<<x.id()<<endl;
getline(fin,temp); cout<<"Predecessor: "<<nextpred<<endl;
getline(fin,temp); successorInfo.set_succaddress("0.0.0.0:"+to_string(x.id()));
fin.close(); successorInfo.set_predaddress("0.0.0.0:"+to_string(nextpred));
mypred=stoi(temp.substr(temp.find(':')+1));
} }
if(mypred==node) { else if(fl==false&&next==my_id&&mypred==node) {
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl; cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl; cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl; cout<<"Predecessor: "<<mypred<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(x.id())); successorInfo.set_succaddress("0.0.0.0:"+to_string(x.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(mypred)); successorInfo.set_predaddress("0.0.0.0:"+to_string(mypred));
} }
else if(fl==false){
cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
if(node==my_id) {
int mysucc;
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=id) {
successorInfo.set_succaddress("0.0.0.0:"+to_string(mysucc));
successorInfo.set_predaddress("0.0.0.0:"+to_string(my_id));
}
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
Id z;
string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z);
cout<<"Yes. We got the predecessor"<<endl;
cout<<"Successor: "<<y.id()<<endl;
cout<<"Predecessor: "<<z.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
Id z;
string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z);
cout<<"Yes. We got the predecessor"<<endl;
cout<<"Successor: "<<y.id()<<endl;
cout<<"Predecessor: "<<z.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
}
}
else { else {
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl; if(node>next) {
string tar_address("0.0.0.0:"+to_string(node)); fin.open(NEIGHBOURS);
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); string xyz;
stub=KeyValueServices::NewStub(channel); getline(fin,xyz);
ClientContext context1; getline(fin,xyz);
x.set_id(id); fin.close();
stub->GETSUCCESSOR(&context1,x,&y); int xi=stoi(xyz.substr(xyz.find(':')+1));
//cout<<"Yes. We got the successor"<<endl; if(xi==node) {
Id z; successorInfo.set_succaddress("0.0.0.0:"+to_string(next));
string t_address("0.0.0.0:"+to_string(y.id())); successorInfo.set_predaddress("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); }
stub=KeyValueServices::NewStub(channel); else {
ClientContext context2; string tar_address("0.0.0.0:"+to_string(node));
stub->GETPREDECESSOR(&context2,y,&z); channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
//cout<<"Yes. We got the predecessor"<<endl; stub=KeyValueServices::NewStub(channel);
//cout<<"Successor: "<<y.id()<<endl; ClientContext context1;
//cout<<"Predecessor: "<<z.id()<<endl; x.set_id(id);
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id())); stub->GETSUCCESSOR(&context1,x,&y);
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id())); cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
Id z;
string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z);
cout<<"Yes. We got the predecessor"<<endl;
cout<<"Successor: "<<y.id()<<endl;
cout<<"Predecessor: "<<z.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
}
else {
successorInfo.set_succaddress("0.0.0.0:"+to_string(next));
successorInfo.set_predaddress("0.0.0.0:"+to_string(node));
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
successorInfo.set_succaddress("0.0.0.0:"+to_string(node));
successorInfo.set_predaddress("0.0.0.0:"+to_string(next));
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
Id z;
string t_address("0.0.0.0:"+to_string(y.id()));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context2;
stub->GETPREDECESSOR(&context2,y,&z);
cout<<"Yes. We got the predecessor"<<endl;
cout<<"Successor: "<<y.id()<<endl;
cout<<"Predecessor: "<<z.id()<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(y.id()));
successorInfo.set_predaddress("0.0.0.0:"+to_string(z.id()));
}
else {
successorInfo.set_succaddress("0.0.0.0:"+to_string(next));
successorInfo.set_predaddress("0.0.0.0:"+to_string(node));
}
}
}
} }
} }
else { else {
//cout<<"We got no node with id greater than new node"<<endl; cout<<"We got no node with id greater than new node"<<endl;
//cout<<"Only one node present right now, that is me"<<endl; cout<<"Only one node present right now, that is me"<<endl;
successorInfo.set_succaddress("0.0.0.0:"+to_string(my_id)); successorInfo.set_succaddress("0.0.0.0:"+to_string(my_id));
successorInfo.set_predaddress("0.0.0.0:"+to_string(my_id)); successorInfo.set_predaddress("0.0.0.0:"+to_string(my_id));
} }
//cout<<"Sending the successor and predecessor back to the new server"<<endl; cout<<"Sending the successor and predecessor back to the new server"<<endl;
newResponder.Finish(successorInfo,Status::OK,this); newResponder.Finish(successorInfo,Status::OK,this);
} }
else if(reqType==INFORMSUCCESSOR){ else if(reqType==INFORMSUCCESSOR){
//return half of the keyvalue pairs to the requesting node //return half of the keyvalue pairs to the requesting node
string address=info.address(); string address=info.address();
int id=stoi(address.substr(address.find(':')+1)); int id=stoi(address.substr(address.find(':')+1));
string keyvalues=memManager->getKeyValuePairs(id); string keyvalues=memManager->getKeyValuePairs(id,stoi(params["LISTENING_PORT"]));
string keys=keyvalues.substr(0,keyvalues.find(";;")+1); string keys=keyvalues.substr(0,keyvalues.find(";;")+1);
string values=keyvalues.substr(keyvalues.find(";;")+2); string values=keyvalues.substr(keyvalues.find(";;")+2);
//cout<<"Okay, my new predecessor is: "<<info.address()<<endl; cout<<"Okay, my new predecessor is: "<<info.address()<<endl;
ifstream fin; ifstream fin;
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string successor,predecessor; string successor,predecessor;
...@@ -234,15 +355,15 @@ public: ...@@ -234,15 +355,15 @@ public:
fout.close(); fout.close();
keyValues.set_keys(keys); keyValues.set_keys(keys);
keyValues.set_values(values); keyValues.set_values(values);
//cout<<"Done making changes accordingly"<<endl; cout<<"Done making changes accordingly"<<endl;
informSuccessorResponder.Finish(keyValues,Status::OK,this); informSuccessorResponder.Finish(keyValues,Status::OK,this);
} }
else if(reqType==GETSUCCESSOR) { else if(reqType==GETSUCCESSOR) {
//cout<<"Some server asked me to find the successor of "<<idvar1.id()<<endl; cout<<"Some server asked me to find the successor of "<<idvar1.id()<<endl;
int idtofind=idvar1.id(); int id=idvar1.id();
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting the finger table"<<endl; cout<<"Getting the finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
for(int i=0;i<16;i++) { for(int i=0;i<16;i++) {
...@@ -254,64 +375,154 @@ public: ...@@ -254,64 +375,154 @@ public:
} }
fin.close(); fin.close();
int node=-1; int node=-1;
bool fl=false;
int next=-1; int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<idtofind&&my_id>=idtofind) { if(nums>0&&fingers[nums-1]<id&&my_id>=id) {
node=fingers[nums-1]; node=fingers[nums-1];
next=my_id; next=my_id;
} }
else if(nums>0&&my_id<idtofind&&fingers[0]>=idtofind) { else if(nums>0&&my_id<id&&fingers[0]>=id) {
node=my_id; node=my_id;
next=fingers[0]; next=fingers[0];
} }
else { else {
for(int i=0;i<nums;i++) { for(int i=0;i<nums;i++) {
if(i>0&&fingers[i-1]<idtofind&&fingers[i]>=idtofind) { if(i>0&&fingers[i-1]<id&&fingers[i]>=id) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
break; break;
} }
else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>idtofind) { else if(i>0&&fingers[i]<fingers[i-1]&&fingers[i]>id) {
node=fingers[i-1]; node=fingers[i-1];
next=fingers[i]; next=fingers[i];
break; break;
} }
else if(i==nums-1) { else if(i==nums-1) {
fl=true;
node=fingers[i]; node=fingers[i];
next=my_id; next=my_id;
break; break;
} }
} }
} }
string target_address("0.0.0.0:"+to_string(next)); if(next!=-1) {
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); string target_address("0.0.0.0:"+to_string(next));
unique_ptr<KeyValueServices::Stub> stub; shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub;
ClientContext context;
Id x;
x.set_id(next);
Id y;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
stub->GETPREDECESSOR(&context,x,&y);
if(y.id()==node) {
//cout<<"Yes it is. We found the successor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
idvar2.set_id(x.id());
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
ClientContext context1; ClientContext context;
x.set_id(idtofind); Id x;
//cout<<"No it is not. Asking the possible predecessor to find its successor"<<endl; x.set_id(next);
stub->GETSUCCESSOR(&context1,x,&y); Id y;
idvar2.set_id(y.id()); cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int mypred,nextpred;
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mypred=stoi(temp.substr(temp.find(':')+1));
if(next!=my_id) {
stub->GETPREDECESSOR(&context,x,&y);
nextpred=y.id();
}
if(fl==false&&next!=my_id&&nextpred==node)
idvar2.set_id(x.id());
else if(fl==false&&next==my_id&&mypred==node)
idvar2.set_id(x.id());
else if(fl==false){
if(node==my_id) {
int mysucc;
fin.open(NEIGHBOURS);
string temp;
getline(fin,temp);
getline(fin,temp);
fin.close();
mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=id)
idvar2.set_id(mysucc);
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
idvar2.set_id(y.id());
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
idvar2.set_id(y.id());
}
}
else {
if(node>next) {
fin.open(NEIGHBOURS);
string xyz;
getline(fin,xyz);
getline(fin,xyz);
fin.close();
int xi=stoi(xyz.substr(xyz.find(':')+1));;
if(xi==node) {
idvar2.set_id(next);
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
idvar2.set_id(y.id());
}
else {
idvar2.set_id(next);
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
cout<<"Here"<<endl;
idvar2.set_id(node);
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
idvar2.set_id(y.id());
}
else {
idvar2.set_id(next);
}
}
}
}
} }
getSuccessorResponder.Finish(idvar2,Status::OK,this); getSuccessorResponder.Finish(idvar2,Status::OK,this);
} }
else if(reqType==GETPREDECESSOR) { else if(reqType==GETPREDECESSOR) {
//cout<<"Someone asked me for my predecessor. Sending them"<<endl; cout<<"Someone asked me for my predecessor. Sending them"<<endl;
ifstream fin; ifstream fin;
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string successor,predecessor; string successor,predecessor;
...@@ -319,12 +530,12 @@ public: ...@@ -319,12 +530,12 @@ public:
getline(fin,predecessor); getline(fin,predecessor);
fin.close(); fin.close();
idvar2.set_id(stoi(predecessor.substr(predecessor.find(':')+1))); idvar2.set_id(stoi(predecessor.substr(predecessor.find(':')+1)));
//cout<<"Sent my predecessor"<<endl; cout<<"Sent my predecessor"<<endl;
getPredecessorResponder.Finish(idvar2,Status::OK,this); getPredecessorResponder.Finish(idvar2,Status::OK,this);
} }
else if(reqType==INFORMPREDECESSOR) { else if(reqType==INFORMPREDECESSOR) {
//cout<<"Okay, i got the information that my successor has changed"<<endl; cout<<"Okay, i got the information that my successor has changed"<<endl;
//cout<<"My new successor: "<<info.address()<<endl; cout<<"My new successor: "<<info.address()<<endl;
ifstream fin; ifstream fin;
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string successor,predecessor; string successor,predecessor;
...@@ -338,7 +549,7 @@ public: ...@@ -338,7 +549,7 @@ public:
fout<<predecessor<<endl; fout<<predecessor<<endl;
fout.close(); fout.close();
null.set_nothing(0); null.set_nothing(0);
//cout<<"Okay, i made the necessary changes"<<endl; cout<<"Okay, i made the necessary changes"<<endl;
informPredecessorResponder.Finish(null,Status::OK,this); informPredecessorResponder.Finish(null,Status::OK,this);
} }
else { else {
...@@ -463,7 +674,7 @@ public: ...@@ -463,7 +674,7 @@ public:
//transfer request //transfer request
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
do { do {
...@@ -475,6 +686,7 @@ public: ...@@ -475,6 +686,7 @@ public:
}while(fin); }while(fin);
fin.close(); fin.close();
int node=-1; int node=-1;
bool fl=false;
int next=-1; int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) { if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
...@@ -498,54 +710,128 @@ public: ...@@ -498,54 +710,128 @@ public:
break; break;
} }
else if(i==nums-1) { else if(i==nums-1) {
fl=true;
node=fingers[i]; node=fingers[i];
next=my_id; next=my_id;
break; break;
} }
} }
} }
string target_address("0.0.0.0:"+to_string(next)); if(next!=-1) {
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); string target_address("0.0.0.0:"+to_string(next));
unique_ptr<KeyValueServices::Stub> stub; shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub;
ClientContext context; stub=KeyValueServices::NewStub(channel);
Id x; ClientContext context;
x.set_id(next); Id x;
Id y; x.set_id(next);
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl; Id y;
int mypred; cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
if(next!=my_id) { int mypred,nextpred;
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string temp; string temp;
getline(fin,temp); getline(fin,temp);
getline(fin,temp); getline(fin,temp);
fin.close(); fin.close();
mypred=stoi(temp.substr(temp.find(':')+1)); mypred=stoi(temp.substr(temp.find(':')+1));
} if(next!=my_id) {
if(mypred==node) { stub->GETPREDECESSOR(&context,x,&y);
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl; nextpred=y.id();
//cout<<"Successor: "<<x.id()<<endl; }
//cout<<"Predecessor: "<<mypred<<endl; if(fl==false&&next!=my_id&&nextpred==node)
succ=x.id(); succ=x.id();
} else if(fl==false&&next==my_id&&mypred==node)
else { succ=x.id();
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl; else if(fl==false){
string tar_address("0.0.0.0:"+to_string(node)); if(node==my_id) {
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); int mysucc;
stub=KeyValueServices::NewStub(channel); fin.open(NEIGHBOURS);
ClientContext context1; string temp;
x.set_id(key_id); getline(fin,temp);
stub->GETSUCCESSOR(&context1,x,&y); getline(fin,temp);
//cout<<"Yes. We got the successor"<<endl; fin.close();
succ=x.id(); mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=key_id)
succ=mysucc;
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
if(node>next) {
fin.open(NEIGHBOURS);
string xyz;
getline(fin,xyz);
getline(fin,xyz);
fin.close();
int xi=stoi(xyz.substr(xyz.find(':')+1));
if(xi==node) {
succ=next;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
succ=node;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
}
} }
string t_address("0.0.0.0:"+to_string(succ)); string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext cont1; ClientContext cont1;
stub->GET(&cont1,key,&value); stub->GET(&cont1,key,&value);
} }
...@@ -586,7 +872,7 @@ public: ...@@ -586,7 +872,7 @@ public:
//transfer the request //transfer the request
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
do { do {
...@@ -598,6 +884,7 @@ public: ...@@ -598,6 +884,7 @@ public:
}while(fin); }while(fin);
fin.close(); fin.close();
int node=-1; int node=-1;
bool fl=false;
int next=-1; int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) { if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
...@@ -622,53 +909,127 @@ public: ...@@ -622,53 +909,127 @@ public:
} }
else if(i==nums-1) { else if(i==nums-1) {
node=fingers[i]; node=fingers[i];
fl=true;
next=my_id; next=my_id;
break; break;
} }
} }
} }
string target_address("0.0.0.0:"+to_string(next)); if(next!=-1) {
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); string target_address("0.0.0.0:"+to_string(next));
unique_ptr<KeyValueServices::Stub> stub; shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub;
ClientContext context; stub=KeyValueServices::NewStub(channel);
Id x; ClientContext context;
x.set_id(next); Id x;
Id y; x.set_id(next);
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl; Id y;
int mypred; cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
if(next!=my_id) { int mypred,nextpred;
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string temp; string temp;
getline(fin,temp); getline(fin,temp);
getline(fin,temp); getline(fin,temp);
fin.close(); fin.close();
mypred=stoi(temp.substr(temp.find(':')+1)); mypred=stoi(temp.substr(temp.find(':')+1));
} if(next!=my_id) {
if(mypred==node) { stub->GETPREDECESSOR(&context,x,&y);
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl; nextpred=y.id();
//cout<<"Successor: "<<x.id()<<endl; }
//cout<<"Predecessor: "<<mypred<<endl; if(fl==false&&next!=my_id&&nextpred==node)
succ=x.id(); succ=x.id();
} else if(fl==false&&next==my_id&&mypred==node)
else { succ=x.id();
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl; else if(fl==false){
string tar_address("0.0.0.0:"+to_string(node)); if(node==my_id) {
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); int mysucc;
stub=KeyValueServices::NewStub(channel); fin.open(NEIGHBOURS);
ClientContext context1; string temp;
x.set_id(key_id); getline(fin,temp);
stub->GETSUCCESSOR(&context1,x,&y); getline(fin,temp);
//cout<<"Yes. We got the successor"<<endl; fin.close();
succ=x.id(); mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=key_id)
succ=mysucc;
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
if(node>next) {
fin.open(NEIGHBOURS);
string xyz;
getline(fin,xyz);
getline(fin,xyz);
fin.close();
int xi=stoi(xyz.substr(xyz.find(':')+1));
if(xi==node) {
succ=next;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
succ=node;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
}
} }
string t_address("0.0.0.0:"+to_string(succ)); string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext cont1; ClientContext cont1;
stub->GET(&cont1,key,&value); stub->GET(&cont1,key,&value);
} }
...@@ -680,20 +1041,24 @@ public: ...@@ -680,20 +1041,24 @@ public:
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
ifstream fin; ifstream fin;
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string pred; string pred,scc;
getline(fin,pred); getline(fin,scc);
getline(fin,pred); getline(fin,pred);
fin.close(); fin.close();
int pred_id; int pred_id,succ_id;
if(pred=="-1") if(pred=="-1")
pred_id=-1; pred_id=-1;
else else
pred_id=stoi(pred.substr(pred.find(':')+1)); pred_id=stoi(pred.substr(pred.find(':')+1));
if(scc=="-1")
succ_id=-1;
else
succ_id=stoi(scc.substr(scc.find(':')+1));
if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) { if(my_id<key_id&&!(pred_id<key_id&&pred_id>my_id)) {
//transfer request //transfer request
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
do { do {
...@@ -705,6 +1070,7 @@ public: ...@@ -705,6 +1070,7 @@ public:
}while(fin); }while(fin);
fin.close(); fin.close();
int node=-1; int node=-1;
bool fl=false;
int next=-1; int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) { if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
...@@ -729,53 +1095,127 @@ public: ...@@ -729,53 +1095,127 @@ public:
} }
else if(i==nums-1) { else if(i==nums-1) {
node=fingers[i]; node=fingers[i];
fl=true;
next=my_id; next=my_id;
break; break;
} }
} }
} }
string target_address("0.0.0.0:"+to_string(next)); if(next!=-1) {
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); string target_address("0.0.0.0:"+to_string(next));
unique_ptr<KeyValueServices::Stub> stub; shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub;
ClientContext context; stub=KeyValueServices::NewStub(channel);
Id x; ClientContext context;
x.set_id(next); Id x;
Id y; x.set_id(next);
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl; Id y;
int mypred; cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
if(next!=my_id) { int mypred,nextpred;
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string temp; string temp;
getline(fin,temp); getline(fin,temp);
getline(fin,temp); getline(fin,temp);
fin.close(); fin.close();
mypred=stoi(temp.substr(temp.find(':')+1)); mypred=stoi(temp.substr(temp.find(':')+1));
} if(next!=my_id) {
if(mypred==node) { stub->GETPREDECESSOR(&context,x,&y);
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl; nextpred=y.id();
//cout<<"Successor: "<<x.id()<<endl; }
//cout<<"Predecessor: "<<mypred<<endl; if(fl==false&&next!=my_id&&nextpred==node)
succ=x.id(); succ=x.id();
} else if(fl==false&&next==my_id&&mypred==node)
else { succ=x.id();
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl; else if(fl==false){
string tar_address("0.0.0.0:"+to_string(node)); if(node==my_id) {
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); int mysucc;
stub=KeyValueServices::NewStub(channel); fin.open(NEIGHBOURS);
ClientContext context1; string temp;
x.set_id(key_id); getline(fin,temp);
stub->GETSUCCESSOR(&context1,x,&y); getline(fin,temp);
//cout<<"Yes. We got the successor"<<endl; fin.close();
succ=x.id(); mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=key_id)
succ=mysucc;
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
if(node>next) {
fin.open(NEIGHBOURS);
string xyz;
getline(fin,xyz);
getline(fin,xyz);
fin.close();
int xi=stoi(xyz.substr(xyz.find(':')+1));
if(xi==node) {
succ=next;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
succ=node;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
}
} }
string t_address("0.0.0.0:"+to_string(succ)); string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext cont1; ClientContext cont1;
stub->PUT(&cont1,keyvalue,&stat); stub->PUT(&cont1,keyvalue,&stat);
} }
...@@ -790,7 +1230,7 @@ public: ...@@ -790,7 +1230,7 @@ public:
stat.set_status(200); stat.set_status(200);
} }
else { else {
if(pred_id==-1||pred_id<key_id) { if(pred_id==-1||(pred_id<key_id&&my_id>=key_id)||(pred_id>my_id&&my_id>=key_id)) {
cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl; cout << "SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : " << keyvalue.key() << " & VALUE : " << keyvalue.value() << endl;
...@@ -804,7 +1244,7 @@ public: ...@@ -804,7 +1244,7 @@ public:
//transfer the request //transfer the request
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
do { do {
...@@ -816,6 +1256,7 @@ public: ...@@ -816,6 +1256,7 @@ public:
}while(fin); }while(fin);
fin.close(); fin.close();
int node=-1; int node=-1;
bool fl=false;
int next=-1; int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) { if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
...@@ -840,53 +1281,127 @@ public: ...@@ -840,53 +1281,127 @@ public:
} }
else if(i==nums-1) { else if(i==nums-1) {
node=fingers[i]; node=fingers[i];
fl=true;
next=my_id; next=my_id;
break; break;
} }
} }
} }
string target_address("0.0.0.0:"+to_string(next)); if(next!=-1) {
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); string target_address("0.0.0.0:"+to_string(next));
unique_ptr<KeyValueServices::Stub> stub; shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub;
ClientContext context; stub=KeyValueServices::NewStub(channel);
Id x; ClientContext context;
x.set_id(next); Id x;
Id y; x.set_id(next);
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl; Id y;
int mypred; cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
if(next!=my_id) { int mypred,nextpred;
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string temp; string temp;
getline(fin,temp); getline(fin,temp);
getline(fin,temp); getline(fin,temp);
fin.close(); fin.close();
mypred=stoi(temp.substr(temp.find(':')+1)); mypred=stoi(temp.substr(temp.find(':')+1));
} if(next!=my_id) {
if(mypred==node) { stub->GETPREDECESSOR(&context,x,&y);
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl; nextpred=y.id();
//cout<<"Successor: "<<x.id()<<endl; }
//cout<<"Predecessor: "<<mypred<<endl; if(fl==false&&next!=my_id&&nextpred==node)
succ=x.id(); succ=x.id();
} else if(fl==false&&next==my_id&&mypred==node)
else { succ=x.id();
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl; else if(fl==false){
string tar_address("0.0.0.0:"+to_string(node)); if(node==my_id) {
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); int mysucc;
stub=KeyValueServices::NewStub(channel); fin.open(NEIGHBOURS);
ClientContext context1; string temp;
x.set_id(key_id); getline(fin,temp);
stub->GETSUCCESSOR(&context1,x,&y); getline(fin,temp);
//cout<<"Yes. We got the successor"<<endl; fin.close();
succ=x.id(); mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=key_id)
succ=mysucc;
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
if(node>next) {
fin.open(NEIGHBOURS);
string xyz;
getline(fin,xyz);
getline(fin,xyz);
fin.close();
int xi=stoi(xyz.substr(xyz.find(':')+1));
if(xi==node) {
succ=next;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
succ=node;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
}
} }
string t_address("0.0.0.0:"+to_string(succ)); string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext cont1; ClientContext cont1;
stub->PUT(&cont1,keyvalue,&stat); stub->PUT(&cont1,keyvalue,&stat);
} }
...@@ -911,7 +1426,7 @@ public: ...@@ -911,7 +1426,7 @@ public:
//transfer request //transfer request
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
do { do {
...@@ -923,6 +1438,7 @@ public: ...@@ -923,6 +1438,7 @@ public:
}while(fin); }while(fin);
fin.close(); fin.close();
int node=-1; int node=-1;
bool fl=false;
int next=-1; int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) { if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
...@@ -947,53 +1463,127 @@ public: ...@@ -947,53 +1463,127 @@ public:
} }
else if(i==nums-1) { else if(i==nums-1) {
node=fingers[i]; node=fingers[i];
fl=true;
next=my_id; next=my_id;
break; break;
} }
} }
} }
string target_address("0.0.0.0:"+to_string(next)); if(next!=-1) {
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); string target_address("0.0.0.0:"+to_string(next));
unique_ptr<KeyValueServices::Stub> stub; shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub;
ClientContext context; stub=KeyValueServices::NewStub(channel);
Id x; ClientContext context;
x.set_id(next); Id x;
Id y; x.set_id(next);
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl; Id y;
int mypred; cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
if(next!=my_id) { int mypred,nextpred;
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string temp; string temp;
getline(fin,temp); getline(fin,temp);
getline(fin,temp); getline(fin,temp);
fin.close(); fin.close();
mypred=stoi(temp.substr(temp.find(':')+1)); mypred=stoi(temp.substr(temp.find(':')+1));
} if(next!=my_id) {
if(mypred==node) { stub->GETPREDECESSOR(&context,x,&y);
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl; nextpred=y.id();
//cout<<"Successor: "<<x.id()<<endl; }
//cout<<"Predecessor: "<<mypred<<endl; if(fl==false&&next!=my_id&&nextpred==node)
succ=x.id(); succ=x.id();
} else if(fl==false&&next==my_id&&mypred==node)
else { succ=x.id();
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl; else if(fl==false){
string tar_address("0.0.0.0:"+to_string(node)); if(node==my_id) {
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); int mysucc;
stub=KeyValueServices::NewStub(channel); fin.open(NEIGHBOURS);
ClientContext context1; string temp;
x.set_id(key_id); getline(fin,temp);
stub->GETSUCCESSOR(&context1,x,&y); getline(fin,temp);
//cout<<"Yes. We got the successor"<<endl; fin.close();
succ=x.id(); mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=key_id)
succ=mysucc;
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
if(node>next) {
fin.open(NEIGHBOURS);
string xyz;
getline(fin,xyz);
getline(fin,xyz);
fin.close();
int xi=stoi(xyz.substr(xyz.find(':')+1));
if(xi==node) {
succ=next;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
succ=node;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
}
} }
string t_address("0.0.0.0:"+to_string(succ)); string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext cont1; ClientContext cont1;
stub->DEL(&cont1,key,&stat); stub->DEL(&cont1,key,&stat);
} }
...@@ -1033,7 +1623,7 @@ public: ...@@ -1033,7 +1623,7 @@ public:
else { else {
int fingers[16]; int fingers[16];
ifstream fin; ifstream fin;
//cout<<"Getting my finger table"<<endl; cout<<"Getting my finger table"<<endl;
fin.open(FINGER_TABLE); fin.open(FINGER_TABLE);
int nums=0; int nums=0;
do { do {
...@@ -1045,6 +1635,7 @@ public: ...@@ -1045,6 +1635,7 @@ public:
}while(fin); }while(fin);
fin.close(); fin.close();
int node=-1; int node=-1;
bool fl=false;
int next=-1; int next=-1;
int my_id=stoi(params.find("LISTENING_PORT")->second); int my_id=stoi(params.find("LISTENING_PORT")->second);
if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) { if(nums>0&&fingers[nums-1]<key_id&&my_id>=key_id) {
...@@ -1069,53 +1660,127 @@ public: ...@@ -1069,53 +1660,127 @@ public:
} }
else if(i==nums-1) { else if(i==nums-1) {
node=fingers[i]; node=fingers[i];
fl=true;
next=my_id; next=my_id;
break; break;
} }
} }
} }
string target_address("0.0.0.0:"+to_string(next)); if(next!=-1) {
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); string target_address("0.0.0.0:"+to_string(next));
unique_ptr<KeyValueServices::Stub> stub; shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub;
ClientContext context; stub=KeyValueServices::NewStub(channel);
Id x; ClientContext context;
x.set_id(next); Id x;
Id y; x.set_id(next);
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl; Id y;
int mypred; cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
if(next!=my_id) { int mypred,nextpred;
stub->GETPREDECESSOR(&context,x,&y);
mypred=y.id();
}
else {
fin.open(NEIGHBOURS); fin.open(NEIGHBOURS);
string temp; string temp;
getline(fin,temp); getline(fin,temp);
getline(fin,temp); getline(fin,temp);
fin.close(); fin.close();
mypred=stoi(temp.substr(temp.find(':')+1)); mypred=stoi(temp.substr(temp.find(':')+1));
} if(next!=my_id) {
if(mypred==node) { stub->GETPREDECESSOR(&context,x,&y);
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl; nextpred=y.id();
//cout<<"Successor: "<<x.id()<<endl; }
//cout<<"Predecessor: "<<mypred<<endl; if(fl==false&&next!=my_id&&nextpred==node)
succ=x.id(); succ=x.id();
} else if(fl==false&&next==my_id&&mypred==node)
else { succ=x.id();
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl; else if(fl==false){
string tar_address("0.0.0.0:"+to_string(node)); if(node==my_id) {
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials()); int mysucc;
stub=KeyValueServices::NewStub(channel); fin.open(NEIGHBOURS);
ClientContext context1; string temp;
x.set_id(key_id); getline(fin,temp);
stub->GETSUCCESSOR(&context1,x,&y); getline(fin,temp);
//cout<<"Yes. We got the successor"<<endl; fin.close();
succ=x.id(); mysucc=stoi(temp.substr(temp.find(':')+1));
if(mysucc>=key_id)
succ=mysucc;
else {
string tar_address("0.0.0.0:"+to_string(mysucc));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
succ=y.id();
}
}
else {
if(node>next) {
fin.open(NEIGHBOURS);
string xyz;
getline(fin,xyz);
getline(fin,xyz);
fin.close();
int xi=stoi(xyz.substr(xyz.find(':')+1));
if(xi==node) {
succ=next;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
else {
ifstream fin1;
fin1.open(NEIGHBOURS);
string t1,t2;
getline(fin1,t1);
getline(fin1,t2);
fin1.close();
int t1i=stoi(t1.substr(t1.find(':')+1));
if(t1==t2||t1i==node) {
succ=node;
}
else {
string tar_address("0.0.0.0:"+to_string(node));
channel=grpc::CreateChannel(tar_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel);
ClientContext context1;
x.set_id(key_id);
stub->GETSUCCESSOR(&context1,x,&y);
cout<<"Yes. We got the successor"<<endl;
if(y.id()!=my_id) {
succ=y.id();
}
else {
succ=next;
}
}
}
}
} }
string t_address("0.0.0.0:"+to_string(succ)); string t_address("0.0.0.0:"+to_string(succ));
channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(t_address, grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); unique_ptr<KeyValueServices::Stub> stub=KeyValueServices::NewStub(channel);
ClientContext cont1; ClientContext cont1;
stub->DEL(&cont1,key,&stat); stub->DEL(&cont1,key,&stat);
} }
...@@ -1282,7 +1947,7 @@ void updateAllFingerTables() { ...@@ -1282,7 +1947,7 @@ void updateAllFingerTables() {
} }
void register_server_DNS(string my_address) { void register_server_DNS(string my_address) {
//cout<<"Registering to DNS"<<endl; cout<<"Registering to DNS"<<endl;
string target_address(DNS_SERVER); string target_address(DNS_SERVER);
shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials()); shared_ptr<Channel> channel=grpc::CreateChannel(target_address, grpc::InsecureChannelCredentials());
unique_ptr<KeyValueServices::Stub> stub; unique_ptr<KeyValueServices::Stub> stub;
...@@ -1292,29 +1957,29 @@ void register_server_DNS(string my_address) { ...@@ -1292,29 +1957,29 @@ void register_server_DNS(string my_address) {
Info info; Info info;
ClientContext context; ClientContext context;
Status status=stub->GETADDRESS(&context,null,&info); Status status=stub->GETADDRESS(&context,null,&info);
//cout<<"Address received:"<<info.address()<<endl; cout<<"Address received:"<<info.address()<<endl;
string old_server; string old_server;
if(status.ok()) { if(status.ok()) {
old_server=info.address(); old_server=info.address();
info.set_address(my_address); info.set_address(my_address);
ClientContext new_context; ClientContext new_context;
//cout<<"Adding address to DNS"<<endl; cout<<"Adding address to DNS"<<endl;
stub->ADDADDRESS(&new_context,info,&null); stub->ADDADDRESS(&new_context,info,&null);
//cout<<"Address added to DNS"<<endl; cout<<"Address added to DNS"<<endl;
ofstream fout; ofstream fout;
//cout<<"Generating initial finger table"<<endl; cout<<"Generating initial finger table"<<endl;
fout.open(FINGER_TABLE); fout.open(FINGER_TABLE);
for(int i=0;i<16;i++) for(int i=0;i<16;i++)
fout<<"null"<<endl; fout<<"null"<<endl;
fout.close(); fout.close();
//cout<<"Initial finger table generated"<<endl; cout<<"Initial finger table generated"<<endl;
if(old_server=="null") { if(old_server=="null") {
//cout<<"Initializing initial neighbours"<<endl; cout<<"Initializing initial neighbours"<<endl;
fout.open(NEIGHBOURS); fout.open(NEIGHBOURS);
fout<<"-1"<<endl; fout<<"-1"<<endl;
fout<<"-1"<<endl; fout<<"-1"<<endl;
fout.close(); fout.close();
//cout<<"Initialized initial neighbours"<<endl; cout<<"Initialized initial neighbours"<<endl;
return; return;
} }
channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(old_server,grpc::InsecureChannelCredentials());
...@@ -1322,27 +1987,27 @@ void register_server_DNS(string my_address) { ...@@ -1322,27 +1987,27 @@ void register_server_DNS(string my_address) {
info.set_address(my_address); info.set_address(my_address);
SuccessorInfo successorInfo; SuccessorInfo successorInfo;
ClientContext context1; ClientContext context1;
//cout<<"Sending request to server: "<<old_server<<endl; cout<<"Sending request to server: "<<old_server<<endl;
status=stub->NEW(&context1,info,&successorInfo); status=stub->NEW(&context1,info,&successorInfo);
//cout<<"Request sent. Successor and predecessor info received"<<endl; cout<<"Request sent. Successor and predecessor info received"<<endl;
if(status.ok()) { if(status.ok()) {
string successor=successorInfo.succaddress(); string successor=successorInfo.succaddress();
string predecessor=successorInfo.predaddress(); string predecessor=successorInfo.predaddress();
ofstream fout; ofstream fout;
//cout<<"Storing neighbours info"<<endl; cout<<"Storing neighbours info"<<endl;
fout.open(NEIGHBOURS); fout.open(NEIGHBOURS);
fout<<successor<<endl; fout<<successor<<endl;
fout<<predecessor<<endl; fout<<predecessor<<endl;
fout.close(); fout.close();
//cout<<"Successor: "<<successor<<endl; cout<<"Successor: "<<successor<<endl;
//cout<<"Predecessor: "<<predecessor<<endl; cout<<"Predecessor: "<<predecessor<<endl;
//cout<<"Stored neighbours info"<<endl; cout<<"Stored neighbours info"<<endl;
channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(successor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
info.set_address(my_address); info.set_address(my_address);
KeyValues keyValues; KeyValues keyValues;
ClientContext context2; ClientContext context2;
//cout<<"Informing successor about my presence"<<endl; cout<<"Informing successor about my presence"<<endl;
status=stub->INFORMSUCCESSOR(&context2,info,&keyValues); status=stub->INFORMSUCCESSOR(&context2,info,&keyValues);
string keys=keyValues.keys(); string keys=keyValues.keys();
string values=keyValues.values(); string values=keyValues.values();
...@@ -1357,14 +2022,14 @@ void register_server_DNS(string my_address) { ...@@ -1357,14 +2022,14 @@ void register_server_DNS(string my_address) {
values=values.substr(values.find(';')+1); values=values.substr(values.find(';')+1);
memManager->put(key,value); memManager->put(key,value);
} }
//cout<<"Informed succesor"<<endl; cout<<"Informed succesor"<<endl;
channel=grpc::CreateChannel(predecessor,grpc::InsecureChannelCredentials()); channel=grpc::CreateChannel(predecessor,grpc::InsecureChannelCredentials());
stub=KeyValueServices::NewStub(channel); stub=KeyValueServices::NewStub(channel);
info.set_address(my_address); info.set_address(my_address);
ClientContext context3; ClientContext context3;
//cout<<"Informing predecessor about my presence"<<endl; cout<<"Informing predecessor about my presence"<<endl;
status=stub->INFORMPREDECESSOR(&context3,info,&null); status=stub->INFORMPREDECESSOR(&context3,info,&null);
//cout<<"Informed predecessor"<<endl; cout<<"Informed predecessor"<<endl;
updateAllFingerTables(); updateAllFingerTables();
} }
} }
...@@ -1391,7 +2056,7 @@ int main(int agrc, char **argv) { ...@@ -1391,7 +2056,7 @@ int main(int agrc, char **argv) {
setupServer(server_address); setupServer(server_address);
assignThreads(num_threads); assignThreads(num_threads);
sleep(1); sleep(2);
signal(SIGINT, signalHandler); signal(SIGINT, signalHandler);
server = builder.BuildAndStart(); server = builder.BuildAndStart();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment