Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
C
cs733
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Sushant Mahajan
cs733
Commits
6ebcc6a3
Commit
6ebcc6a3
authored
Feb 08, 2015
by
Sushant Mahajan
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
modularized the code
parent
ba33c25e
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
116 additions
and
63 deletions
+116
-63
assignment2/connhandler.go
assignment2/connhandler.go
+32
-0
assignment2/kvstore.go
assignment2/kvstore.go
+62
-42
assignment2/raft.go
assignment2/raft.go
+22
-21
No files found.
assignment2/connhandler.go
0 → 100644
View file @
6ebcc6a3
package
main
import
(
"kvstore"
"raft"
)
func
main
()
{
server_id
,
err
:=
strconv
.
Atoi
(
os
.
Args
[
1
])
if
err
!=
nil
{
Info
.
Println
(
"argument "
,
os
.
Args
[
1
],
"is not string"
)
}
initializeLogger
(
server_id
)
Info
.
Println
(
"Start"
)
this_server
,
_
:=
NewServerConfig
(
server_id
)
num_servers
,
err2
:=
strconv
.
Atoi
((
os
.
Args
[
2
]))
if
err2
!=
nil
{
Info
.
Println
(
"argument "
,
os
.
Args
[
2
],
"is not string"
)
}
cluster_config
,
_
:=
NewClusterConfig
(
num_servers
)
Info
.
Println
(
reflect
.
TypeOf
(
this_server
))
Info
.
Println
(
reflect
.
TypeOf
(
cluster_config
))
initializeInterServerCommunication
(
this_server
)
var
dummy_input
string
fmt
.
Scanln
(
&
dummy_input
)
}
assignment2/kvstore.go
View file @
6ebcc6a3
package
main
package
kvstore
import
(
"bufio"
...
...
@@ -67,27 +67,27 @@ var table *KeyValueStore
*arguments: none
*return: none
*/
func
startServer
()
{
logger
.
Println
(
"Server started"
)
listener
,
err
:=
net
.
Listen
(
"tcp"
,
":5000"
)
if
err
!=
nil
{
logger
.
Println
(
"Could not start server!"
)
}
//initialize key value store
table
=
&
KeyValueStore
{
dictionary
:
make
(
map
[
string
]
*
Data
)}
//infinite loop
for
{
conn
,
err
:=
listener
.
Accept
()
if
err
!=
nil
{
logger
.
Println
(
err
)
continue
}
go
handleClient
(
conn
,
table
)
//client connection handler
}
}
//
func startServer() {
//
logger.Println("Server started")
//
listener, err := net.Listen("tcp", ":5000")
//
if err != nil {
//
logger.Println("Could not start server!")
//
}
//
//initialize key value store
//
table = &KeyValueStore{dictionary: make(map[string]*Data)}
//
//infinite loop
//
for {
//
conn, err := listener.Accept()
//
if err != nil {
//
logger.Println(err)
//
continue
//
}
// go handleClient(conn
) //client connection handler
//
}
//
}
/*Function to read data from the connection and put it on the channel so it could be read in a systematic fashion.
*arguments: channel shared between this go routine and other functions performing actions based on the commands given, client connection
...
...
@@ -119,10 +119,10 @@ func write(conn net.Conn, msg string) {
}
/*After initial establishment of the connection with the client, this go routine handles further interaction
*arguments: client connection
, pointer to the hastable structure
*arguments: client connection
*return: none
*/
func
handleClient
(
conn
net
.
Conn
,
table
*
KeyValueStore
)
{
func
handleClient
(
conn
net
.
Conn
)
{
defer
conn
.
Close
()
//channel for every connection for every client
ch
:=
make
(
chan
[]
byte
)
...
...
@@ -248,10 +248,10 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
/*Function parses the command provided by the client and delegates further action to command specific functions.
*Based on the return values of those functions, send appropriate messages to the client.
*arguments: client connection, message from client,
pointer to hashtable structure,
channel shared with myRead function
*arguments: client connection, message from client, channel shared with myRead function
*return: none
*/
func
parseInput
(
conn
net
.
Conn
,
msg
string
,
table
*
KeyValueStore
,
ch
chan
[]
byte
)
{
func
parseInput
(
conn
net
.
Conn
,
msg
string
,
ch
chan
[]
byte
)
{
tokens
:=
strings
.
Fields
(
msg
)
//general error, don't check for commands, avoid the pain ;)
if
len
(
tokens
)
>
MAX_CMD_ARGS
||
len
(
tokens
)
<
MIN_CMD_ARGS
{
...
...
@@ -427,10 +427,10 @@ func readValue(ch chan []byte, n uint64) ([]byte, bool) {
}
/*Delegate function responsible for all parsing and hashtable interactions for the SET command sent by client
*arguments: client connection, tokenized command sent by the client,
pointer to hashtable structure,
channel shared with myRead
*arguments: client connection, tokenized command sent by the client, channel shared with myRead
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
*/
func
performSet
(
conn
net
.
Conn
,
tokens
[]
string
,
table
*
KeyValueStore
,
ch
chan
[]
byte
)
(
uint64
,
bool
,
bool
)
{
func
performSet
(
conn
net
.
Conn
,
tokens
[]
string
,
ch
chan
[]
byte
)
(
uint64
,
bool
,
bool
)
{
k
:=
tokens
[
0
]
//expiry time offset
e
,
_
:=
strconv
.
ParseUint
(
tokens
[
1
],
10
,
64
)
...
...
@@ -473,10 +473,10 @@ func performSet(conn net.Conn, tokens []string, table *KeyValueStore, ch chan []
}
/*Delegate function reponsible for activities related to the GET command sent by the client.
*arguments: client connection, tokenized command sent by the client
, pointer to hashtable structure
*arguments: client connection, tokenized command sent by the client
*return: pointer to value corresponding to the key given by client, success or failure
*/
func
performGet
(
conn
net
.
Conn
,
tokens
[]
string
,
table
*
KeyValueStore
)
(
*
Data
,
bool
)
{
func
performGet
(
conn
net
.
Conn
,
tokens
[]
string
)
(
*
Data
,
bool
)
{
k
:=
tokens
[
0
]
defer
table
.
Unlock
()
//lock because if key is expired, we'll delete it
...
...
@@ -498,10 +498,10 @@ func performGet(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, bo
}
/*Delegate function reponsible for activities related to the GETM command sent by the client.
*arguments: client connection, tokenized command sent by the client
, pointer to hashtable structure
*arguments: client connection, tokenized command sent by the client
*return: pointer to value corresponding to the key given by client, success or failure
*/
func
performGetm
(
conn
net
.
Conn
,
tokens
[]
string
,
table
*
KeyValueStore
)
(
*
Data
,
bool
)
{
func
performGetm
(
conn
net
.
Conn
,
tokens
[]
string
)
(
*
Data
,
bool
)
{
k
:=
tokens
[
0
]
defer
table
.
Unlock
()
table
.
Lock
()
...
...
@@ -526,11 +526,11 @@ func performGetm(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, b
}
/*Delegate function reponsible for activities related to the CAS command sent by the client.
*arguments: client connection, tokenized command sent by the client,
pointer to hashtable structure,
channel shared with myRead
*arguments: client connection, tokenized command sent by the client, channel shared with myRead
*return: new version of updated key (if it is updated), error status {0: error while reading new value, 1: key found and changed,
*2: version mismatch with key, 3: key not found}, whether to reply to client
*/
func
performCas
(
conn
net
.
Conn
,
tokens
[]
string
,
table
*
KeyValueStore
,
ch
chan
[]
byte
)
(
uint64
,
int
,
bool
)
{
func
performCas
(
conn
net
.
Conn
,
tokens
[]
string
,
ch
chan
[]
byte
)
(
uint64
,
int
,
bool
)
{
k
:=
tokens
[
0
]
e
,
_
:=
strconv
.
ParseUint
(
tokens
[
1
],
10
,
64
)
ve
,
_
:=
strconv
.
ParseUint
(
tokens
[
2
],
10
,
64
)
...
...
@@ -580,10 +580,10 @@ func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan []
}
/*Delegate function reponsible for activities related to the DELETE command sent by the client.
*arguments: client connection, tokenized command sent by the client
, pointer to hashtable structure
*arguments: client connection, tokenized command sent by the client
*return: integer secifying error state {0: found and deleted, 1: found but expired (deleted but client told non-existent, 2: key not found}
*/
func
performDelete
(
conn
net
.
Conn
,
tokens
[]
string
,
table
*
KeyValueStore
)
int
{
func
performDelete
(
conn
net
.
Conn
,
tokens
[]
string
)
int
{
k
:=
tokens
[
0
]
logger
.
Println
(
tokens
)
flag
:=
1
...
...
@@ -604,10 +604,10 @@ func performDelete(conn net.Conn, tokens []string, table *KeyValueStore) int {
}
/*Simple function that dumps the contents of the hashtable
*arguments:
pointer to the hashtable structur
e
*arguments:
non
e
*return: none
*/
func
debug
(
table
*
KeyValueStore
)
{
func
debug
()
{
logger
.
Println
(
"----start debug----"
)
for
key
,
val
:=
range
(
*
table
)
.
dictionary
{
logger
.
Println
(
key
,
val
)
...
...
@@ -651,7 +651,28 @@ func CustomSplitter(data []byte, atEOF bool) (advance int, token []byte, err err
*arguments: none
*return: none
*/
func
main
()
{
//func main() {
// toLog := ""
// if len(os.Args) > 1 {
// toLog = os.Args[1]
// }
// //toLog = "s"
// if toLog != "" {
// logf, _ := os.OpenFile("serverlog.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
// defer logf.Close()
// logger = log.New(logf, "SERVER: ", log.Ltime|log.Lshortfile)
// //logger = log.New(os.Stdout, "SERVER: ", log.Ltime|log.Lshortfile)
// } else {
// logger = log.New(ioutil.Discard, "SERVER: ", log.Ldate)
// }
// go startServer()
// var input string
// fmt.Scanln(&input)
//}
func
InitKVStore
()
{
toLog
:=
""
if
len
(
os
.
Args
)
>
1
{
toLog
=
os
.
Args
[
1
]
...
...
@@ -667,9 +688,8 @@ func main() {
logger
=
log
.
New
(
ioutil
.
Discard
,
"SERVER: "
,
log
.
Ldate
)
}
go
startServer
()
var
input
string
fmt
.
Scanln
(
&
input
)
//initialize key value store
table
=
&
KeyValueStore
{
dictionary
:
make
(
map
[
string
]
*
Data
)}
}
//server will not call this, we'll call it from test cases to clear the map
...
...
assignment2/raft.go
View file @
6ebcc6a3
package
main
package
raft
import
(
"fmt"
...
...
@@ -211,7 +211,8 @@ func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error {
return
nil
}
func
initializeInterServerCommunication
(
this_server
*
ServerConfig
)
{
func
InitializeRaft
(
this_server
*
ServerConfig
)
{
initializeLogger
(
this_server
.
Id
)
//initialize the logger
appendRpc
:=
new
(
AppendEntries
)
rpc
.
Register
(
appendRpc
)
listener
,
e
:=
net
.
Listen
(
"tcp"
,
":"
+
strconv
.
Itoa
(
this_server
.
LogPort
))
...
...
@@ -241,28 +242,28 @@ func initializeLogger(serverId int) {
Info
.
Println
(
"Initialized server"
)
}
func
main
()
{
server_id
,
err
:=
strconv
.
Atoi
(
os
.
Args
[
1
])
if
err
!=
nil
{
Info
.
Println
(
"argument "
,
os
.
Args
[
1
],
"is not string"
)
}
//
func main() {
//
server_id, err := strconv.Atoi(os.Args[1])
//
if err != nil {
//
Info.Println("argument ", os.Args[1], "is not string")
//
}
initializeLogger
(
server_id
)
Info
.
Println
(
"Start"
)
//
initializeLogger(server_id)
//
Info.Println("Start")
this_server
,
_
:=
NewServerConfig
(
server_id
)
//
this_server, _ := NewServerConfig(server_id)
num_servers
,
err2
:=
strconv
.
Atoi
((
os
.
Args
[
2
]))
if
err2
!=
nil
{
Info
.
Println
(
"argument "
,
os
.
Args
[
2
],
"is not string"
)
}
cluster_config
,
_
:=
NewClusterConfig
(
num_servers
)
//
num_servers, err2 := strconv.Atoi((os.Args[2]))
//
if err2 != nil {
//
Info.Println("argument ", os.Args[2], "is not string")
//
}
//
cluster_config, _ := NewClusterConfig(num_servers)
Info
.
Println
(
reflect
.
TypeOf
(
this_server
))
Info
.
Println
(
reflect
.
TypeOf
(
cluster_config
))
//
Info.Println(reflect.TypeOf(this_server))
//
Info.Println(reflect.TypeOf(cluster_config))
initializeInterServerCommunication
(
this_server
)
//
initializeInterServerCommunication(this_server)
var
dummy_input
string
fmt
.
Scanln
(
&
dummy_input
)
}
//
var dummy_input string
//
fmt.Scanln(&dummy_input)
//
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment