Commit 5e03357b authored by Bharath Radhakrishnan's avatar Bharath Radhakrishnan

fixed compilation issues and general housekeeping

parent 40cf442d
package handler package connhandler
import (
"bufio"
"bytes"
"encoding/gob"
"kvstore"
"net"
"raft"
"strconv"
"strings"
"time"
"utils"
)
/* /*
*Helper function to read value or cause timeout after READ_TIMEOUT seconds *Helper function to read value or cause timeout after READ_TIMEOUT seconds
...@@ -13,7 +26,7 @@ func readValue(ch chan []byte, n uint64) ([]byte, bool) { ...@@ -13,7 +26,7 @@ func readValue(ch chan []byte, n uint64) ([]byte, bool) {
up := make(chan bool, 1) up := make(chan bool, 1)
//after 5 seconds passed reading value, we'll just send err to client //after 5 seconds passed reading value, we'll just send err to client
go func() { go func() {
time.Sleep(READ_TIMEOUT * time.Second) time.Sleep(kvstore.READ_TIMEOUT * time.Second)
up <- true up <- true
}() }()
...@@ -100,21 +113,52 @@ func MyRead(ch chan []byte, conn net.Conn) { ...@@ -100,21 +113,52 @@ func MyRead(ch chan []byte, conn net.Conn) {
*/ */
func Write(conn net.Conn, msg string) { func Write(conn net.Conn, msg string) {
buf := []byte(msg) buf := []byte(msg)
buf = append(buf, []byte(CRLF)...) buf = append(buf, []byte(kvstore.CRLF)...)
conn.Write(buf) conn.Write(buf)
} }
func handleClient(conn net.Conn) { func HandleClient(conn net.Conn, rft *raft.Raft) {
defer conn.Close() defer conn.Close()
//channel for every connection for every client //channel for every connection for every client
ch := make(chan []byte) ch := make(chan []byte)
go myRead(ch, conn) go MyRead(ch, conn)
for { for {
command := new(utils.Command)
msg := <-ch msg := <-ch
if len(msg) == 0 { if len(msg) == 0 {
continue continue
} }
//kvstore.ParseInput(conn, string(msg), table, ch) command.Cmd = msg
flag := false
nr := uint64(0)
tokens := strings.Fields(string(msg))
if kvstore.IsCas(tokens[0]) {
n, _ := strconv.ParseUint(tokens[3], 10, 64)
nr = n
flag = true
} else if kvstore.IsSet(tokens[0]) {
n, _ := strconv.ParseUint(tokens[2], 10, 64)
nr = n
flag = true
}
if flag {
if v, err := readValue(ch, nr); err {
Write(conn, kvstore.ERR_CMD_ERR)
} else {
command.Val = v
//command.isVal = true
}
}
buffer := new(bytes.Buffer)
// writing
enc := gob.NewEncoder(buffer)
err := enc.Encode(command)
if err != nil {
//log.Fatal("encode error:", err)
}
rft.Append(buffer.Bytes())
} }
} }
package kvstore package kvstore
import ( import (
"bufio"
"bytes" "bytes"
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
...@@ -107,16 +105,16 @@ var table *KeyValueStore ...@@ -107,16 +105,16 @@ var table *KeyValueStore
// } // }
//} //}
///*Simple write function to send information to the client /*Simple write function to send information to the client
// *arguments: client connection, msg to send to the client *arguments: client connection, msg to send to the client
// *return: none *return: none
// */ */
//func write(conn net.Conn, msg string) { func write(conn net.Conn, msg string) {
// buf := []byte(msg) buf := []byte(msg)
// buf = append(buf, []byte(CRLF)...) buf = append(buf, []byte(CRLF)...)
// logger.Println(buf, len(buf)) //logger.Println(buf, len(buf))
// conn.Write(buf) conn.Write(buf)
//} }
/*After initial establishment of the connection with the client, this go routine handles further interaction /*After initial establishment of the connection with the client, this go routine handles further interaction
*arguments: client connection *arguments: client connection
...@@ -268,7 +266,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) { ...@@ -268,7 +266,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
if isValid(SET, tokens, conn) != 0 { if isValid(SET, tokens, conn) != 0 {
return return
} }
if ver, ok, r := performSet(conn, tokens[1:len(tokens)], table, ch); ok { if ver, ok, r := performSet(conn, tokens[1:len(tokens)], ch); ok {
//debug(table) //debug(table)
logger.Println(ver) logger.Println(ver)
if r { if r {
...@@ -284,7 +282,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) { ...@@ -284,7 +282,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
if isValid(GET, tokens, conn) != 0 { if isValid(GET, tokens, conn) != 0 {
return return
} }
if data, ok := performGet(conn, tokens[1:len(tokens)], table); ok { if data, ok := performGet(conn, tokens[1:len(tokens)]); ok {
logger.Println("sending", tokens[1], "data") logger.Println("sending", tokens[1], "data")
buffer.Reset() buffer.Reset()
buffer.WriteString(VALUE) buffer.WriteString(VALUE)
...@@ -305,7 +303,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) { ...@@ -305,7 +303,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
if isValid(GETM, tokens, conn) != 0 { if isValid(GETM, tokens, conn) != 0 {
return return
} }
if data, ok := performGetm(conn, tokens[1:len(tokens)], table); ok { if data, ok := performGetm(conn, tokens[1:len(tokens)]); ok {
logger.Println("sending", tokens[1], "metadata") logger.Println("sending", tokens[1], "metadata")
buffer.Reset() buffer.Reset()
buffer.WriteString(VALUE) buffer.WriteString(VALUE)
...@@ -334,7 +332,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) { ...@@ -334,7 +332,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
if isValid(CAS, tokens, conn) != 0 { if isValid(CAS, tokens, conn) != 0 {
return return
} }
if ver, ok, r := performCas(conn, tokens[1:len(tokens)], table, ch); r { if ver, ok, r := performCas(conn, tokens[1:len(tokens)], ch); r {
if r { if r {
switch ok { switch ok {
case 0: case 0:
...@@ -365,7 +363,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) { ...@@ -365,7 +363,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
if isValid(DELETE, tokens, conn) != 0 { if isValid(DELETE, tokens, conn) != 0 {
return return
} }
if ok := performDelete(conn, tokens[1:len(tokens)], table); ok == 0 { if ok := performDelete(conn, tokens[1:len(tokens)]); ok == 0 {
write(conn, DELETED) write(conn, DELETED)
} else { } else {
write(conn, ERR_NOT_FOUND) write(conn, ERR_NOT_FOUND)
...@@ -384,11 +382,11 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) { ...@@ -384,11 +382,11 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client *return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
*/ */
func performSet(conn net.Conn, tokens []string, ch chan []byte) (uint64, bool, bool) { func performSet(conn net.Conn, tokens []string, ch chan []byte) (uint64, bool, bool) {
k := tokens[0] //-k := tokens[0]
//expiry time offset //expiry time offset
e, _ := strconv.ParseUint(tokens[1], 10, 64) //-e, _ := strconv.ParseUint(tokens[1], 10, 64)
//numbytes //numbytes
n, _ := strconv.ParseUint(tokens[2], 10, 64) //-n, _ := strconv.ParseUint(tokens[2], 10, 64)
r := true r := true
if len(tokens) == 4 && tokens[3] == NOREPLY { if len(tokens) == 4 && tokens[3] == NOREPLY {
...@@ -397,32 +395,33 @@ func performSet(conn net.Conn, tokens []string, ch chan []byte) (uint64, bool, b ...@@ -397,32 +395,33 @@ func performSet(conn net.Conn, tokens []string, ch chan []byte) (uint64, bool, b
logger.Println(r) logger.Println(r)
if v, err := readValue(ch, n); err { //if v, err := readValue(ch, n); err {
write(conn, ERR_CMD_ERR) // write(conn, ERR_CMD_ERR)
return 0, false, r // return 0, false, r
} else { //} else {
defer table.Unlock() // defer table.Unlock()
table.Lock() // table.Lock()
//critical section start // //critical section start
var val *Data // var val *Data
if _, ok := table.dictionary[k]; ok { // if _, ok := table.dictionary[k]; ok {
val = table.dictionary[k] // val = table.dictionary[k]
} else { // } else {
val = new(Data) // val = new(Data)
table.dictionary[k] = val // table.dictionary[k] = val
} // }
val.numBytes = n // val.numBytes = n
val.version++ // val.version++
if e == 0 { // if e == 0 {
val.isPerpetual = true // val.isPerpetual = true
val.expTime = 0 // val.expTime = 0
} else { // } else {
val.isPerpetual = false // val.isPerpetual = false
val.expTime = e + uint64(time.Now().Unix()) // val.expTime = e + uint64(time.Now().Unix())
} // }
val.value = v // val.value = v
return val.version, true, r // return val.version, true, r
} //}
return 2, true, true
} }
/*Delegate function reponsible for activities related to the GET command sent by the client. /*Delegate function reponsible for activities related to the GET command sent by the client.
...@@ -496,40 +495,41 @@ func performCas(conn net.Conn, tokens []string, ch chan []byte) (uint64, int, bo ...@@ -496,40 +495,41 @@ func performCas(conn net.Conn, tokens []string, ch chan []byte) (uint64, int, bo
} }
//read value //read value
if v, err := readValue(ch, n); err { //if v, err := readValue(ch, n); err {
return 0, 1, r // return 0, 1, r
} else { //} else {
defer table.Unlock() // defer table.Unlock()
table.Lock() // table.Lock()
if val, ok := table.dictionary[k]; ok { // if val, ok := table.dictionary[k]; ok {
if val.version == ve { // if val.version == ve {
if val.isPerpetual || val.expTime >= uint64(time.Now().Unix()) { // if val.isPerpetual || val.expTime >= uint64(time.Now().Unix()) {
//if expiry time is zero, key should not be deleted // //if expiry time is zero, key should not be deleted
if e == 0 { // if e == 0 {
val.isPerpetual = true // val.isPerpetual = true
val.expTime = 0 // val.expTime = 0
} else { // } else {
val.isPerpetual = false // val.isPerpetual = false
val.expTime = e + uint64(time.Now().Unix()) // val.expTime = e + uint64(time.Now().Unix())
} // }
val.numBytes = n // val.numBytes = n
val.version++ // val.version++
val.value = v // val.value = v
//key found and changed // //key found and changed
return val.version, 0, r // return val.version, 0, r
} else { // } else {
logger.Println("expired key found!") // logger.Println("expired key found!")
//version found but key expired, can delete key safely and tell client that it does not exist // //version found but key expired, can delete key safely and tell client that it does not exist
delete(table.dictionary, k) // delete(table.dictionary, k)
return 0, 3, r // return 0, 3, r
} // }
} // }
//version mismatch // //version mismatch
return 0, 2, r // return 0, 2, r
} // }
//key not found // //key not found
return 0, 3, r // return 0, 3, r
} //}
return 1, 1, true
} }
/*Delegate function reponsible for activities related to the DELETE command sent by the client. /*Delegate function reponsible for activities related to the DELETE command sent by the client.
...@@ -613,11 +613,12 @@ func InitKVStore() { ...@@ -613,11 +613,12 @@ func InitKVStore() {
table = &KeyValueStore{dictionary: make(map[string]*Data)} table = &KeyValueStore{dictionary: make(map[string]*Data)}
} }
func IsCasOrSet(msg string) bool { func IsCas(msg string) bool {
tokens := strings.Fields(msg) return msg == CAS
if len(tokens) >= 1 { }
return tokens[0] == SET || tokens[0] == CAS
} func IsSet(msg string) bool {
return msg == SET
} }
//server will not call this, we'll call it from test cases to clear the map //server will not call this, we'll call it from test cases to clear the map
......
package raft package raft
import ( import (
"fmt"
"io/ioutil"
"log" "log"
"net"
"net/rpc" "net/rpc"
"os"
"reflect"
"strconv" "strconv"
"time" "time"
) )
...@@ -201,81 +196,3 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) { ...@@ -201,81 +196,3 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) {
func (e ErrRedirect) Error() string { func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(0) return "Redirect to server " + strconv.Itoa(0)
} }
//func start_rpc(this_server *ServerConfig) {
// //rpc.Register()
//}
func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error {
reply.X = args.X
return nil
}
func initInterServerCommunication(this_server *ServerConfig) {
appendRpc := new(AppendEntries)
rpc.Register(appendRpc)
listener, e := net.Listen("tcp", ":"+strconv.Itoa(this_server.LogPort))
if e != nil {
Info.Fatal("listen error:", e)
}
for {
if conn, err := listener.Accept(); err != nil {
Info.Fatal("accept error: " + err.Error())
} else {
Info.Printf("new connection established\n")
go rpc.ServeConn(conn)
}
}
}
// Initialize Logger
func initLogger(serverId int) {
// Logger Initializaion
if !DEBUG {
Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
} else {
Info = log.New(os.Stdout, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
}
Info.Println("Initialized server")
}
func initClientCommunication(server) {
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort))
if e != nil {
Info.Fatal("client listen error:", e)
}
for {
if conn, err := listener.Accept(); err != nil {
Info.Fatal("client accept error: " + err.Error())
} else {
Info.Printf("client new connection established\n")
go handleClient(conn)
}
}
}
func main() {
sid, err := strconv.Atoi(os.Args[1])
if err != nil {
Info.Println("argument ", os.Args[1], "is not string")
}
initLogger(sid)
Info.Println("Start")
server, _ := NewServerConfig(sid)
serverCount, err2 := strconv.Atoi((os.Args[2]))
if err2 != nil {
Info.Println("argument ", os.Args[2], "is not string")
}
cluster_config, _ := NewClusterConfig(serverCount)
initClientCommunication(server)
initInterServerCommunication(server)
var dummy string
fmt.Scanln(&dummy)
}
...@@ -3,10 +3,8 @@ package main ...@@ -3,10 +3,8 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
//"log"
"os/exec" "os/exec"
"strconv" "strconv"
//"syscall"
) )
//constant values used //constant values used
...@@ -15,7 +13,7 @@ const ( ...@@ -15,7 +13,7 @@ const (
) )
func TestServersCommunic(i int) { func TestServersCommunic(i int) {
cmd := exec.Command("go", "run", "replic_kvstore.go", strconv.Itoa(i+1), strconv.Itoa(NUM_SERVERS)) cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i+1), strconv.Itoa(NUM_SERVERS))
f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil { if err != nil {
fmt.Println("error opening file: %v", err) fmt.Println("error opening file: %v", err)
...@@ -25,11 +23,6 @@ func TestServersCommunic(i int) { ...@@ -25,11 +23,6 @@ func TestServersCommunic(i int) {
cmd.Stdout = f cmd.Stdout = f
cmd.Stderr = f cmd.Stderr = f
cmd.Run() cmd.Run()
/*if err != nil {
fmt.Println(err)
}
fmt.Println(string(out))*/
} }
func main() { func main() {
......
...@@ -2,18 +2,15 @@ ...@@ -2,18 +2,15 @@
package main package main
import ( import (
"cs733/assignment2/handler" "connhandler"
"cs733/assignment2/kvstore"
"cs733/assignment2/raft"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
"net/rpc" "net/rpc"
"os" "os"
"reflect" "raft"
"strconv" "strconv"
"time"
) )
// Logger // Logger
...@@ -22,6 +19,8 @@ var Info *log.Logger ...@@ -22,6 +19,8 @@ var Info *log.Logger
// Flag for enabling/disabling logging functionality // Flag for enabling/disabling logging functionality
var DEBUG = true var DEBUG = true
type AppendEntries struct{}
type Args struct { type Args struct {
X int X int
} }
...@@ -30,17 +29,15 @@ type Reply struct { ...@@ -30,17 +29,15 @@ type Reply struct {
X int X int
} }
type AppendEntries struct{}
func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error { func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error {
reply.X = args.X reply.X = args.X
return nil return nil
} }
func initInterServerCommunication(server *raft.ServerConfig, rft *raft.SharedLog) { func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft) {
appendRpc := new(AppendEntries) appendRpc := new(AppendEntries)
rpc.Register(appendRpc) rpc.Register(appendRpc)
listener, e := net.Listen("tcp", ":"+strconv.Itoa(this_server.LogPort)) listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.LogPort))
if e != nil { if e != nil {
Info.Fatal("listen error:", e) Info.Fatal("listen error:", e)
} }
...@@ -67,7 +64,7 @@ func initLogger(serverId int) { ...@@ -67,7 +64,7 @@ func initLogger(serverId int) {
Info.Println("Initialized server") Info.Println("Initialized server")
} }
func initClientCommunication(server *raft.ServerConfig, rft *raft.SharedLog) { func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft) {
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort)) listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort))
if e != nil { if e != nil {
Info.Fatal("client listen error:", e) Info.Fatal("client listen error:", e)
...@@ -77,7 +74,7 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.SharedLog) { ...@@ -77,7 +74,7 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.SharedLog) {
Info.Fatal("client accept error: " + err.Error()) Info.Fatal("client accept error: " + err.Error())
} else { } else {
Info.Printf("client new connection established\n") Info.Printf("client new connection established\n")
go handler.HandleClient(conn, rft) go connhandler.HandleClient(conn, rft)
} }
} }
} }
...@@ -100,7 +97,7 @@ func main() { ...@@ -100,7 +97,7 @@ func main() {
clusterConfig, _ := raft.NewClusterConfig(serverCount) clusterConfig, _ := raft.NewClusterConfig(serverCount)
commitCh := make(chan raft.LogEntry) commitCh := make(chan raft.LogEntry)
rft := raft.NewRaft(clusterConfig, sid, commitCh) rft, _ := raft.NewRaft(clusterConfig, sid, commitCh)
initClientCommunication(server, rft) initClientCommunication(server, rft)
initInterServerCommunication(server, rft) initInterServerCommunication(server, rft)
......
...@@ -2,23 +2,23 @@ ...@@ -2,23 +2,23 @@
package utils package utils
import ( import (
"bytes"
"encoding/gob" "encoding/gob"
"fmt"
) )
type Command struct { type Command struct {
cmd []byte Cmd []byte
val []byte Val []byte
} }
func (d *Command) GobEncode() ([]byte, error) { func (d *Command) GobEncode() ([]byte, error) {
w := new(bytes.Buffer) w := new(bytes.Buffer)
encoder := gob.NewEncoder(w) encoder := gob.NewEncoder(w)
err := encoder.Encode(d.cmd) err := encoder.Encode(d.Cmd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = encoder.Encode(d.val) err = encoder.Encode(d.Val)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -28,9 +28,9 @@ func (d *Command) GobEncode() ([]byte, error) { ...@@ -28,9 +28,9 @@ func (d *Command) GobEncode() ([]byte, error) {
func (d *Command) GobDecode(buf []byte) error { func (d *Command) GobDecode(buf []byte) error {
r := bytes.NewBuffer(buf) r := bytes.NewBuffer(buf)
decoder := gob.NewDecoder(r) decoder := gob.NewDecoder(r)
err := decoder.Decode(&d.cmd) err := decoder.Decode(&d.Cmd)
if err != nil { if err != nil {
return err return err
} }
return decoder.Decode(&d.val) return decoder.Decode(&d.Val)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment