Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
C
CS744 DECS-PA4-KEYVALUE-SERVER
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kamal Khodabhai
CS744 DECS-PA4-KEYVALUE-SERVER
Commits
5e582cf1
Commit
5e582cf1
authored
Nov 20, 2021
by
Vishal Saha
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
mayank's contribution
parent
40295e4f
Changes
6
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
1525 additions
and
49 deletions
+1525
-49
Backend.h
Backend.h
+53
-20
CMakeLists.txt
CMakeLists.txt
+1
-1
client.cpp
client.cpp
+11
-1
dns.cpp
dns.cpp
+208
-0
keyvaluestore.proto
keyvaluestore.proto
+37
-1
server.cpp
server.cpp
+1215
-26
No files found.
Backend.h
View file @
5e582cf1
...
@@ -3,33 +3,46 @@
...
@@ -3,33 +3,46 @@
#include "LFU.h"
#include "LFU.h"
using
namespace
std
;
using
namespace
std
;
class
memoryManagement
{
class
memoryManagement
{
public:
public:
virtual
string
get
(
int
*
a
,
string
s
)
{
virtual
string
get
(
int
*
a
,
string
s
)
{
return
"This will never run"
;
return
"This will never run"
;
}
}
virtual
void
put
(
string
a
,
string
b
)
{
virtual
void
put
(
string
a
,
string
b
)
{
return
;
return
;
}
}
virtual
void
del
(
int
*
a
,
string
s
)
{
virtual
void
del
(
int
*
a
,
string
s
)
{
return
;
return
;
}
}
virtual
void
traverse
()
{
virtual
void
traverse
()
{
return
;
return
;
}
}
virtual
void
pushAll
()
{
virtual
void
pushAll
()
{
return
;
return
;
}
}
virtual
string
getKeyValuePairs
(
int
id
)
{
return
"This will never run"
;
}
};
};
class
storageLRU
:
public
memoryManagement
{
class
storageLRU
:
public
memoryManagement
{
public:
public:
LRUcache
mycache
;
LRUcache
mycache
;
storageLRU
(
int
capacity
)
{
storageLRU
(
int
capacity
)
{
mycache
.
setCap
(
capacity
);
mycache
.
setCap
(
capacity
);
}
}
string
get
(
int
*
status
,
string
key
)
{
string
get
(
int
*
status
,
string
key
)
{
string
result
=
""
;
string
result
=
""
;
int
status2
=
200
;
int
status2
=
200
;
...
@@ -39,33 +52,45 @@ public:
...
@@ -39,33 +52,45 @@ public:
return
result
;
return
result
;
}
}
void
put
(
string
key
,
string
val
)
{
void
put
(
string
key
,
string
val
)
{
mycache
.
put
(
key
,
val
);
mycache
.
put
(
key
,
val
);
}
}
void
del
(
int
*
status
,
string
key
)
{
void
del
(
int
*
status
,
string
key
)
{
int
status2
=
200
;
int
status2
=
200
;
mycache
.
del
(
key
,
&
status2
);
mycache
.
del
(
key
,
&
status2
);
*
status
=
status2
;
*
status
=
status2
;
}
}
void
traverse
()
{
void
traverse
()
{
mycache
.
traverse
();
mycache
.
traverse
();
}
}
void
pushAll
()
{
void
pushAll
()
{
mycache
.
pushAll
();
mycache
.
pushAll
();
}
}
string
getKeyValuePairs
(
int
id
)
{
return
"keyvaluepairs"
;
}
};
};
class
storageLFU
:
public
memoryManagement
{
class
storageLFU
:
public
memoryManagement
{
public:
public:
LFUCache
mycache
;
LFUCache
mycache
;
storageLFU
(
int
capacity
)
{
storageLFU
(
int
capacity
)
{
mycache
.
setCap
(
capacity
);
mycache
.
setCap
(
capacity
);
}
}
string
get
(
int
*
status
,
string
key
)
{
string
get
(
int
*
status
,
string
key
)
{
string
result
=
""
;
string
result
=
""
;
int
status2
=
200
;
int
status2
=
200
;
...
@@ -75,22 +100,30 @@ public:
...
@@ -75,22 +100,30 @@ public:
return
result
;
return
result
;
}
}
void
put
(
string
key
,
string
val
)
{
void
put
(
string
key
,
string
val
)
{
mycache
.
PUT
(
key
,
val
);
mycache
.
PUT
(
key
,
val
);
}
}
void
del
(
int
*
status
,
string
key
)
{
void
del
(
int
*
status
,
string
key
)
{
int
status2
=
200
;
int
status2
=
200
;
mycache
.
DEL
(
key
,
&
status2
);
mycache
.
DEL
(
key
,
&
status2
);
*
status
=
status2
;
*
status
=
status2
;
}
}
void
traverse
()
{
void
traverse
()
{
mycache
.
traverse
();
mycache
.
traverse
();
}
}
void
pushAll
()
{
void
pushAll
()
{
mycache
.
pushAll
();
mycache
.
pushAll
();
}
}
string
getKeyValuePairs
(
int
id
)
{
}
};
};
\ No newline at end of file
CMakeLists.txt
View file @
5e582cf1
...
@@ -42,7 +42,7 @@ include_directories("${CMAKE_CURRENT_BINARY_DIR}")
...
@@ -42,7 +42,7 @@ include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# Targets (client|server)
# Targets (client|server)
foreach
(
_target
foreach
(
_target
client client_test server
)
client client_test server
dns
)
add_executable
(
${
_target
}
"
${
_target
}
.cpp"
add_executable
(
${
_target
}
"
${
_target
}
.cpp"
${
hw_proto_srcs
}
${
hw_proto_srcs
}
${
hw_grpc_srcs
}
)
${
hw_grpc_srcs
}
)
...
...
client.cpp
View file @
5e582cf1
...
@@ -13,6 +13,8 @@ using keyvaluestore::Value;
...
@@ -13,6 +13,8 @@ using keyvaluestore::Value;
using
keyvaluestore
::
KeyValue
;
using
keyvaluestore
::
KeyValue
;
using
keyvaluestore
::
ReqStatus
;
using
keyvaluestore
::
ReqStatus
;
using
keyvaluestore
::
KeyValueServices
;
using
keyvaluestore
::
KeyValueServices
;
using
keyvaluestore
::
Info
;
using
keyvaluestore
::
Null
;
std
::
map
<
std
::
string
,
std
::
string
>
params
;
std
::
map
<
std
::
string
,
std
::
string
>
params
;
std
::
string
config_filename
=
"../config"
;
std
::
string
config_filename
=
"../config"
;
...
@@ -150,7 +152,15 @@ void parse(std::string& str, std::string& cmd, std::string& key, std::string& va
...
@@ -150,7 +152,15 @@ void parse(std::string& str, std::string& cmd, std::string& key, std::string& va
}
}
void
RunClient
()
{
void
RunClient
()
{
std
::
string
target_address
(
"0.0.0.0:"
+
params
.
find
(
"LISTENING_PORT"
)
->
second
);
std
::
string
dns_address
(
"0.0.0.0:1234"
);
std
::
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
dns_address
,
grpc
::
InsecureChannelCredentials
());
std
::
unique_ptr
<
KeyValueServices
::
Stub
>
stub
=
KeyValueServices
::
NewStub
(
channel
);
Null
null
;
null
.
set_nothing
(
0
);
Info
info
;
ClientContext
context
;
Status
status
=
stub
->
GETADDRESS
(
&
context
,
null
,
&
info
);
std
::
string
target_address
(
info
.
address
());
// Instantiates the client
// Instantiates the client
KeyValueServicesClient
client
(
KeyValueServicesClient
client
(
// Channel from which RPCs are made - endpoint is the target_address
// Channel from which RPCs are made - endpoint is the target_address
...
...
dns.cpp
0 → 100644
View file @
5e582cf1
#include <bits/stdc++.h>
#include <grpcpp/grpcpp.h>
#include<fstream>
#include "keyvaluestore.grpc.pb.h"
#define SERVERS "serverlist.txt"
using
namespace
std
;
using
grpc
::
Server
;
using
grpc
::
ServerAsyncResponseWriter
;
using
grpc
::
ServerBuilder
;
using
grpc
::
ServerCompletionQueue
;
using
grpc
::
ServerContext
;
using
grpc
::
Status
;
using
grpc
::
Channel
;
using
grpc
::
ClientContext
;
using
keyvaluestore
::
KeyValueServices
;
using
keyvaluestore
::
Info
;
using
keyvaluestore
::
Null
;
using
keyvaluestore
::
Addresses
;
ServerBuilder
builder
;
KeyValueServices
::
AsyncService
service
;
std
::
unique_ptr
<
Server
>
server
;
enum
RequestType
{
GETADDRESS
,
ADDADDRESS
,
UPDATEFINGERTABLES
,
GETSERVERS
};
class
DNSData
{
public:
DNSData
(
KeyValueServices
::
AsyncService
*
service
,
ServerCompletionQueue
*
cq
,
RequestType
reqType
)
:
service
(
service
),
cq
(
cq
),
getAddressResponder
(
&
context
),
addAddressResponder
(
&
context
),
updateFingerTablesResponder
(
&
context
),
getServersResponder
(
&
context
),
status
(
CREATE
),
reqType
(
reqType
)
{
Proceed
();
}
void
Proceed
()
{
if
(
status
==
CREATE
)
{
status
=
PROCESS
;
if
(
reqType
==
GETADDRESS
)
service
->
RequestGETADDRESS
(
&
context
,
&
null
,
&
getAddressResponder
,
cq
,
cq
,
this
);
else
if
(
reqType
==
ADDADDRESS
)
service
->
RequestADDADDRESS
(
&
context
,
&
info
,
&
addAddressResponder
,
cq
,
cq
,
this
);
else
if
(
reqType
==
UPDATEFINGERTABLES
)
service
->
RequestUPDATEFINGERTABLES
(
&
context
,
&
null
,
&
updateFingerTablesResponder
,
cq
,
cq
,
this
);
else
service
->
RequestGETSERVERS
(
&
context
,
&
null
,
&
getServersResponder
,
cq
,
cq
,
this
);
}
else
if
(
status
==
PROCESS
)
{
new
DNSData
(
service
,
cq
,
reqType
);
if
(
reqType
==
GETADDRESS
)
{
ifstream
fin
;
int
size
=
0
;
map
<
int
,
string
>
servers
;
fin
.
open
(
SERVERS
);
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
.
size
()
==
0
)
break
;
servers
[
size
++
]
=
temp
;
}
while
(
fin
);
fin
.
close
();
if
(
size
==
0
)
info
.
set_address
(
"null"
);
else
{
int
x
=
rand
()
%
size
;
info
.
set_address
(
servers
.
find
(
x
)
->
second
);
}
getAddressResponder
.
Finish
(
info
,
Status
::
OK
,
this
);
}
else
if
(
reqType
==
ADDADDRESS
){
ifstream
fin
;
fin
.
open
(
SERVERS
);
int
size
=
0
;
map
<
int
,
string
>
svs
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
.
size
()
==
0
)
break
;
svs
[
size
++
]
=
temp
;
}
while
(
fin
);
fin
.
close
();
string
addresses
[
size
+
1
];
int
count
=
0
;
string
addtoadd
=
info
.
address
();
int
porttoadd
=
stoi
(
addtoadd
.
substr
(
addtoadd
.
find
(
':'
)
+
1
));
bool
flag
=
false
;
for
(
int
i
=
0
;
i
<
size
;
i
++
)
{
int
curr_port
=
stoi
(
svs
[
i
].
substr
(
svs
[
i
].
find
(
':'
)
+
1
));
if
(
porttoadd
<
curr_port
&&
flag
==
false
)
{
addresses
[
count
++
]
=
addtoadd
;
flag
=
true
;
}
addresses
[
count
++
]
=
svs
[
i
];
}
if
(
flag
==
false
)
addresses
[
count
++
]
=
addtoadd
;
ofstream
fout
;
fout
.
open
(
SERVERS
);
for
(
int
i
=
0
;
i
<
count
;
i
++
)
fout
<<
addresses
[
i
]
<<
endl
;
fout
.
close
();
null
.
set_nothing
(
0
);
cout
<<
info
.
address
()
<<
endl
;
addAddressResponder
.
Finish
(
null
,
Status
::
OK
,
this
);
}
else
if
(
reqType
==
UPDATEFINGERTABLES
){
ifstream
fin
;
int
index
=
0
;
map
<
int
,
string
>
servers
;
fin
.
open
(
SERVERS
);
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
.
size
()
==
0
)
break
;
servers
[
index
++
]
=
temp
;
}
while
(
fin
);
fin
.
close
();
string
addressarr
=
""
;
for
(
int
i
=
0
;
i
<
index
;
i
++
)
addressarr
+=
servers
[
i
]
+
";"
;
for
(
int
i
=
0
;
i
<
index
;
i
++
)
{
string
target_address
(
servers
[
i
]);
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
Null
null
;
ClientContext
cont
;
Addresses
addr
;
addr
.
set_addresses
(
addressarr
);
addr
.
set_servers
(
index
);
stub
->
UPDATETABLE
(
&
cont
,
addr
,
&
null
);
}
updateFingerTablesResponder
.
Finish
(
null
,
Status
::
OK
,
this
);
}
else
{
ifstream
fin
;
int
index
=
0
;
map
<
int
,
string
>
servers
;
fin
.
open
(
SERVERS
);
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
.
size
()
==
0
)
break
;
servers
[
index
++
]
=
temp
;
}
while
(
fin
);
fin
.
close
();
string
addressarr
=
""
;
for
(
int
i
=
0
;
i
<
index
;
i
++
)
addressarr
+=
servers
[
i
]
+
";"
;
addr1
.
set_addresses
(
addressarr
);
addr1
.
set_servers
(
index
);
getServersResponder
.
Finish
(
addr1
,
Status
::
OK
,
this
);
}
status
=
FINISH
;
}
else
{
GPR_ASSERT
(
status
==
FINISH
);
delete
this
;
}
}
private:
KeyValueServices
::
AsyncService
*
service
;
ServerCompletionQueue
*
cq
;
ServerContext
context
;
Null
null
;
Info
info
;
Addresses
addr1
;
ServerAsyncResponseWriter
<
Info
>
getAddressResponder
;
ServerAsyncResponseWriter
<
Null
>
addAddressResponder
;
ServerAsyncResponseWriter
<
Null
>
updateFingerTablesResponder
;
ServerAsyncResponseWriter
<
Addresses
>
getServersResponder
;
enum
CallStatus
{
CREATE
,
PROCESS
,
FINISH
};
CallStatus
status
;
RequestType
reqType
;
};
int
main
(
int
argc
,
char
**
argv
)
{
srand
(
time
(
0
));
string
server_address
(
"0.0.0.0:1234"
);
builder
.
AddListeningPort
(
server_address
,
grpc
::
InsecureServerCredentials
());
builder
.
RegisterService
(
&
service
);
unique_ptr
<
ServerCompletionQueue
>
comp_queue
=
builder
.
AddCompletionQueue
();
server
=
builder
.
BuildAndStart
();
cout
<<
"DNS SERVER COMES UP SUCCESSFULLY"
<<
endl
;
new
DNSData
(
&
service
,
comp_queue
.
get
(),
GETADDRESS
);
new
DNSData
(
&
service
,
comp_queue
.
get
(),
ADDADDRESS
);
new
DNSData
(
&
service
,
comp_queue
.
get
(),
UPDATEFINGERTABLES
);
new
DNSData
(
&
service
,
comp_queue
.
get
(),
GETSERVERS
);
void
*
tag
;
bool
ok
;
while
(
true
)
{
GPR_ASSERT
(
comp_queue
->
Next
(
&
tag
,
&
ok
));
GPR_ASSERT
(
ok
);
static_cast
<
DNSData
*>
(
tag
)
->
Proceed
();
}
}
\ No newline at end of file
keyvaluestore.proto
View file @
5e582cf1
...
@@ -8,9 +8,18 @@ service KeyValueServices {
...
@@ -8,9 +8,18 @@ service KeyValueServices {
rpc
GET
(
Key
)
returns
(
Value
)
{}
rpc
GET
(
Key
)
returns
(
Value
)
{}
rpc
PUT
(
KeyValue
)
returns
(
ReqStatus
)
{}
rpc
PUT
(
KeyValue
)
returns
(
ReqStatus
)
{}
rpc
DEL
(
Key
)
returns
(
ReqStatus
)
{}
rpc
DEL
(
Key
)
returns
(
ReqStatus
)
{}
rpc
NEW
(
Info
)
returns
(
SuccessorInfo
)
{}
rpc
INFORMSUCCESSOR
(
Info
)
returns
(
KeyValues
)
{}
rpc
INFORMPREDECESSOR
(
Info
)
returns
(
Null
)
{}
rpc
GETADDRESS
(
Null
)
returns
(
Info
)
{}
rpc
ADDADDRESS
(
Info
)
returns
(
Null
)
{}
rpc
UPDATEFINGERTABLES
(
Null
)
returns
(
Null
)
{}
rpc
GETSUCCESSOR
(
Id
)
returns
(
Id
)
{}
rpc
GETPREDECESSOR
(
Id
)
returns
(
Id
)
{}
rpc
UPDATETABLE
(
Addresses
)
returns
(
Null
)
{}
rpc
GETSERVERS
(
Null
)
returns
(
Addresses
)
{}
}
}
message
Key
{
message
Key
{
string
key
=
1
;
string
key
=
1
;
}
}
...
@@ -30,3 +39,30 @@ message ReqStatus {
...
@@ -30,3 +39,30 @@ message ReqStatus {
int32
status
=
1
;
int32
status
=
1
;
string
error
=
2
;
string
error
=
2
;
}
}
message
Info
{
string
address
=
1
;
}
message
SuccessorInfo
{
string
succaddress
=
1
;
string
predaddress
=
2
;
}
message
KeyValues
{
string
keys
=
1
;
string
values
=
2
;
}
message
Null
{
int32
nothing
=
1
;
}
message
Id
{
int64
id
=
1
;
}
message
Addresses
{
string
addresses
=
1
;
int64
servers
=
2
;
}
\ No newline at end of file
server.cpp
View file @
5e582cf1
#include <bits/stdc++.h>
#include <bits/stdc++.h>
#include <pthread.h>
#include <pthread.h>
#include <fstream>
#include <grpcpp/grpcpp.h>
#include <grpcpp/grpcpp.h>
#include "keyvaluestore.grpc.pb.h"
#include "keyvaluestore.grpc.pb.h"
#include "../Backend.h"
#include "../Backend.h"
#define NEIGHBOURS "neighbours.txt"
#define FINGER_TABLE "fingertable.txt"
#define DNS_SERVER "0.0.0.0:1234"
using
namespace
std
;
using
namespace
std
;
...
@@ -11,12 +15,20 @@ using grpc::ServerAsyncResponseWriter;
...
@@ -11,12 +15,20 @@ using grpc::ServerAsyncResponseWriter;
using
grpc
::
ServerBuilder
;
using
grpc
::
ServerBuilder
;
using
grpc
::
ServerCompletionQueue
;
using
grpc
::
ServerCompletionQueue
;
using
grpc
::
ServerContext
;
using
grpc
::
ServerContext
;
using
grpc
::
Channel
;
using
grpc
::
ClientContext
;
using
grpc
::
Status
;
using
grpc
::
Status
;
using
keyvaluestore
::
Key
;
using
keyvaluestore
::
Key
;
using
keyvaluestore
::
KeyValue
;
using
keyvaluestore
::
KeyValue
;
using
keyvaluestore
::
KeyValueServices
;
using
keyvaluestore
::
KeyValueServices
;
using
keyvaluestore
::
ReqStatus
;
using
keyvaluestore
::
ReqStatus
;
using
keyvaluestore
::
Value
;
using
keyvaluestore
::
Value
;
using
keyvaluestore
::
Info
;
using
keyvaluestore
::
SuccessorInfo
;
using
keyvaluestore
::
KeyValues
;
using
keyvaluestore
::
Null
;
using
keyvaluestore
::
Id
;
using
keyvaluestore
::
Addresses
;
pthread_mutex_t
_masterLock
;
pthread_mutex_t
_masterLock
;
...
@@ -26,6 +38,15 @@ enum RequestType {
...
@@ -26,6 +38,15 @@ enum RequestType {
DEL
DEL
};
};
enum
ServerRequest
{
NEW
,
INFORMSUCCESSOR
,
INFORMPREDECESSOR
,
GETSUCCESSOR
,
GETPREDECESSOR
,
UPDATETABLE
};
map
<
string
,
string
>
params
;
map
<
string
,
string
>
params
;
string
config_filename
=
"../config"
;
string
config_filename
=
"../config"
;
string
log_file
=
"../log"
;
string
log_file
=
"../log"
;
...
@@ -37,6 +58,9 @@ pthread_t *workers;
...
@@ -37,6 +58,9 @@ pthread_t *workers;
int
*
worker_id
;
int
*
worker_id
;
pthread_cond_t
startRpcs
;
pthread_cond_t
startRpcs
;
pthread_mutex_t
myLock
;
pthread_mutex_t
myLock
;
pthread_t
dist_worker
;
int
dist_worker_id
;
bool
start
;
bool
start
;
memoryManagement
*
memManager
;
memoryManagement
*
memManager
;
...
@@ -55,6 +79,354 @@ void getConfig() {
...
@@ -55,6 +79,354 @@ void getConfig() {
config
.
close
();
config
.
close
();
}
}
class
ServerData
{
public:
ServerData
(
KeyValueServices
::
AsyncService
*
service
,
ServerCompletionQueue
*
cq
,
ServerRequest
reqType
)
:
service
(
service
),
cq
(
cq
),
newResponder
(
&
context
),
informSuccessorResponder
(
&
context
),
informPredecessorResponder
(
&
context
),
getSuccessorResponder
(
&
context
),
getPredecessorResponder
(
&
context
),
updateTableResponder
(
&
context
),
status
(
CREATE
),
reqType
(
reqType
)
{
Proceed
();
}
void
Proceed
()
{
if
(
status
==
CREATE
)
{
status
=
PROCESS
;
if
(
reqType
==
NEW
)
service
->
RequestNEW
(
&
context
,
&
info
,
&
newResponder
,
cq
,
cq
,
this
);
else
if
(
reqType
==
INFORMSUCCESSOR
)
service
->
RequestINFORMSUCCESSOR
(
&
context
,
&
info
,
&
informSuccessorResponder
,
cq
,
cq
,
this
);
else
if
(
reqType
==
INFORMPREDECESSOR
)
service
->
RequestINFORMPREDECESSOR
(
&
context
,
&
info
,
&
informPredecessorResponder
,
cq
,
cq
,
this
);
else
if
(
reqType
==
GETSUCCESSOR
)
service
->
RequestGETSUCCESSOR
(
&
context
,
&
idvar1
,
&
getSuccessorResponder
,
cq
,
cq
,
this
);
else
if
(
reqType
==
GETPREDECESSOR
)
service
->
RequestGETPREDECESSOR
(
&
context
,
&
idvar1
,
&
getPredecessorResponder
,
cq
,
cq
,
this
);
else
service
->
RequestUPDATETABLE
(
&
context
,
&
addressarr
,
&
updateTableResponder
,
cq
,
cq
,
this
);
}
else
if
(
status
==
PROCESS
)
{
new
ServerData
(
service
,
cq
,
reqType
);
if
(
reqType
==
NEW
)
{
//cout<<"New Server to join:"<<info.address()<<endl;
//calculate id of node, return it's successor and predecessor
string
address
=
info
.
address
();
int
id
=
stoi
(
address
.
substr
(
address
.
find
(
':'
)
+
1
));
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting my finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
nums
++
]
=
stoi
(
temp
);
}
while
(
fin
);
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
id
&&
my_id
>=
id
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
id
&&
fingers
[
0
]
>=
id
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
id
&&
fingers
[
i
]
>=
id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
if
(
next
!=-
1
)
{
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int
mypred
;
if
(
next
!=
my_id
)
{
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
mypred
=
y
.
id
();
}
else
{
fin
.
open
(
NEIGHBOURS
);
string
temp
;
getline
(
fin
,
temp
);
getline
(
fin
,
temp
);
fin
.
close
();
mypred
=
stoi
(
temp
.
substr
(
temp
.
find
(
':'
)
+
1
));
}
if
(
mypred
==
node
)
{
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
successorInfo
.
set_succaddress
(
"0.0.0.0:"
+
to_string
(
x
.
id
()));
successorInfo
.
set_predaddress
(
"0.0.0.0:"
+
to_string
(
mypred
));
}
else
{
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
id
);
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
//cout<<"Yes. We got the successor"<<endl;
Id
z
;
string
t_address
(
"0.0.0.0:"
+
to_string
(
y
.
id
()));
channel
=
grpc
::
CreateChannel
(
t_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context2
;
stub
->
GETPREDECESSOR
(
&
context2
,
y
,
&
z
);
//cout<<"Yes. We got the predecessor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
//cout<<"Predecessor: "<<z.id()<<endl;
successorInfo
.
set_succaddress
(
"0.0.0.0:"
+
to_string
(
y
.
id
()));
successorInfo
.
set_predaddress
(
"0.0.0.0:"
+
to_string
(
z
.
id
()));
}
}
else
{
//cout<<"We got no node with id greater than new node"<<endl;
//cout<<"Only one node present right now, that is me"<<endl;
successorInfo
.
set_succaddress
(
"0.0.0.0:"
+
to_string
(
my_id
));
successorInfo
.
set_predaddress
(
"0.0.0.0:"
+
to_string
(
my_id
));
}
//cout<<"Sending the successor and predecessor back to the new server"<<endl;
newResponder
.
Finish
(
successorInfo
,
Status
::
OK
,
this
);
}
else
if
(
reqType
==
INFORMSUCCESSOR
){
//return half of the keyvalue pairs to the requesting node
string
address
=
info
.
address
();
int
id
=
stoi
(
address
.
substr
(
address
.
find
(
':'
)
+
1
));
string
keyvalues
=
memManager
->
getKeyValuePairs
(
id
);
string
keys
=
keyvalues
.
substr
(
0
,
keyvalues
.
find
(
";;"
)
+
1
);
string
values
=
keyvalues
.
substr
(
keyvalues
.
find
(
";;"
)
+
2
);
//cout<<"Okay, my new predecessor is: "<<info.address()<<endl;
ifstream
fin
;
fin
.
open
(
NEIGHBOURS
);
string
successor
,
predecessor
;
getline
(
fin
,
successor
);
getline
(
fin
,
predecessor
);
fin
.
close
();
predecessor
=
info
.
address
();
ofstream
fout
;
fout
.
open
(
NEIGHBOURS
);
fout
<<
successor
<<
endl
;
fout
<<
predecessor
<<
endl
;
fout
.
close
();
keyValues
.
set_keys
(
keys
);
keyValues
.
set_values
(
values
);
//cout<<"Done making changes accordingly"<<endl;
informSuccessorResponder
.
Finish
(
keyValues
,
Status
::
OK
,
this
);
}
else
if
(
reqType
==
GETSUCCESSOR
)
{
//cout<<"Some server asked me to find the successor of "<<idvar1.id()<<endl;
int
idtofind
=
idvar1
.
id
();
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting the finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
for
(
int
i
=
0
;
i
<
16
;
i
++
)
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
i
]
=
stoi
(
temp
);
}
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
idtofind
&&
my_id
>=
idtofind
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
idtofind
&&
fingers
[
0
]
>=
idtofind
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
idtofind
&&
fingers
[
i
]
>=
idtofind
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
idtofind
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
if
(
y
.
id
()
==
node
)
{
//cout<<"Yes it is. We found the successor"<<endl;
//cout<<"Successor: "<<y.id()<<endl;
idvar2
.
set_id
(
x
.
id
());
}
else
{
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
idtofind
);
//cout<<"No it is not. Asking the possible predecessor to find its successor"<<endl;
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
idvar2
.
set_id
(
y
.
id
());
}
getSuccessorResponder
.
Finish
(
idvar2
,
Status
::
OK
,
this
);
}
else
if
(
reqType
==
GETPREDECESSOR
)
{
//cout<<"Someone asked me for my predecessor. Sending them"<<endl;
ifstream
fin
;
fin
.
open
(
NEIGHBOURS
);
string
successor
,
predecessor
;
getline
(
fin
,
successor
);
getline
(
fin
,
predecessor
);
fin
.
close
();
idvar2
.
set_id
(
stoi
(
predecessor
.
substr
(
predecessor
.
find
(
':'
)
+
1
)));
//cout<<"Sent my predecessor"<<endl;
getPredecessorResponder
.
Finish
(
idvar2
,
Status
::
OK
,
this
);
}
else
if
(
reqType
==
INFORMPREDECESSOR
)
{
//cout<<"Okay, i got the information that my successor has changed"<<endl;
//cout<<"My new successor: "<<info.address()<<endl;
ifstream
fin
;
fin
.
open
(
NEIGHBOURS
);
string
successor
,
predecessor
;
getline
(
fin
,
successor
);
getline
(
fin
,
predecessor
);
fin
.
close
();
successor
=
info
.
address
();
ofstream
fout
;
fout
.
open
(
NEIGHBOURS
);
fout
<<
successor
<<
endl
;
fout
<<
predecessor
<<
endl
;
fout
.
close
();
null
.
set_nothing
(
0
);
//cout<<"Okay, i made the necessary changes"<<endl;
informPredecessorResponder
.
Finish
(
null
,
Status
::
OK
,
this
);
}
else
{
string
addresses
=
addressarr
.
addresses
();
int
num
=
addressarr
.
servers
();
int
ids
[
num
];
string
addr
[
num
];
for
(
int
i
=
0
;
i
<
num
;
i
++
)
{
addr
[
i
]
=
addresses
.
substr
(
0
,
addresses
.
find
(
';'
));
ids
[
i
]
=
stoi
(
addr
[
i
].
substr
(
addr
[
i
].
find
(
':'
)
+
1
));
addresses
=
addresses
.
substr
(
addresses
.
find
(
';'
)
+
1
);
}
ofstream
fout
;
fout
.
open
(
FINGER_TABLE
);
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
int
prev_entry
=
my_id
;
int
i
=
0
;
int
my_index
=
0
;
for
(
i
=
0
;
i
<
num
;
i
++
)
if
(
ids
[
i
]
==
my_id
)
{
my_index
=
i
;
break
;
}
int
fingernodes
[
num
-
1
];
int
count
=
0
;
for
(
i
=
my_index
+
1
;
i
<
num
;
i
++
)
fingernodes
[
count
++
]
=
ids
[
i
];
for
(
i
=
0
;
i
<
my_index
;
i
++
)
fingernodes
[
count
++
]
=
ids
[
i
];
int
curr
=
0
;
for
(
i
=
0
;
i
<
16
;
i
++
)
{
int
next_entry
=
(
my_id
+
(
1
<<
i
))
%
(
1
<<
16
);
if
(
curr
!=
count
&&
next_entry
>
ids
[
num
-
1
]
&&
my_index
!=
0
)
{
fout
<<
ids
[
0
]
<<
endl
;
if
(
fingernodes
[
curr
]
!=
ids
[
0
])
{
for
(
int
j
=
0
;
j
<
count
;
j
++
)
if
(
fingernodes
[
j
]
==
ids
[
0
])
curr
=
j
;
}
}
else
{
while
(
curr
<
count
&&
next_entry
>
fingernodes
[
curr
])
curr
++
;
}
if
(
curr
<
count
&&
fingernodes
[
curr
]
>=
next_entry
)
fout
<<
fingernodes
[
curr
]
<<
endl
;
if
(
curr
==
count
)
fout
<<
"null"
<<
endl
;
}
fout
.
close
();
Null
n
;
n
.
set_nothing
(
0
);
updateTableResponder
.
Finish
(
n
,
Status
::
OK
,
this
);
}
status
=
FINISH
;
}
else
{
GPR_ASSERT
(
status
==
FINISH
);
delete
this
;
}
}
private:
KeyValueServices
::
AsyncService
*
service
;
ServerCompletionQueue
*
cq
;
ServerContext
context
;
Info
info
;
SuccessorInfo
successorInfo
;
Null
null
;
KeyValues
keyValues
;
Id
idvar1
;
Id
idvar2
;
Addresses
addressarr
;
ServerAsyncResponseWriter
<
SuccessorInfo
>
newResponder
;
ServerAsyncResponseWriter
<
KeyValues
>
informSuccessorResponder
;
ServerAsyncResponseWriter
<
Null
>
informPredecessorResponder
;
ServerAsyncResponseWriter
<
Id
>
getSuccessorResponder
;
ServerAsyncResponseWriter
<
Id
>
getPredecessorResponder
;
ServerAsyncResponseWriter
<
Null
>
updateTableResponder
;
enum
CallStatus
{
CREATE
,
PROCESS
,
FINISH
};
CallStatus
status
;
ServerRequest
reqType
;
};
class
CallData
{
class
CallData
{
public:
public:
CallData
(
KeyValueServices
::
AsyncService
*
service
,
ServerCompletionQueue
*
cq
,
RequestType
reqType
)
:
service
(
service
),
cq
(
cq
),
getResponder
(
&
context
),
putResponder
(
&
context
),
delResponder
(
&
context
),
status
(
CREATE
),
reqType
(
reqType
)
{
CallData
(
KeyValueServices
::
AsyncService
*
service
,
ServerCompletionQueue
*
cq
,
RequestType
reqType
)
:
service
(
service
),
cq
(
cq
),
getResponder
(
&
context
),
putResponder
(
&
context
),
delResponder
(
&
context
),
status
(
CREATE
),
reqType
(
reqType
)
{
...
@@ -73,6 +445,111 @@ public:
...
@@ -73,6 +445,111 @@ public:
}
else
if
(
status
==
PROCESS
)
{
}
else
if
(
status
==
PROCESS
)
{
new
CallData
(
service
,
cq
,
reqType
);
new
CallData
(
service
,
cq
,
reqType
);
if
(
reqType
==
GET
)
{
if
(
reqType
==
GET
)
{
int
succ
;
int
key_id
=
hash
(
key
.
key
());
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
ifstream
fin
;
fin
.
open
(
NEIGHBOURS
);
string
pred
;
getline
(
fin
,
pred
);
getline
(
fin
,
pred
);
fin
.
close
();
int
pred_id
;
if
(
pred
==
"-1"
)
pred_id
=-
1
;
else
pred_id
=
stoi
(
pred
.
substr
(
pred
.
find
(
':'
)
+
1
));
if
(
my_id
<
key_id
&&!
(
pred_id
<
key_id
&&
pred_id
>
my_id
))
{
//transfer request
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting my finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
nums
++
]
=
stoi
(
temp
);
}
while
(
fin
);
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
key_id
&&
my_id
>=
key_id
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
key_id
&&
fingers
[
0
]
>=
key_id
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
key_id
&&
fingers
[
i
]
>=
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int
mypred
;
if
(
next
!=
my_id
)
{
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
mypred
=
y
.
id
();
}
else
{
fin
.
open
(
NEIGHBOURS
);
string
temp
;
getline
(
fin
,
temp
);
getline
(
fin
,
temp
);
fin
.
close
();
mypred
=
stoi
(
temp
.
substr
(
temp
.
find
(
':'
)
+
1
));
}
if
(
mypred
==
node
)
{
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ
=
x
.
id
();
}
else
{
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
key_id
);
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
//cout<<"Yes. We got the successor"<<endl;
succ
=
x
.
id
();
}
string
t_address
(
"0.0.0.0:"
+
to_string
(
succ
));
channel
=
grpc
::
CreateChannel
(
t_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
cont1
;
stub
->
GET
(
&
cont1
,
key
,
&
value
);
}
else
if
(
my_id
==
key_id
||
(
my_id
<
key_id
&&
pred_id
<
key_id
&&
pred_id
>
my_id
))
{
cout
<<
"SERVER SERVES A GET REQUEST WITH PARAMETER KEY : "
<<
key
.
key
();
cout
<<
"SERVER SERVES A GET REQUEST WITH PARAMETER KEY : "
<<
key
.
key
();
int
status
=
200
;
int
status
=
200
;
pthread_mutex_lock
(
&
_masterLock
);
pthread_mutex_lock
(
&
_masterLock
);
...
@@ -87,17 +564,440 @@ public:
...
@@ -87,17 +564,440 @@ public:
value
.
set_error
(
v
);
value
.
set_error
(
v
);
}
}
cout
<<
" RETURN VALUE : "
<<
value
.
value
()
<<
endl
;
cout
<<
" RETURN VALUE : "
<<
value
.
value
()
<<
endl
;
}
else
{
if
(
pred_id
==-
1
||
pred_id
<
key_id
)
{
cout
<<
"SERVER SERVES A GET REQUEST WITH PARAMETER KEY : "
<<
key
.
key
();
int
status
=
200
;
pthread_mutex_lock
(
&
_masterLock
);
string
v
=
memManager
->
get
(
&
status
,
key
.
key
());
pthread_mutex_unlock
(
&
_masterLock
);
value
.
set_value
(
v
);
if
(
status
==
200
)
value
.
set_status
(
200
);
else
{
value
.
set_status
(
400
);
value
.
set_error
(
v
);
}
cout
<<
" RETURN VALUE : "
<<
value
.
value
()
<<
endl
;
}
else
{
//transfer the request
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting my finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
nums
++
]
=
stoi
(
temp
);
}
while
(
fin
);
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
key_id
&&
my_id
>=
key_id
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
key_id
&&
fingers
[
0
]
>=
key_id
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
key_id
&&
fingers
[
i
]
>=
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int
mypred
;
if
(
next
!=
my_id
)
{
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
mypred
=
y
.
id
();
}
else
{
fin
.
open
(
NEIGHBOURS
);
string
temp
;
getline
(
fin
,
temp
);
getline
(
fin
,
temp
);
fin
.
close
();
mypred
=
stoi
(
temp
.
substr
(
temp
.
find
(
':'
)
+
1
));
}
if
(
mypred
==
node
)
{
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ
=
x
.
id
();
}
else
{
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
key_id
);
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
//cout<<"Yes. We got the successor"<<endl;
succ
=
x
.
id
();
}
string
t_address
(
"0.0.0.0:"
+
to_string
(
succ
));
channel
=
grpc
::
CreateChannel
(
t_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
cont1
;
stub
->
GET
(
&
cont1
,
key
,
&
value
);
}
}
getResponder
.
Finish
(
value
,
Status
::
OK
,
this
);
getResponder
.
Finish
(
value
,
Status
::
OK
,
this
);
}
else
if
(
reqType
==
PUT
)
{
}
else
if
(
reqType
==
PUT
)
{
int
succ
;
int
key_id
=
hash
(
keyvalue
.
key
());
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
ifstream
fin
;
fin
.
open
(
NEIGHBOURS
);
string
pred
;
getline
(
fin
,
pred
);
getline
(
fin
,
pred
);
fin
.
close
();
int
pred_id
;
if
(
pred
==
"-1"
)
pred_id
=-
1
;
else
pred_id
=
stoi
(
pred
.
substr
(
pred
.
find
(
':'
)
+
1
));
if
(
my_id
<
key_id
&&!
(
pred_id
<
key_id
&&
pred_id
>
my_id
))
{
//transfer request
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting my finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
nums
++
]
=
stoi
(
temp
);
}
while
(
fin
);
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
key_id
&&
my_id
>=
key_id
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
key_id
&&
fingers
[
0
]
>=
key_id
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
key_id
&&
fingers
[
i
]
>=
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int
mypred
;
if
(
next
!=
my_id
)
{
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
mypred
=
y
.
id
();
}
else
{
fin
.
open
(
NEIGHBOURS
);
string
temp
;
getline
(
fin
,
temp
);
getline
(
fin
,
temp
);
fin
.
close
();
mypred
=
stoi
(
temp
.
substr
(
temp
.
find
(
':'
)
+
1
));
}
if
(
mypred
==
node
)
{
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ
=
x
.
id
();
}
else
{
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
key_id
);
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
//cout<<"Yes. We got the successor"<<endl;
succ
=
x
.
id
();
}
string
t_address
(
"0.0.0.0:"
+
to_string
(
succ
));
channel
=
grpc
::
CreateChannel
(
t_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
cont1
;
stub
->
PUT
(
&
cont1
,
keyvalue
,
&
stat
);
}
else
if
(
my_id
==
key_id
||
(
my_id
<
key_id
&&
pred_id
<
key_id
&&
pred_id
>
my_id
))
{
cout
<<
"SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : "
<<
keyvalue
.
key
()
<<
" & VALUE : "
<<
keyvalue
.
value
()
<<
endl
;
cout
<<
"SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : "
<<
keyvalue
.
key
()
<<
" & VALUE : "
<<
keyvalue
.
value
()
<<
endl
;
pthread_mutex_lock
(
&
_masterLock
);
pthread_mutex_lock
(
&
_masterLock
);
memManager
->
put
(
keyvalue
.
key
(),
keyvalue
.
value
());
memManager
->
put
(
keyvalue
.
key
(),
keyvalue
.
value
());
pthread_mutex_unlock
(
&
_masterLock
);
pthread_mutex_unlock
(
&
_masterLock
);
stat
.
set_status
(
200
);
stat
.
set_status
(
200
);
}
else
{
if
(
pred_id
==-
1
||
pred_id
<
key_id
)
{
cout
<<
"SERVER SERVES A PUT REQUEST WITH PARAMETER KEY : "
<<
keyvalue
.
key
()
<<
" & VALUE : "
<<
keyvalue
.
value
()
<<
endl
;
pthread_mutex_lock
(
&
_masterLock
);
memManager
->
put
(
keyvalue
.
key
(),
keyvalue
.
value
());
pthread_mutex_unlock
(
&
_masterLock
);
stat
.
set_status
(
200
);
}
else
{
//transfer the request
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting my finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
nums
++
]
=
stoi
(
temp
);
}
while
(
fin
);
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
key_id
&&
my_id
>=
key_id
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
key_id
&&
fingers
[
0
]
>=
key_id
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
key_id
&&
fingers
[
i
]
>=
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int
mypred
;
if
(
next
!=
my_id
)
{
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
mypred
=
y
.
id
();
}
else
{
fin
.
open
(
NEIGHBOURS
);
string
temp
;
getline
(
fin
,
temp
);
getline
(
fin
,
temp
);
fin
.
close
();
mypred
=
stoi
(
temp
.
substr
(
temp
.
find
(
':'
)
+
1
));
}
if
(
mypred
==
node
)
{
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ
=
x
.
id
();
}
else
{
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
key_id
);
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
//cout<<"Yes. We got the successor"<<endl;
succ
=
x
.
id
();
}
string
t_address
(
"0.0.0.0:"
+
to_string
(
succ
));
channel
=
grpc
::
CreateChannel
(
t_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
cont1
;
stub
->
PUT
(
&
cont1
,
keyvalue
,
&
stat
);
}
}
putResponder
.
Finish
(
stat
,
Status
::
OK
,
this
);
putResponder
.
Finish
(
stat
,
Status
::
OK
,
this
);
}
else
{
}
else
{
int
succ
;
int
key_id
=
hash
(
key
.
key
());
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
ifstream
fin
;
fin
.
open
(
NEIGHBOURS
);
string
pred
;
getline
(
fin
,
pred
);
getline
(
fin
,
pred
);
fin
.
close
();
int
pred_id
;
if
(
pred
==
"-1"
)
pred_id
=-
1
;
else
pred_id
=
stoi
(
pred
.
substr
(
pred
.
find
(
':'
)
+
1
));
if
(
my_id
<
key_id
&&!
(
pred_id
<
key_id
&&
pred_id
>
my_id
))
{
//transfer request
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting my finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
nums
++
]
=
stoi
(
temp
);
}
while
(
fin
);
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
key_id
&&
my_id
>=
key_id
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
key_id
&&
fingers
[
0
]
>=
key_id
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
key_id
&&
fingers
[
i
]
>=
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int
mypred
;
if
(
next
!=
my_id
)
{
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
mypred
=
y
.
id
();
}
else
{
fin
.
open
(
NEIGHBOURS
);
string
temp
;
getline
(
fin
,
temp
);
getline
(
fin
,
temp
);
fin
.
close
();
mypred
=
stoi
(
temp
.
substr
(
temp
.
find
(
':'
)
+
1
));
}
if
(
mypred
==
node
)
{
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ
=
x
.
id
();
}
else
{
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
key_id
);
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
//cout<<"Yes. We got the successor"<<endl;
succ
=
x
.
id
();
}
string
t_address
(
"0.0.0.0:"
+
to_string
(
succ
));
channel
=
grpc
::
CreateChannel
(
t_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
cont1
;
stub
->
DEL
(
&
cont1
,
key
,
&
stat
);
}
else
if
(
my_id
==
key_id
||
(
my_id
<
key_id
&&
pred_id
<
key_id
&&
pred_id
>
my_id
))
{
cout
<<
"SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : "
<<
key
.
key
()
<<
endl
;
cout
<<
"SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : "
<<
key
.
key
()
<<
endl
;
int
status
=
200
;
int
status
=
200
;
...
@@ -108,9 +1008,118 @@ public:
...
@@ -108,9 +1008,118 @@ public:
if
(
status
==
200
)
if
(
status
==
200
)
stat
.
set_status
(
200
);
stat
.
set_status
(
200
);
else
{
else
{
//as this server does not have the keyvalue pair, send request to next server using CHORD
stat
.
set_status
(
400
);
stat
.
set_status
(
400
);
stat
.
set_error
(
"KEY NOT EXIST"
);
stat
.
set_error
(
"KEY NOT EXIST"
);
}
}
}
else
{
if
(
pred_id
==-
1
||
pred_id
<
key_id
)
{
cout
<<
"SERVER SERVES A DEL REQUEST WITH PARAMETER KEY : "
<<
key
.
key
()
<<
endl
;
int
status
=
200
;
pthread_mutex_lock
(
&
_masterLock
);
memManager
->
del
(
&
status
,
key
.
key
());
pthread_mutex_unlock
(
&
_masterLock
);
if
(
status
==
200
)
stat
.
set_status
(
200
);
else
{
//as this server does not have the keyvalue pair, send request to next server using CHORD
stat
.
set_status
(
400
);
stat
.
set_error
(
"KEY NOT EXIST"
);
}
}
else
{
int
fingers
[
16
];
ifstream
fin
;
//cout<<"Getting my finger table"<<endl;
fin
.
open
(
FINGER_TABLE
);
int
nums
=
0
;
do
{
string
temp
;
getline
(
fin
,
temp
);
if
(
temp
==
"null"
||
temp
.
size
()
==
0
)
break
;
fingers
[
nums
++
]
=
stoi
(
temp
);
}
while
(
fin
);
fin
.
close
();
int
node
=-
1
;
int
next
=-
1
;
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
if
(
nums
>
0
&&
fingers
[
nums
-
1
]
<
key_id
&&
my_id
>=
key_id
)
{
node
=
fingers
[
nums
-
1
];
next
=
my_id
;
}
else
if
(
nums
>
0
&&
my_id
<
key_id
&&
fingers
[
0
]
>=
key_id
)
{
node
=
my_id
;
next
=
fingers
[
0
];
}
else
{
for
(
int
i
=
0
;
i
<
nums
;
i
++
)
{
if
(
i
>
0
&&
fingers
[
i
-
1
]
<
key_id
&&
fingers
[
i
]
>=
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
>
0
&&
fingers
[
i
]
<
fingers
[
i
-
1
]
&&
fingers
[
i
]
>
key_id
)
{
node
=
fingers
[
i
-
1
];
next
=
fingers
[
i
];
break
;
}
else
if
(
i
==
nums
-
1
)
{
node
=
fingers
[
i
];
next
=
my_id
;
break
;
}
}
}
string
target_address
(
"0.0.0.0:"
+
to_string
(
next
));
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context
;
Id
x
;
x
.
set_id
(
next
);
Id
y
;
//cout<<"asking the possible successor whether its predecessor is less than the new node id"<<endl;
int
mypred
;
if
(
next
!=
my_id
)
{
stub
->
GETPREDECESSOR
(
&
context
,
x
,
&
y
);
mypred
=
y
.
id
();
}
else
{
fin
.
open
(
NEIGHBOURS
);
string
temp
;
getline
(
fin
,
temp
);
getline
(
fin
,
temp
);
fin
.
close
();
mypred
=
stoi
(
temp
.
substr
(
temp
.
find
(
':'
)
+
1
));
}
if
(
mypred
==
node
)
{
//cout<<"Yes it is. So we found the successor and the predecessor"<<endl;
//cout<<"Successor: "<<x.id()<<endl;
//cout<<"Predecessor: "<<mypred<<endl;
succ
=
x
.
id
();
}
else
{
//cout<<"No it is not. We will ask the possible predecessor to find the successor of new node"<<endl;
string
tar_address
(
"0.0.0.0:"
+
to_string
(
node
));
channel
=
grpc
::
CreateChannel
(
tar_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
context1
;
x
.
set_id
(
key_id
);
stub
->
GETSUCCESSOR
(
&
context1
,
x
,
&
y
);
//cout<<"Yes. We got the successor"<<endl;
succ
=
x
.
id
();
}
string
t_address
(
"0.0.0.0:"
+
to_string
(
succ
));
channel
=
grpc
::
CreateChannel
(
t_address
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
ClientContext
cont1
;
stub
->
DEL
(
&
cont1
,
key
,
&
stat
);
}
}
delResponder
.
Finish
(
stat
,
Status
::
OK
,
this
);
delResponder
.
Finish
(
stat
,
Status
::
OK
,
this
);
}
}
/* --------------------------------CONTENT OF CACHE ONLY KEY-------------------------------- */
/* --------------------------------CONTENT OF CACHE ONLY KEY-------------------------------- */
...
@@ -121,6 +1130,9 @@ public:
...
@@ -121,6 +1130,9 @@ public:
delete
this
;
delete
this
;
}
}
}
}
int
hash
(
string
s
)
{
return
(((
int
)
s
.
at
(
0
))
<<
8
)
+
((
int
)
s
.
at
(
1
));
}
private:
private:
KeyValueServices
::
AsyncService
*
service
;
KeyValueServices
::
AsyncService
*
service
;
...
@@ -147,6 +1159,28 @@ void setupServer(string server_address) {
...
@@ -147,6 +1159,28 @@ void setupServer(string server_address) {
builder
.
RegisterService
(
&
service
);
builder
.
RegisterService
(
&
service
);
}
}
void
*
handleServerRequests
(
void
*
thread_id
)
{
unique_ptr
<
ServerCompletionQueue
>
comp_queue
=
builder
.
AddCompletionQueue
();
pthread_mutex_lock
(
&
myLock
);
while
(
!
start
)
pthread_cond_wait
(
&
startRpcs
,
&
myLock
);
pthread_mutex_unlock
(
&
myLock
);
new
ServerData
(
&
service
,
comp_queue
.
get
(),
NEW
);
new
ServerData
(
&
service
,
comp_queue
.
get
(),
INFORMSUCCESSOR
);
new
ServerData
(
&
service
,
comp_queue
.
get
(),
INFORMPREDECESSOR
);
new
ServerData
(
&
service
,
comp_queue
.
get
(),
GETSUCCESSOR
);
new
ServerData
(
&
service
,
comp_queue
.
get
(),
GETPREDECESSOR
);
new
ServerData
(
&
service
,
comp_queue
.
get
(),
UPDATETABLE
);
void
*
tag
;
bool
ok
;
while
(
true
)
{
GPR_ASSERT
(
comp_queue
->
Next
(
&
tag
,
&
ok
));
GPR_ASSERT
(
ok
);
static_cast
<
ServerData
*>
(
tag
)
->
Proceed
();
}
return
0
;
}
void
*
handleRpcs
(
void
*
thread_id
)
{
void
*
handleRpcs
(
void
*
thread_id
)
{
unique_ptr
<
ServerCompletionQueue
>
comp_queue
=
builder
.
AddCompletionQueue
();
unique_ptr
<
ServerCompletionQueue
>
comp_queue
=
builder
.
AddCompletionQueue
();
pthread_mutex_lock
(
&
myLock
);
pthread_mutex_lock
(
&
myLock
);
...
@@ -168,6 +1202,8 @@ void *handleRpcs(void *thread_id) {
...
@@ -168,6 +1202,8 @@ void *handleRpcs(void *thread_id) {
}
}
void
assignThreads
(
int
num_threads
)
{
void
assignThreads
(
int
num_threads
)
{
dist_worker_id
=-
1
;
pthread_create
(
&
dist_worker
,
NULL
,
handleServerRequests
,(
void
*
)
&
dist_worker_id
);
workers
=
(
pthread_t
*
)
malloc
(
sizeof
(
pthread_t
)
*
num_threads
);
workers
=
(
pthread_t
*
)
malloc
(
sizeof
(
pthread_t
)
*
num_threads
);
worker_id
=
(
int
*
)
malloc
(
sizeof
(
int
)
*
num_threads
);
worker_id
=
(
int
*
)
malloc
(
sizeof
(
int
)
*
num_threads
);
for
(
int
i
=
0
;
i
<
num_threads
;
i
++
)
{
for
(
int
i
=
0
;
i
<
num_threads
;
i
++
)
{
...
@@ -183,6 +1219,157 @@ void signalHandler(int signum) {
...
@@ -183,6 +1219,157 @@ void signalHandler(int signum) {
exit
(
0
);
exit
(
0
);
}
}
void
updateAllFingerTables
()
{
string
target_address
(
DNS_SERVER
);
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
Null
null1
;
null1
.
set_nothing
(
0
);
Null
null2
;
ClientContext
context
;
Addresses
addr
;
stub
->
GETSERVERS
(
&
context
,
null1
,
&
addr
);
string
addresses
=
addr
.
addresses
();
int
num
=
addr
.
servers
();
string
ad
[
num
];
int
ids
[
num
];
for
(
int
i
=
0
;
i
<
num
;
i
++
)
{
ad
[
i
]
=
addresses
.
substr
(
0
,
addresses
.
find
(
';'
));
ids
[
i
]
=
stoi
(
ad
[
i
].
substr
(
ad
[
i
].
find
(
':'
)
+
1
));
addresses
=
addresses
.
substr
(
addresses
.
find
(
';'
)
+
1
);
}
ofstream
fout
;
fout
.
open
(
FINGER_TABLE
);
int
my_id
=
stoi
(
params
.
find
(
"LISTENING_PORT"
)
->
second
);
int
prev_entry
=
my_id
;
int
i
=
0
;
int
my_index
=
0
;
for
(
i
=
0
;
i
<
num
;
i
++
)
if
(
ids
[
i
]
==
my_id
)
{
my_index
=
i
;
break
;
}
int
fingernodes
[
num
-
1
];
int
count
=
0
;
for
(
i
=
my_index
+
1
;
i
<
num
;
i
++
)
fingernodes
[
count
++
]
=
ids
[
i
];
for
(
i
=
0
;
i
<
my_index
;
i
++
)
fingernodes
[
count
++
]
=
ids
[
i
];
int
curr
=
0
;
for
(
i
=
0
;
i
<
16
;
i
++
)
{
int
next_entry
=
(
my_id
+
(
1
<<
i
))
%
(
1
<<
16
);
if
(
curr
!=
count
&&
next_entry
>
ids
[
num
-
1
]
&&
my_index
!=
0
)
{
fout
<<
ids
[
0
]
<<
endl
;
if
(
fingernodes
[
curr
]
!=
ids
[
0
])
{
for
(
int
j
=
0
;
j
<
count
;
j
++
)
if
(
fingernodes
[
j
]
==
ids
[
0
])
curr
=
j
;
}
}
else
{
while
(
curr
<
count
&&
next_entry
>
fingernodes
[
curr
])
curr
++
;
}
if
(
curr
<
count
&&
fingernodes
[
curr
]
>
next_entry
)
fout
<<
fingernodes
[
curr
]
<<
endl
;
if
(
curr
==
count
)
fout
<<
"null"
<<
endl
;
}
fout
.
close
();
ClientContext
context2
;
stub
->
UPDATEFINGERTABLES
(
&
context2
,
null1
,
&
null2
);
}
void
register_server_DNS
(
string
my_address
)
{
//cout<<"Registering to DNS"<<endl;
string
target_address
(
DNS_SERVER
);
shared_ptr
<
Channel
>
channel
=
grpc
::
CreateChannel
(
target_address
,
grpc
::
InsecureChannelCredentials
());
unique_ptr
<
KeyValueServices
::
Stub
>
stub
;
stub
=
KeyValueServices
::
NewStub
(
channel
);
Null
null
;
null
.
set_nothing
(
0
);
Info
info
;
ClientContext
context
;
Status
status
=
stub
->
GETADDRESS
(
&
context
,
null
,
&
info
);
//cout<<"Address received:"<<info.address()<<endl;
string
old_server
;
if
(
status
.
ok
())
{
old_server
=
info
.
address
();
info
.
set_address
(
my_address
);
ClientContext
new_context
;
//cout<<"Adding address to DNS"<<endl;
stub
->
ADDADDRESS
(
&
new_context
,
info
,
&
null
);
//cout<<"Address added to DNS"<<endl;
ofstream
fout
;
//cout<<"Generating initial finger table"<<endl;
fout
.
open
(
FINGER_TABLE
);
for
(
int
i
=
0
;
i
<
16
;
i
++
)
fout
<<
"null"
<<
endl
;
fout
.
close
();
//cout<<"Initial finger table generated"<<endl;
if
(
old_server
==
"null"
)
{
//cout<<"Initializing initial neighbours"<<endl;
fout
.
open
(
NEIGHBOURS
);
fout
<<
"-1"
<<
endl
;
fout
<<
"-1"
<<
endl
;
fout
.
close
();
//cout<<"Initialized initial neighbours"<<endl;
return
;
}
channel
=
grpc
::
CreateChannel
(
old_server
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
info
.
set_address
(
my_address
);
SuccessorInfo
successorInfo
;
ClientContext
context1
;
//cout<<"Sending request to server: "<<old_server<<endl;
status
=
stub
->
NEW
(
&
context1
,
info
,
&
successorInfo
);
//cout<<"Request sent. Successor and predecessor info received"<<endl;
if
(
status
.
ok
())
{
string
successor
=
successorInfo
.
succaddress
();
string
predecessor
=
successorInfo
.
predaddress
();
ofstream
fout
;
//cout<<"Storing neighbours info"<<endl;
fout
.
open
(
NEIGHBOURS
);
fout
<<
successor
<<
endl
;
fout
<<
predecessor
<<
endl
;
fout
.
close
();
//cout<<"Successor: "<<successor<<endl;
//cout<<"Predecessor: "<<predecessor<<endl;
//cout<<"Stored neighbours info"<<endl;
channel
=
grpc
::
CreateChannel
(
successor
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
info
.
set_address
(
my_address
);
KeyValues
keyValues
;
ClientContext
context2
;
//cout<<"Informing successor about my presence"<<endl;
status
=
stub
->
INFORMSUCCESSOR
(
&
context2
,
info
,
&
keyValues
);
string
keys
=
keyValues
.
keys
();
string
values
=
keyValues
.
values
();
while
(
true
)
{
string
key
=
keys
.
substr
(
0
,
keys
.
find
(
';'
));
string
value
=
values
.
substr
(
0
,
values
.
find
(
';'
));
if
(
key
.
size
()
==
0
)
break
;
if
(
keys
.
find
(
';'
)
+
1
==
keys
.
size
())
break
;
keys
=
keys
.
substr
(
keys
.
find
(
';'
)
+
1
);
values
=
values
.
substr
(
values
.
find
(
';'
)
+
1
);
memManager
->
put
(
key
,
value
);
}
//cout<<"Informed succesor"<<endl;
channel
=
grpc
::
CreateChannel
(
predecessor
,
grpc
::
InsecureChannelCredentials
());
stub
=
KeyValueServices
::
NewStub
(
channel
);
info
.
set_address
(
my_address
);
ClientContext
context3
;
//cout<<"Informing predecessor about my presence"<<endl;
status
=
stub
->
INFORMPREDECESSOR
(
&
context3
,
info
,
&
null
);
//cout<<"Informed predecessor"<<endl;
updateAllFingerTables
();
}
}
}
int
main
(
int
agrc
,
char
**
argv
)
{
int
main
(
int
agrc
,
char
**
argv
)
{
pthread_mutex_init
(
&
_masterLock
,
NULL
);
pthread_mutex_init
(
&
_masterLock
,
NULL
);
start
=
false
;
start
=
false
;
...
@@ -200,6 +1387,8 @@ int main(int agrc, char **argv) {
...
@@ -200,6 +1387,8 @@ int main(int agrc, char **argv) {
else
else
memManager
=
new
storageLRU
(
stoi
(
params
.
find
(
"CACHE_SIZE"
)
->
second
));
memManager
=
new
storageLRU
(
stoi
(
params
.
find
(
"CACHE_SIZE"
)
->
second
));
register_server_DNS
(
server_address
);
setupServer
(
server_address
);
setupServer
(
server_address
);
assignThreads
(
num_threads
);
assignThreads
(
num_threads
);
sleep
(
1
);
sleep
(
1
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment