Commit e36c6dbe authored by Sushant Mahajan's avatar Sushant Mahajan

handling per client reply from kv store

parent 5e03357b
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"kvstore"
"net" "net"
"raft" "raft"
"strconv" "strconv"
...@@ -26,7 +25,7 @@ func readValue(ch chan []byte, n uint64) ([]byte, bool) { ...@@ -26,7 +25,7 @@ func readValue(ch chan []byte, n uint64) ([]byte, bool) {
up := make(chan bool, 1) up := make(chan bool, 1)
//after 5 seconds passed reading value, we'll just send err to client //after 5 seconds passed reading value, we'll just send err to client
go func() { go func() {
time.Sleep(kvstore.READ_TIMEOUT * time.Second) time.Sleep(5 * time.Second)
up <- true up <- true
}() }()
...@@ -113,15 +112,24 @@ func MyRead(ch chan []byte, conn net.Conn) { ...@@ -113,15 +112,24 @@ func MyRead(ch chan []byte, conn net.Conn) {
*/ */
func Write(conn net.Conn, msg string) { func Write(conn net.Conn, msg string) {
buf := []byte(msg) buf := []byte(msg)
buf = append(buf, []byte(kvstore.CRLF)...) buf = append(buf, []byte("\r\n")...)
conn.Write(buf) conn.Write(buf)
} }
func ReplyToClient(conn net.Conn, writeCh chan []byte) {
for {
w := <-writeCh
Write(conn, string(w))
}
}
func HandleClient(conn net.Conn, rft *raft.Raft) { func HandleClient(conn net.Conn, rft *raft.Raft) {
defer conn.Close() defer conn.Close()
//channel for every connection for every client //channel for every connection for every client
ch := make(chan []byte) ch := make(chan []byte)
writeCh := make(chan []byte)
go MyRead(ch, conn) go MyRead(ch, conn)
go ReplyToClient(conn, writeCh)
for { for {
command := new(utils.Command) command := new(utils.Command)
...@@ -133,18 +141,18 @@ func HandleClient(conn net.Conn, rft *raft.Raft) { ...@@ -133,18 +141,18 @@ func HandleClient(conn net.Conn, rft *raft.Raft) {
flag := false flag := false
nr := uint64(0) nr := uint64(0)
tokens := strings.Fields(string(msg)) tokens := strings.Fields(string(msg))
if kvstore.IsCas(tokens[0]) { if tokens[0] == "CAS" {
n, _ := strconv.ParseUint(tokens[3], 10, 64) n, _ := strconv.ParseUint(tokens[3], 10, 64)
nr = n nr = n
flag = true flag = true
} else if kvstore.IsSet(tokens[0]) { } else if tokens[0] == "SET" {
n, _ := strconv.ParseUint(tokens[2], 10, 64) n, _ := strconv.ParseUint(tokens[2], 10, 64)
nr = n nr = n
flag = true flag = true
} }
if flag { if flag {
if v, err := readValue(ch, nr); err { if v, err := readValue(ch, nr); err {
Write(conn, kvstore.ERR_CMD_ERR) Write(conn, "ERR_CMD_ERR")
} else { } else {
command.Val = v command.Val = v
//command.isVal = true //command.isVal = true
...@@ -159,6 +167,6 @@ func HandleClient(conn net.Conn, rft *raft.Raft) { ...@@ -159,6 +167,6 @@ func HandleClient(conn net.Conn, rft *raft.Raft) {
//log.Fatal("encode error:", err) //log.Fatal("encode error:", err)
} }
rft.Append(buffer.Bytes()) rft.Append(buffer.Bytes(), writeCh)
} }
} }
package kvstore package raft
import ( import (
"bytes" "bytes"
...@@ -61,50 +61,6 @@ var logger *log.Logger ...@@ -61,50 +61,6 @@ var logger *log.Logger
//cache //cache
var table *KeyValueStore var table *KeyValueStore
/*Function to start the server and accept connections.
*arguments: none
*return: none
*/
//func startServer() {
// logger.Println("Server started")
// listener, err := net.Listen("tcp", ":5000")
// if err != nil {
// logger.Println("Could not start server!")
// }
// //initialize key value store
// table = &KeyValueStore{dictionary: make(map[string]*Data)}
// //infinite loop
// for {
// conn, err := listener.Accept()
// if err != nil {
// logger.Println(err)
// continue
// }
// go handleClient(conn) //client connection handler
// }
//}
/*Function to read data from the connection and put it on the channel so it could be read in a systematic fashion.
*arguments: channel shared between this go routine and other functions performing actions based on the commands given, client connection
*return: none
*/
//func myRead(ch chan []byte, conn net.Conn) {
// scanner := bufio.NewScanner(conn)
// scanner.Split(CustomSplitter)
// for {
// if ok := scanner.Scan(); !ok {
// break
// } else {
// temp := scanner.Bytes()
// ch <- temp
// logger.Println(temp, "$$")
// }
// }
//}
/*Simple write function to send information to the client /*Simple write function to send information to the client
*arguments: client connection, msg to send to the client *arguments: client connection, msg to send to the client
*return: none *return: none
...@@ -112,30 +68,9 @@ var table *KeyValueStore ...@@ -112,30 +68,9 @@ var table *KeyValueStore
func write(conn net.Conn, msg string) { func write(conn net.Conn, msg string) {
buf := []byte(msg) buf := []byte(msg)
buf = append(buf, []byte(CRLF)...) buf = append(buf, []byte(CRLF)...)
//logger.Println(buf, len(buf))
conn.Write(buf) conn.Write(buf)
} }
/*After initial establishment of the connection with the client, this go routine handles further interaction
*arguments: client connection
*return: none
*/
//func handleClient(conn net.Conn) {
// defer conn.Close()
// //channel for every connection for every client
// ch := make(chan []byte)
// go myRead(ch, conn)
// for {
// msg := <-ch
// logger.Println("Channel: ", msg, string(msg))
// if len(msg) == 0 {
// continue
// }
// parseInput(conn, string(msg), table, ch)
// }
//}
/*Basic validations for various commands /*Basic validations for various commands
*arguments: command to check against, other parmameters sent with the command (excluding the value), client connection *arguments: command to check against, other parmameters sent with the command (excluding the value), client connection
*return: integer representing error state *return: integer representing error state
...@@ -568,31 +503,6 @@ func debug() { ...@@ -568,31 +503,6 @@ func debug() {
logger.Println("----end debug----") logger.Println("----end debug----")
} }
/*Entry point of this program. Initializes the start of ther server and sets up the logger.
*arguments: none
*return: none
*/
//func main() {
// toLog := ""
// if len(os.Args) > 1 {
// toLog = os.Args[1]
// }
// //toLog = "s"
// if toLog != "" {
// logf, _ := os.OpenFile("serverlog.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
// defer logf.Close()
// logger = log.New(logf, "SERVER: ", log.Ltime|log.Lshortfile)
// //logger = log.New(os.Stdout, "SERVER: ", log.Ltime|log.Lshortfile)
// } else {
// logger = log.New(ioutil.Discard, "SERVER: ", log.Ldate)
// }
// go startServer()
// var input string
// fmt.Scanln(&input)
//}
func InitKVStore() { func InitKVStore() {
toLog := "" toLog := ""
if len(os.Args) > 1 { if len(os.Args) > 1 {
...@@ -613,12 +523,10 @@ func InitKVStore() { ...@@ -613,12 +523,10 @@ func InitKVStore() {
table = &KeyValueStore{dictionary: make(map[string]*Data)} table = &KeyValueStore{dictionary: make(map[string]*Data)}
} }
func IsCas(msg string) bool { func MonitorCommitChannel(ch chan LogEntry, conn) {
return msg == CAS for {
} //do some shit
}
func IsSet(msg string) bool {
return msg == SET
} }
//server will not call this, we'll call it from test cases to clear the map //server will not call this, we'll call it from test cases to clear the map
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
//constant values used //constant values used
const ( const (
CLIENT_PORT = 9000 CLIENT_PORT = 9000
LOG_PORT = 8000
ACK_TIMEOUT = 5 ACK_TIMEOUT = 5
) )
...@@ -19,21 +20,9 @@ var Info *log.Logger ...@@ -19,21 +20,9 @@ var Info *log.Logger
// Flag for enabling/disabling logging functionality // Flag for enabling/disabling logging functionality
var DEBUG = true var DEBUG = true
type Lsn uint64 //Log sequence number, unique for all time.
type ErrRedirect int // See Log.Append. Implements Error interface. type ErrRedirect int // See Log.Append. Implements Error interface.
type LogEntry interface { type Lsn uint64 //Log sequence number, unique for all time.
Lsn() Lsn
Data() []byte
Committed() bool
}
type LogEntryData struct {
id Lsn
data []byte
committed bool
}
type ServerConfig struct { type ServerConfig struct {
Id int // Id of server. Must be unique Id int // Id of server. Must be unique
...@@ -58,6 +47,18 @@ type Raft struct { ...@@ -58,6 +47,18 @@ type Raft struct {
id int //this server id id int //this server id
} }
type LogEntry interface {
Lsn() Lsn
Data() []byte
Committed() bool
}
type LogEntryData struct {
id Lsn
data []byte
committed bool
}
type Args struct { type Args struct {
X int X int
} }
...@@ -78,18 +79,8 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (* ...@@ -78,18 +79,8 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*
return rft, nil return rft, nil
} }
//goroutine that monitors channel for commiting log entry
func monitorCommitChannel(raft *Raft) { //unidirectional -- can only read from the channel
for {
//var temp LogEntry
temp := <-raft.commitCh //receive from the channel
raft.log_array[temp.(*LogEntryData).id].committed = true //commit the value
//update the kv store here
}
}
//goroutine that monitors channel to check if the majority of servers have replied //goroutine that monitors channel to check if the majority of servers have replied
func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majEventCh chan int) { func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) {
acks_received := 0 acks_received := 0
num_servers := len(rft.cluster_config.Servers) num_servers := len(rft.cluster_config.Servers)
required_acks := num_servers / 2 required_acks := num_servers / 2
...@@ -106,8 +97,9 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majEven ...@@ -106,8 +97,9 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majEven
case temp := <-ack_ch: case temp := <-ack_ch:
acks_received += temp acks_received += temp
if acks_received == required_acks { if acks_received == required_acks {
rft.log_array[log_entry.(*LogEntryData).id].committed = true
rft.commitCh <- log_entry rft.commitCh <- log_entry
majEventCh <- 1 majCh <- true
err = true err = true
break break
} }
...@@ -136,20 +128,19 @@ func (entry *LogEntryData) Committed() bool { ...@@ -136,20 +128,19 @@ func (entry *LogEntryData) Committed() bool {
} }
//make raft implement the append function //make raft implement the append function
func (raft *Raft) Append(data []byte) (LogEntry, error) { func (rft *Raft) Append(data []byte, writeCh chan []byte) (LogEntry, error) {
if raft.id != 1 { if rft.id != 1 {
return nil, ErrRedirect(1) return nil, ErrRedirect(1)
} }
temp := new(LogEntryData) temp := new(LogEntryData)
temp.id = 1 temp.id = 1
temp.committed = false temp.committed = false
temp.data = data temp.data = data
raft.log_array = append(raft.log_array, temp) rft.log_array = append(rft.log_array, temp)
ackChan := make(chan int) ackChan := make(chan int)
majEventCh := make(chan int) majChan := make(chan bool)
go monitorAckChannel(raft, ackChan, temp, majEventCh) go monitorAckChannel(rft, ackChan, temp, majChan)
go monitorCommitChannel(raft)
for _, server := range cluster_config.Servers[1:] { for _, server := range cluster_config.Servers[1:] {
go func(ackChan chan int) { go func(ackChan chan int) {
...@@ -164,19 +155,20 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) { ...@@ -164,19 +155,20 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) {
ackChan <- reply.X ackChan <- reply.X
}(ackChan) }(ackChan)
} }
//channel will return 1 if majority
if <-majEventCh == 1 { if <-majChan {
raft.commitCh <- temp //
} }
return temp, nil return temp, nil
} }
func NewServerConfig(server_id int) (*ServerConfig, error) { func NewServerConfig(serverId int) (*ServerConfig, error) {
server := new(ServerConfig) server := new(ServerConfig)
server.Id = server_id server.Id = serverId
server.Hostname = "127.0.0.1" server.Hostname = "127.0.0.1"
server.ClientPort = CLIENT_PORT server.ClientPort = CLIENT_PORT + serverId
server.LogPort = CLIENT_PORT + server_id server.LogPort = LOG_PORT + serverId
return server, nil return server, nil
} }
......
...@@ -3,7 +3,6 @@ package main ...@@ -3,7 +3,6 @@ package main
import ( import (
"connhandler" "connhandler"
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net" "net"
...@@ -34,7 +33,7 @@ func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error { ...@@ -34,7 +33,7 @@ func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error {
return nil return nil
} }
func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft) { func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
appendRpc := new(AppendEntries) appendRpc := new(AppendEntries)
rpc.Register(appendRpc) rpc.Register(appendRpc)
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.LogPort)) listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.LogPort))
...@@ -49,6 +48,7 @@ func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft) { ...@@ -49,6 +48,7 @@ func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft) {
go rpc.ServeConn(conn) go rpc.ServeConn(conn)
} }
} }
ch <- true
} }
// Initialize Logger // Initialize Logger
...@@ -64,7 +64,7 @@ func initLogger(serverId int) { ...@@ -64,7 +64,7 @@ func initLogger(serverId int) {
Info.Println("Initialized server") Info.Println("Initialized server")
} }
func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft) { func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort)) listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort))
if e != nil { if e != nil {
Info.Fatal("client listen error:", e) Info.Fatal("client listen error:", e)
...@@ -77,10 +77,13 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft) { ...@@ -77,10 +77,13 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft) {
go connhandler.HandleClient(conn, rft) go connhandler.HandleClient(conn, rft)
} }
} }
ch <- true
} }
func main() { func main() {
sid, err := strconv.Atoi(os.Args[1]) sid, err := strconv.Atoi(os.Args[1])
ch1 := make(chan bool)
ch2 := make(chan bool)
if err != nil { if err != nil {
Info.Println("argument ", os.Args[1], "is not string") Info.Println("argument ", os.Args[1], "is not string")
} }
...@@ -99,9 +102,10 @@ func main() { ...@@ -99,9 +102,10 @@ func main() {
rft, _ := raft.NewRaft(clusterConfig, sid, commitCh) rft, _ := raft.NewRaft(clusterConfig, sid, commitCh)
initClientCommunication(server, rft) go initClientCommunication(server, rft, ch1)
initInterServerCommunication(server, rft) go initInterServerCommunication(server, rft, ch2)
var dummy string for <-ch1 && <-ch2 {
fmt.Scanln(&dummy)
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment