Commit 35ce3a0f authored by Sushant Mahajan's avatar Sushant Mahajan

added separate handler

parent 8a91a8a3
package handler
/*
*Helper function to read value or cause timeout after READ_TIMEOUT seconds
*parameters: channel to read data from, threshold number of bytes to read
*returns: the value string and error state
*/
func readValue(ch chan []byte, n uint64) ([]byte, bool) {
//now we need to read the value which should have been sent
valReadLength := uint64(0)
var v []byte
err := false
up := make(chan bool, 1)
//after 5 seconds passed reading value, we'll just send err to client
go func() {
time.Sleep(READ_TIMEOUT * time.Second)
up <- true
}()
//use select for the data channel and the timeout channel
for valReadLength < n+2 {
select {
case temp := <-ch:
valReadLength += uint64(len(temp))
if valReadLength > n+2 {
err = true
break
}
v = append(v, temp...)
case <-up:
err = true
break
}
//will be true if timeout occurs
if err {
break
}
}
if err {
return []byte{0}, err
}
return v[:n], err
}
/*Copied from the bufio.Scanner (originally ScanLines). By default it splits by '\n' but now we want it to split by '\r\n'
*arguments: data in bytes, is eof reached
*return: next sequence of bytes, chunk of data found, err state
*/
func CustomSplitter(data []byte, atEOF bool) (advance int, token []byte, err error) {
omega := 0
if atEOF && len(data) == 0 {
return 0, nil, nil
}
for {
if i := bytes.IndexByte(data[omega:], '\n'); i >= 0 {
//here we add omega as we are using the complete data array instead of the slice where we found '\n'
if i > 0 && data[omega+i-1] == '\r' {
//next byte begins at i+1 and data[0:i+1] returned
return omega + i + 1, data[:omega+i+1], nil
} else {
//move the omega index to the byte after \n
omega += i + 1
}
} else {
//need to break free the chains
break
}
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
/*Function to read data from the connection and put it on the channel so it could be read in a systematic fashion.
*arguments: channel shared between this go routine and other functions performing actions based on the commands given, client connection
*return: none
*/
func MyRead(ch chan []byte, conn net.Conn) {
scanner := bufio.NewScanner(conn)
scanner.Split(CustomSplitter)
for {
if ok := scanner.Scan(); !ok {
break
} else {
temp := scanner.Bytes()
ch <- temp
}
}
}
/*Simple write function to send information to the client
*arguments: client connection, msg to send to the client
*return: none
*/
func Write(conn net.Conn, msg string) {
buf := []byte(msg)
buf = append(buf, []byte(CRLF)...)
conn.Write(buf)
}
func handleClient(conn net.Conn) {
defer conn.Close()
//channel for every connection for every client
ch := make(chan []byte)
go myRead(ch, conn)
for {
msg := <-ch
if len(msg) == 0 {
continue
}
//kvstore.ParseInput(conn, string(msg), table, ch)
}
}
......@@ -93,50 +93,50 @@ var table *KeyValueStore
*arguments: channel shared between this go routine and other functions performing actions based on the commands given, client connection
*return: none
*/
func myRead(ch chan []byte, conn net.Conn) {
scanner := bufio.NewScanner(conn)
scanner.Split(CustomSplitter)
for {
if ok := scanner.Scan(); !ok {
break
} else {
temp := scanner.Bytes()
ch <- temp
logger.Println(temp, "$$")
}
}
}
//func myRead(ch chan []byte, conn net.Conn) {
// scanner := bufio.NewScanner(conn)
// scanner.Split(CustomSplitter)
// for {
// if ok := scanner.Scan(); !ok {
// break
// } else {
// temp := scanner.Bytes()
// ch <- temp
// logger.Println(temp, "$$")
// }
// }
//}
/*Simple write function to send information to the client
*arguments: client connection, msg to send to the client
*return: none
*/
func write(conn net.Conn, msg string) {
buf := []byte(msg)
buf = append(buf, []byte(CRLF)...)
logger.Println(buf, len(buf))
conn.Write(buf)
}
///*Simple write function to send information to the client
// *arguments: client connection, msg to send to the client
// *return: none
// */
//func write(conn net.Conn, msg string) {
// buf := []byte(msg)
// buf = append(buf, []byte(CRLF)...)
// logger.Println(buf, len(buf))
// conn.Write(buf)
//}
/*After initial establishment of the connection with the client, this go routine handles further interaction
*arguments: client connection
*return: none
*/
func handleClient(conn net.Conn) {
defer conn.Close()
//channel for every connection for every client
ch := make(chan []byte)
go myRead(ch, conn)
for {
msg := <-ch
logger.Println("Channel: ", msg, string(msg))
if len(msg) == 0 {
continue
}
parseInput(conn, string(msg), table, ch)
}
}
//func handleClient(conn net.Conn) {
// defer conn.Close()
// //channel for every connection for every client
// ch := make(chan []byte)
// go myRead(ch, conn)
// for {
// msg := <-ch
// logger.Println("Channel: ", msg, string(msg))
// if len(msg) == 0 {
// continue
// }
// parseInput(conn, string(msg), table, ch)
// }
//}
/*Basic validations for various commands
*arguments: command to check against, other parmameters sent with the command (excluding the value), client connection
......@@ -251,7 +251,7 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
*arguments: client connection, message from client, channel shared with myRead function
*return: none
*/
func parseInput(conn net.Conn, msg string, ch chan []byte) {
func ParseInput(conn net.Conn, msg string, ch chan []byte) {
tokens := strings.Fields(msg)
//general error, don't check for commands, avoid the pain ;)
if len(tokens) > MAX_CMD_ARGS || len(tokens) < MIN_CMD_ARGS {
......@@ -379,53 +379,6 @@ func parseInput(conn net.Conn, msg string, ch chan []byte) {
}
}
/*
*Helper function to read value or cause timeout after READ_TIMEOUT seconds
*parameters: channel to read data from, threshold number of bytes to read
*returns: the value string and error state
*/
func readValue(ch chan []byte, n uint64) ([]byte, bool) {
//now we need to read the value which should have been sent
valReadLength := uint64(0)
var v []byte
err := false
up := make(chan bool, 1)
//after 5 seconds passed reading value, we'll just send err to client
go func() {
time.Sleep(READ_TIMEOUT * time.Second)
up <- true
}()
//use select for the data channel and the timeout channel
for valReadLength < n+2 {
select {
case temp := <-ch:
logger.Println("Value chunk read!")
valReadLength += uint64(len(temp))
if valReadLength > n+2 {
err = true
break
}
v = append(v, temp...)
case <-up:
err = true
logger.Println("Oh, Oh timeout")
break
}
//will be true if timeout occurs
if err {
break
}
}
if err {
return []byte{0}, err
}
return v[:n], err
}
/*Delegate function responsible for all parsing and hashtable interactions for the SET command sent by client
*arguments: client connection, tokenized command sent by the client, channel shared with myRead
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
......@@ -615,38 +568,6 @@ func debug() {
logger.Println("----end debug----")
}
/*Copied from the bufio.Scanner (originally ScanLines). By default it splits by '\n' but now we want it to split by '\r\n'
*arguments: data in bytes, is eof reached
*return: next sequence of bytes, chunk of data found, err state
*/
func CustomSplitter(data []byte, atEOF bool) (advance int, token []byte, err error) {
omega := 0
if atEOF && len(data) == 0 {
return 0, nil, nil
}
for {
if i := bytes.IndexByte(data[omega:], '\n'); i >= 0 {
//here we add omega as we are using the complete data array instead of the slice where we found '\n'
if i > 0 && data[omega+i-1] == '\r' {
//next byte begins at i+1 and data[0:i+1] returned
return omega + i + 1, data[:omega+i+1], nil
} else {
//move the omega index to the byte after \n
omega += i + 1
}
} else {
//need to break free the chains
break
}
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
/*Entry point of this program. Initializes the start of ther server and sets up the logger.
*arguments: none
*return: none
......@@ -692,6 +613,13 @@ func InitKVStore() {
table = &KeyValueStore{dictionary: make(map[string]*Data)}
}
func IsCasOrSet(msg string) bool {
tokens := strings.Fields(msg)
if len(tokens) >= 1 {
return tokens[0] == SET || tokens[0] == CAS
}
}
//server will not call this, we'll call it from test cases to clear the map
func ReInitServer() {
defer table.Unlock()
......
......@@ -202,17 +202,16 @@ func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(0)
}
func start_rpc(this_server *ServerConfig) {
//rpc.Register()
}
//func start_rpc(this_server *ServerConfig) {
// //rpc.Register()
//}
func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error {
reply.X = args.X
return nil
}
func InitializeRaft(this_server *ServerConfig) {
initializeLogger(this_server.Id) //initialize the logger
func initInterServerCommunication(this_server *ServerConfig) {
appendRpc := new(AppendEntries)
rpc.Register(appendRpc)
listener, e := net.Listen("tcp", ":"+strconv.Itoa(this_server.LogPort))
......@@ -230,7 +229,7 @@ func InitializeRaft(this_server *ServerConfig) {
}
// Initialize Logger
func initializeLogger(serverId int) {
func initLogger(serverId int) {
// Logger Initializaion
if !DEBUG {
Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
......@@ -242,28 +241,41 @@ func initializeLogger(serverId int) {
Info.Println("Initialized server")
}
//func main() {
// server_id, err := strconv.Atoi(os.Args[1])
// if err != nil {
// Info.Println("argument ", os.Args[1], "is not string")
// }
func initClientCommunication(server) {
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort))
if e != nil {
Info.Fatal("client listen error:", e)
}
for {
if conn, err := listener.Accept(); err != nil {
Info.Fatal("client accept error: " + err.Error())
} else {
Info.Printf("client new connection established\n")
go handleClient(conn)
}
}
}
// initializeLogger(server_id)
// Info.Println("Start")
func main() {
sid, err := strconv.Atoi(os.Args[1])
if err != nil {
Info.Println("argument ", os.Args[1], "is not string")
}
// this_server, _ := NewServerConfig(server_id)
initLogger(sid)
Info.Println("Start")
// num_servers, err2 := strconv.Atoi((os.Args[2]))
// if err2 != nil {
// Info.Println("argument ", os.Args[2], "is not string")
// }
// cluster_config, _ := NewClusterConfig(num_servers)
server, _ := NewServerConfig(sid)
// Info.Println(reflect.TypeOf(this_server))
// Info.Println(reflect.TypeOf(cluster_config))
serverCount, err2 := strconv.Atoi((os.Args[2]))
if err2 != nil {
Info.Println("argument ", os.Args[2], "is not string")
}
cluster_config, _ := NewClusterConfig(serverCount)
// initializeInterServerCommunication(this_server)
initClientCommunication(server)
initInterServerCommunication(server)
// var dummy_input string
// fmt.Scanln(&dummy_input)
//}
var dummy string
fmt.Scanln(&dummy)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment