Commit a0cf72e5 authored by Sushant Mahajan's avatar Sushant Mahajan

modified server to accept any kind to value string including CR, LF etc

parent 8f6dc2ca
...@@ -35,6 +35,10 @@ const ( ...@@ -35,6 +35,10 @@ const (
ERR_NOT_FOUND = "ERR_NOT_FOUND" ERR_NOT_FOUND = "ERR_NOT_FOUND"
ERR_VERSION = "ERR_VERSION" ERR_VERSION = "ERR_VERSION"
ERR_INTERNAL = "ERR_INTERNAL" ERR_INTERNAL = "ERR_INTERNAL"
//constant
MAX_CMD_ARGS = 6
READ_TIMEOUT = 5
) )
//represents the value in the main hashtable (key, value) pair //represents the value in the main hashtable (key, value) pair
...@@ -88,13 +92,14 @@ func startServer() { ...@@ -88,13 +92,14 @@ func startServer() {
*arguments: channel shared between this go routine and other functions performing actions based on the commands given, client connection *arguments: channel shared between this go routine and other functions performing actions based on the commands given, client connection
*return: none *return: none
*/ */
func myRead(ch chan string, conn net.Conn) { func myRead(ch chan []byte, conn net.Conn) {
scanner := bufio.NewScanner(conn) scanner := bufio.NewScanner(conn)
scanner.Split(CustomSplitter)
for { for {
if ok := scanner.Scan(); !ok { if ok := scanner.Scan(); !ok {
break break
} else { } else {
temp := scanner.Text() temp := scanner.Bytes()
ch <- temp ch <- temp
logger.Println(temp, "$$") logger.Println(temp, "$$")
} }
...@@ -119,12 +124,12 @@ func write(conn net.Conn, msg string) { ...@@ -119,12 +124,12 @@ func write(conn net.Conn, msg string) {
func handleClient(conn net.Conn, table *KeyValueStore) { func handleClient(conn net.Conn, table *KeyValueStore) {
defer conn.Close() defer conn.Close()
//channel for every connection for every client //channel for every connection for every client
ch := make(chan string) ch := make(chan []byte)
go myRead(ch, conn) go myRead(ch, conn)
for { for {
msg := <-ch msg := <-ch
logger.Println("Channel: ", msg) logger.Println("Channel: ", msg, string(msg))
if len(msg) == 0 { if len(msg) == 0 {
continue continue
} }
...@@ -231,16 +236,16 @@ func isValid(cmd string, tokens []string, conn net.Conn) int { ...@@ -231,16 +236,16 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
*arguments: client connection, message from client, pointer to hashtable structure, channel shared with myRead function *arguments: client connection, message from client, pointer to hashtable structure, channel shared with myRead function
*return: none *return: none
*/ */
func parseInput(conn net.Conn, msg string, table *KeyValueStore, ch chan string) { func parseInput(conn net.Conn, msg string, table *KeyValueStore, ch chan []byte) {
tokens := strings.Fields(msg) tokens := strings.Fields(msg)
//general error, don't check for commands, avoid the pain ;) //general error, don't check for commands, avoid the pain ;)
if len(tokens) > 6 { if len(tokens) > MAX_CMD_ARGS {
write(conn, ERR_CMD_ERR) write(conn, ERR_CMD_ERR)
return return
} }
var buffer bytes.Buffer //for efficient string concatenation //for efficient string concatenation
//logger.Println(tokens) var buffer bytes.Buffer
switch tokens[0] { switch tokens[0] {
case SET: case SET:
if isValid(SET, tokens, conn) != 0 { if isValid(SET, tokens, conn) != 0 {
...@@ -362,30 +367,29 @@ func parseInput(conn net.Conn, msg string, table *KeyValueStore, ch chan string) ...@@ -362,30 +367,29 @@ func parseInput(conn net.Conn, msg string, table *KeyValueStore, ch chan string)
*parameters: channel to read data from, threshold number of bytes to read *parameters: channel to read data from, threshold number of bytes to read
*returns: the value string and error state *returns: the value string and error state
*/ */
func readValue(ch chan string, n uint64) ([]byte, bool) { func readValue(ch chan []byte, n uint64) ([]byte, bool) {
//now we need to read the value which should have been sent //now we need to read the value which should have been sent
valReadLength := uint64(0) valReadLength := uint64(0)
var v string var v []byte
err := false err := false
up := make(chan bool, 1) up := make(chan bool, 1)
//after 5 seconds passed reading value, we'll just send err to client //after 5 seconds passed reading value, we'll just send err to client
go func() { go func() {
time.Sleep(5 * time.Second) time.Sleep(READ_TIMEOUT * time.Second)
up <- true up <- true
}() }()
//use select for the data channel and the timeout channel //use select for the data channel and the timeout channel
for valReadLength < n { for valReadLength < n+2 {
select { select {
case temp := <-ch: case temp := <-ch:
logger.Println("Value chunk read!") logger.Println("Value chunk read!")
valReadLength += uint64(len(temp)) valReadLength += uint64(len(temp))
v += temp v = append(v, temp...)
case <-up: case <-up:
err = true err = true
logger.Println("Oh, Oh timeout") logger.Println("Oh, Oh timeout")
//write(conn, ERR_INTERNAL)
break break
} }
...@@ -398,17 +402,19 @@ func readValue(ch chan string, n uint64) ([]byte, bool) { ...@@ -398,17 +402,19 @@ func readValue(ch chan string, n uint64) ([]byte, bool) {
if err { if err {
return []byte{0}, err return []byte{0}, err
} }
return []byte(v), err return v[:n], err
} }
/*Delegate function responsible for all parsing and hashtable interactions for the SET command sent by client /*Delegate function responsible for all parsing and hashtable interactions for the SET command sent by client
*arguments: client connection, tokenized command sent by the client, pointer to hashtable structure, channel shared with myRead *arguments: client connection, tokenized command sent by the client, pointer to hashtable structure, channel shared with myRead
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client *return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
*/ */
func performSet(conn net.Conn, tokens []string, table *KeyValueStore, ch chan string) (uint64, bool, bool) { func performSet(conn net.Conn, tokens []string, table *KeyValueStore, ch chan []byte) (uint64, bool, bool) {
k := tokens[0] k := tokens[0]
e, _ := strconv.ParseUint(tokens[1], 10, 64) //expiry time offset //expiry time offset
n, _ := strconv.ParseUint(tokens[2], 10, 64) //numbytes e, _ := strconv.ParseUint(tokens[1], 10, 64)
//numbytes
n, _ := strconv.ParseUint(tokens[2], 10, 64)
r := true r := true
if len(tokens) == 4 && tokens[3] == NOREPLY { if len(tokens) == 4 && tokens[3] == NOREPLY {
...@@ -452,11 +458,14 @@ func performSet(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st ...@@ -452,11 +458,14 @@ func performSet(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st
*/ */
func performGet(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, bool) { func performGet(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, bool) {
k := tokens[0] k := tokens[0]
defer table.RUnlock() defer table.Unlock()
table.RLock() //lock because if key is expired, we'll delete it
table.Lock()
//critical section begin //critical section begin
if v, ok := table.dictionary[k]; ok { if v, ok := table.dictionary[k]; ok {
if !v.isPerpetual && v.expTime < uint64(time.Now().Unix()) { if !v.isPerpetual && v.expTime < uint64(time.Now().Unix()) {
//delete the key
delete(table.dictionary, k)
return nil, false return nil, false
} }
data := new(Data) data := new(Data)
...@@ -474,11 +483,13 @@ func performGet(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, bo ...@@ -474,11 +483,13 @@ func performGet(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, bo
*/ */
func performGetm(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, bool) { func performGetm(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, bool) {
k := tokens[0] k := tokens[0]
defer table.RUnlock() defer table.Unlock()
table.RLock() table.Lock()
//critical section begin //critical section begin
if v, ok := table.dictionary[k]; ok { if v, ok := table.dictionary[k]; ok {
if !v.isPerpetual && v.expTime < uint64(time.Now().Unix()) { if !v.isPerpetual && v.expTime < uint64(time.Now().Unix()) {
//delete the key
delete(table.dictionary, k)
return nil, false return nil, false
} }
data := new(Data) data := new(Data)
...@@ -498,7 +509,7 @@ func performGetm(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, b ...@@ -498,7 +509,7 @@ func performGetm(conn net.Conn, tokens []string, table *KeyValueStore) (*Data, b
*return: new version of updated key (if it is updated), error status {0: error while reading new value, 1: key found and changed, *return: new version of updated key (if it is updated), error status {0: error while reading new value, 1: key found and changed,
*2: version mismatch with key, 3: key not found}, whether to reply to client *2: version mismatch with key, 3: key not found}, whether to reply to client
*/ */
func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan string) (uint64, int, bool) { func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan []byte) (uint64, int, bool) {
k := tokens[0] k := tokens[0]
e, _ := strconv.ParseUint(tokens[1], 10, 64) e, _ := strconv.ParseUint(tokens[1], 10, 64)
ve, _ := strconv.ParseUint(tokens[2], 10, 64) ve, _ := strconv.ParseUint(tokens[2], 10, 64)
...@@ -519,7 +530,8 @@ func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st ...@@ -519,7 +530,8 @@ func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st
if val, ok := table.dictionary[k]; ok { if val, ok := table.dictionary[k]; ok {
if val.version == ve { if val.version == ve {
if val.isPerpetual || val.expTime >= uint64(time.Now().Unix()) { if val.isPerpetual || val.expTime >= uint64(time.Now().Unix()) {
if e == 0 { //if expiry time is zero, key should not be deleted //if expiry time is zero, key should not be deleted
if e == 0 {
val.isPerpetual = true val.isPerpetual = true
val.expTime = 0 val.expTime = 0
} else { } else {
...@@ -530,7 +542,8 @@ func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st ...@@ -530,7 +542,8 @@ func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st
ver++ ver++
val.version = ver val.version = ver
val.value = v val.value = v
return val.version, 0, r //key found and changed //key found and changed
return val.version, 0, r
} else { } else {
logger.Println("expired key found!") logger.Println("expired key found!")
//version found but key expired, can delete key safely and tell client that it does not exist //version found but key expired, can delete key safely and tell client that it does not exist
...@@ -538,9 +551,11 @@ func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st ...@@ -538,9 +551,11 @@ func performCas(conn net.Conn, tokens []string, table *KeyValueStore, ch chan st
return 0, 3, r return 0, 3, r
} }
} }
return 0, 2, r //version mismatch //version mismatch
return 0, 2, r
} }
return 0, 3, r //key not found //key not found
return 0, 3, r
} }
} }
...@@ -557,12 +572,15 @@ func performDelete(conn net.Conn, tokens []string, table *KeyValueStore) int { ...@@ -557,12 +572,15 @@ func performDelete(conn net.Conn, tokens []string, table *KeyValueStore) int {
//begin critical section //begin critical section
if v, ok := table.dictionary[k]; ok { if v, ok := table.dictionary[k]; ok {
if v.isPerpetual || v.expTime >= uint64(time.Now().Unix()) { if v.isPerpetual || v.expTime >= uint64(time.Now().Unix()) {
flag = 0 //found not expired //found not expired
flag = 0
} }
delete(table.dictionary, k) //delete anyway as expired or needs to be deleted //delete anyway as expired or needs to be deleted
delete(table.dictionary, k)
return flag return flag
} }
return 2 //key not found //key not found
return 2
} }
/*Simple function that dumps the contents of the hashtable /*Simple function that dumps the contents of the hashtable
...@@ -577,6 +595,38 @@ func debug(table *KeyValueStore) { ...@@ -577,6 +595,38 @@ func debug(table *KeyValueStore) {
logger.Println("----end debug----") logger.Println("----end debug----")
} }
/*Copied from the bufio.Scanner (originally ScanLines). By default it splits by '\n' but now we want it to split by '\r\n'
*arguments: data in bytes, is eof reached
*return: next sequence of bytes, chunk of data found, err state
*/
func CustomSplitter(data []byte, atEOF bool) (advance int, token []byte, err error) {
omega := 0
if atEOF && len(data) == 0 {
return 0, nil, nil
}
for {
if i := bytes.IndexByte(data[omega:], '\n'); i > 0 {
//here we add omega as we are using the complete data array instead of the slice where we found '\n'
if data[omega+i-1] == '\r' {
//next byte begins at i+1 and data[0:i+1] returned
return i + 1, data[0 : i+1], nil
} else {
//move the omega index to the byte after \n
omega = i + 1
}
} else {
//need to break free the chains
break
}
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
/*Entry point of this program. Initializes the start of ther server and sets up the logger. /*Entry point of this program. Initializes the start of ther server and sets up the logger.
*arguments: none *arguments: none
*return: none *return: none
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment