Commit e85133be authored by Sushant Mahajan's avatar Sushant Mahajan

modified kvstore to work with raft

parent dffef763
......@@ -144,6 +144,7 @@ func HandleClient(conn net.Conn, rft *raft.Raft) {
if flag {
if v, err := readValue(ch, nr); err {
Write(conn, "ERR_CMD_ERR")
continue
} else {
command.Val = v
//command.isVal = true
......
......@@ -10,6 +10,7 @@ import (
"strings"
"sync"
"time"
"utils"
)
/*Constants used throughout the program to identify commands, request, response, and error messages*/
......@@ -35,7 +36,7 @@ const (
ERR_INTERNAL = "ERR_INTERNAL"
//constant
MAX_CMD_ARGS = 6
MAX_CMD_ARGS = 5
MIN_CMD_ARGS = 2
READ_TIMEOUT = 5
)
......@@ -78,7 +79,7 @@ func write(conn net.Conn, msg string) {
func isValid(cmd string, tokens []string, conn net.Conn) int {
switch cmd {
case SET:
if len(tokens) > 5 || len(tokens) < 4 {
if len(tokens) != 4 {
logger.Println(cmd, ":Invalid no. of tokens")
write(conn, ERR_CMD_ERR)
return 1
......@@ -88,11 +89,6 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
write(conn, ERR_CMD_ERR)
return 1
}
if len(tokens) == 5 && tokens[4] != NOREPLY {
logger.Println(cmd, ":optional arg incorrect")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[2], 10, 64); err != nil {
logger.Println(cmd, ":expiry time invalid")
write(conn, ERR_CMD_ERR)
......@@ -129,7 +125,7 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
}
case CAS:
if len(tokens) > 6 || len(tokens) < 5 {
if len(tokens) != 5 {
logger.Println(cmd, ":Invalid number of tokens")
write(conn, ERR_CMD_ERR)
return 1
......@@ -139,11 +135,6 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
write(conn, ERR_CMD_ERR)
return 1
}
if len(tokens) == 6 && tokens[5] != NOREPLY {
logger.Println(cmd, ":optional arg incorrect")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[2], 10, 64); err != nil {
logger.Println(cmd, ":expiry time invalid")
write(conn, ERR_CMD_ERR)
......@@ -179,12 +170,26 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
return 0
}
func MonitorCommitChannel(ch chan LogEntry) {
for {
buffer := new(bytes.Buffer)
temp := <-ch
conn := temp.(*LogEntryData).conn
cmd := new(utils.Command)
if err := cmd.GobDecode(temp.Data()); err != nil {
log.Fatal("Error decoding command!")
}
ParseInput(conn, cmd)
}
}
/*Function parses the command provided by the client and delegates further action to command specific functions.
*Based on the return values of those functions, send appropriate messages to the client.
*arguments: client connection, message from client, channel shared with myRead function
*return: none
*/
func ParseInput(conn net.Conn, msg string, ch chan []byte) {
func ParseInput(conn net.Conn, cmd *utils.Command) {
msg := string(cmd.Cmd)
tokens := strings.Fields(msg)
//general error, don't check for commands, avoid the pain ;)
if len(tokens) > MAX_CMD_ARGS || len(tokens) < MIN_CMD_ARGS {
......@@ -201,7 +206,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
if isValid(SET, tokens, conn) != 0 {
return
}
if ver, ok, r := performSet(conn, tokens[1:len(tokens)], ch); ok {
if ver, ok, r := performSet(conn, tokens[1:len(tokens)], cmd); ok {
//debug(table)
logger.Println(ver)
if r {
......@@ -267,7 +272,7 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
if isValid(CAS, tokens, conn) != 0 {
return
}
if ver, ok, r := performCas(conn, tokens[1:len(tokens)], ch); r {
if ver, ok, r := performCas(conn, tokens[1:len(tokens)], cmd); r {
if r {
switch ok {
case 0:
......@@ -313,50 +318,40 @@ func ParseInput(conn net.Conn, msg string, ch chan []byte) {
}
/*Delegate function responsible for all parsing and hashtable interactions for the SET command sent by client
*arguments: client connection, tokenized command sent by the client, channel shared with myRead
*arguments: client connection, tokenized command sent by the client, command structure @utils.Command
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
*/
func performSet(conn net.Conn, tokens []string, ch chan []byte) (uint64, bool, bool) {
//-k := tokens[0]
func performSet(conn net.Conn, tokens []string, cmd *utils.Command) (uint64, bool, bool) {
k := tokens[0]
//expiry time offset
//-e, _ := strconv.ParseUint(tokens[1], 10, 64)
e, _ := strconv.ParseUint(tokens[1], 10, 64)
//numbytes
//-n, _ := strconv.ParseUint(tokens[2], 10, 64)
n, _ := strconv.ParseUint(tokens[2], 10, 64)
r := true
if len(tokens) == 4 && tokens[3] == NOREPLY {
r = false
}
logger.Println(r)
//if v, err := readValue(ch, n); err {
// write(conn, ERR_CMD_ERR)
// return 0, false, r
//} else {
// defer table.Unlock()
// table.Lock()
// //critical section start
// var val *Data
// if _, ok := table.dictionary[k]; ok {
// val = table.dictionary[k]
// } else {
// val = new(Data)
// table.dictionary[k] = val
// }
// val.numBytes = n
// val.version++
// if e == 0 {
// val.isPerpetual = true
// val.expTime = 0
// } else {
// val.isPerpetual = false
// val.expTime = e + uint64(time.Now().Unix())
// }
// val.value = v
// return val.version, true, r
//}
return 2, true, true
defer table.Unlock()
table.Lock()
//critical section start
var val *Data
if _, ok := table.dictionary[k]; ok {
val = table.dictionary[k]
} else {
val = new(Data)
table.dictionary[k] = val
}
val.numBytes = n
val.version++
if e == 0 {
val.isPerpetual = true
val.expTime = 0
} else {
val.isPerpetual = false
val.expTime = e + uint64(time.Now().Unix())
}
val.value = cmd.Val
return val.version, true, r
}
/*Delegate function reponsible for activities related to the GET command sent by the client.
......@@ -413,11 +408,11 @@ func performGetm(conn net.Conn, tokens []string) (*Data, bool) {
}
/*Delegate function reponsible for activities related to the CAS command sent by the client.
*arguments: client connection, tokenized command sent by the client, channel shared with myRead
*arguments: client connection, tokenized command sent by the client, cmd pointer @utils.Command
*return: new version of updated key (if it is updated), error status {0: error while reading new value, 1: key found and changed,
*2: version mismatch with key, 3: key not found}, whether to reply to client
*/
func performCas(conn net.Conn, tokens []string, ch chan []byte) (uint64, int, bool) {
func performCas(conn net.Conn, tokens []string, cmd *utils.Command) (uint64, int, bool) {
k := tokens[0]
e, _ := strconv.ParseUint(tokens[1], 10, 64)
ve, _ := strconv.ParseUint(tokens[2], 10, 64)
......@@ -425,46 +420,37 @@ func performCas(conn net.Conn, tokens []string, ch chan []byte) (uint64, int, bo
r := true
logger.Println(k, e, ve, n, r)
if len(tokens) == 5 && tokens[4] == NOREPLY {
r = false
}
//read value
//if v, err := readValue(ch, n); err {
// return 0, 1, r
//} else {
// defer table.Unlock()
// table.Lock()
// if val, ok := table.dictionary[k]; ok {
// if val.version == ve {
// if val.isPerpetual || val.expTime >= uint64(time.Now().Unix()) {
// //if expiry time is zero, key should not be deleted
// if e == 0 {
// val.isPerpetual = true
// val.expTime = 0
// } else {
// val.isPerpetual = false
// val.expTime = e + uint64(time.Now().Unix())
// }
// val.numBytes = n
// val.version++
// val.value = v
// //key found and changed
// return val.version, 0, r
// } else {
// logger.Println("expired key found!")
// //version found but key expired, can delete key safely and tell client that it does not exist
// delete(table.dictionary, k)
// return 0, 3, r
// }
// }
// //version mismatch
// return 0, 2, r
// }
// //key not found
// return 0, 3, r
//}
return 1, 1, true
defer table.Unlock()
table.Lock()
if val, ok := table.dictionary[k]; ok {
if val.version == ve {
if val.isPerpetual || val.expTime >= uint64(time.Now().Unix()) {
//if expiry time is zero, key should not be deleted
if e == 0 {
val.isPerpetual = true
val.expTime = 0
} else {
val.isPerpetual = false
val.expTime = e + uint64(time.Now().Unix())
}
val.numBytes = n
val.version++
val.value = cmd.Val
//key found and changed
return val.version, 0, r
} else {
logger.Println("expired key found!")
//version found but key expired, can delete key safely and tell client that it does not exist
delete(table.dictionary, k)
return 0, 3, r
}
}
//version mismatch
return 0, 2, r
}
//key not found
return 0, 3, r
}
/*Delegate function reponsible for activities related to the DELETE command sent by the client.
......@@ -503,32 +489,13 @@ func debug() {
logger.Println("----end debug----")
}
func InitKVStore() {
toLog := ""
if len(os.Args) > 1 {
toLog = os.Args[1]
}
//toLog = "s"
if toLog != "" {
logf, _ := os.OpenFile("serverlog.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
defer logf.Close()
logger = log.New(logf, "SERVER: ", log.Ltime|log.Lshortfile)
//logger = log.New(os.Stdout, "SERVER: ", log.Ltime|log.Lshortfile)
} else {
logger = log.New(ioutil.Discard, "SERVER: ", log.Ldate)
}
func InitKVStore(log *log.Logger) {
logger := log
//initialize key value store
table = &KeyValueStore{dictionary: make(map[string]*Data)}
}
func MonitorCommitChannel(ch chan LogEntry) {
for {
//temp := <-ch
}
}
//server will not call this, we'll call it from test cases to clear the map
func ReInitServer() {
defer table.Unlock()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment