Commit c330e0be authored by Sushant Mahajan's avatar Sushant Mahajan

followers do not write back to client now

parent 7a0f540c
......@@ -49,6 +49,9 @@ type Data struct {
isPerpetual bool //specifies that the key does not expire
}
//leader check
var sid int
//get value
func (d *Data) GetVal() []byte {
return d.value
......@@ -88,7 +91,9 @@ func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data {
func write(conn net.Conn, msg string) {
buf := []byte(msg)
buf = append(buf, []byte(CRLF)...)
if sid == 1 {
conn.Write(buf)
}
}
/*Basic validations for various commands
......@@ -196,7 +201,13 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
func MonitorCommitChannel(ch chan LogEntry) {
for {
temp := <-ch
conn := temp.(*LogEntryData).conn
var conn net.Conn
if sid == 1 {
conn = temp.(*LogEntryData).conn
} else {
conn = nil
}
cmd := new(utils.Command)
buffer := bytes.NewBuffer(temp.GetData())
......@@ -232,7 +243,7 @@ func ParseInput(conn net.Conn, cmd *utils.Command) {
if isValid(SET, tokens, conn) != 0 {
return
}
if ver, ok, r := performSet(conn, tokens[1:len(tokens)], cmd); ok {
if ver, ok, r := performSet(tokens[1:len(tokens)], cmd); ok {
//debug(table)
logger.Println(ver)
if r {
......@@ -248,7 +259,7 @@ func ParseInput(conn net.Conn, cmd *utils.Command) {
if isValid(GET, tokens, conn) != 0 {
return
}
if data, ok := performGet(conn, tokens[1:len(tokens)]); ok {
if data, ok := performGet(tokens[1:len(tokens)]); ok {
logger.Println("sending", tokens[1], "data")
buffer.Reset()
buffer.WriteString(VALUE)
......@@ -269,7 +280,7 @@ func ParseInput(conn net.Conn, cmd *utils.Command) {
if isValid(GETM, tokens, conn) != 0 {
return
}
if data, ok := performGetm(conn, tokens[1:len(tokens)]); ok {
if data, ok := performGetm(tokens[1:len(tokens)]); ok {
logger.Println("sending", tokens[1], "metadata")
buffer.Reset()
buffer.WriteString(VALUE)
......@@ -298,7 +309,7 @@ func ParseInput(conn net.Conn, cmd *utils.Command) {
if isValid(CAS, tokens, conn) != 0 {
return
}
if ver, ok, r := performCas(conn, tokens[1:len(tokens)], cmd); r {
if ver, ok, r := performCas(tokens[1:len(tokens)], cmd); r {
if r {
switch ok {
case 0:
......@@ -329,7 +340,7 @@ func ParseInput(conn net.Conn, cmd *utils.Command) {
if isValid(DELETE, tokens, conn) != 0 {
return
}
if ok := performDelete(conn, tokens[1:len(tokens)]); ok == 0 {
if ok := performDelete(tokens[1:len(tokens)]); ok == 0 {
write(conn, DELETED)
} else {
write(conn, ERR_NOT_FOUND)
......@@ -344,10 +355,10 @@ func ParseInput(conn net.Conn, cmd *utils.Command) {
}
/*Delegate function responsible for all parsing and hashtable interactions for the SET command sent by client
*arguments: client connection, tokenized command sent by the client, command structure @utils.Command
*arguments: tokenized command sent by the client, command structure @utils.Command
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
*/
func performSet(conn net.Conn, tokens []string, cmd *utils.Command) (uint64, bool, bool) {
func performSet(tokens []string, cmd *utils.Command) (uint64, bool, bool) {
k := tokens[0]
//expiry time offset
e, _ := strconv.ParseUint(tokens[1], 10, 64)
......@@ -381,10 +392,10 @@ func performSet(conn net.Conn, tokens []string, cmd *utils.Command) (uint64, boo
}
/*Delegate function reponsible for activities related to the GET command sent by the client.
*arguments: client connection, tokenized command sent by the client
*arguments: tokenized command sent by the client
*return: pointer to value corresponding to the key given by client, success or failure
*/
func performGet(conn net.Conn, tokens []string) (*Data, bool) {
func performGet(tokens []string) (*Data, bool) {
k := tokens[0]
defer table.Unlock()
//lock because if key is expired, we'll delete it
......@@ -406,10 +417,10 @@ func performGet(conn net.Conn, tokens []string) (*Data, bool) {
}
/*Delegate function reponsible for activities related to the GETM command sent by the client.
*arguments: client connection, tokenized command sent by the client
*arguments: tokenized command sent by the client
*return: pointer to value corresponding to the key given by client, success or failure
*/
func performGetm(conn net.Conn, tokens []string) (*Data, bool) {
func performGetm(tokens []string) (*Data, bool) {
k := tokens[0]
defer table.Unlock()
table.Lock()
......@@ -434,11 +445,11 @@ func performGetm(conn net.Conn, tokens []string) (*Data, bool) {
}
/*Delegate function reponsible for activities related to the CAS command sent by the client.
*arguments: client connection, tokenized command sent by the client, cmd pointer @utils.Command
*arguments: tokenized command sent by the client, cmd pointer @utils.Command
*return: new version of updated key (if it is updated), error status {0: error while reading new value, 1: key found and changed,
*2: version mismatch with key, 3: key not found}, whether to reply to client
*/
func performCas(conn net.Conn, tokens []string, cmd *utils.Command) (uint64, int, bool) {
func performCas(tokens []string, cmd *utils.Command) (uint64, int, bool) {
k := tokens[0]
e, _ := strconv.ParseUint(tokens[1], 10, 64)
ve, _ := strconv.ParseUint(tokens[2], 10, 64)
......@@ -480,10 +491,10 @@ func performCas(conn net.Conn, tokens []string, cmd *utils.Command) (uint64, int
}
/*Delegate function reponsible for activities related to the DELETE command sent by the client.
*arguments: client connection, tokenized command sent by the client
*arguments: tokenized command sent by the client
*return: integer secifying error state {0: found and deleted, 1: found but expired (deleted but client told non-existent, 2: key not found}
*/
func performDelete(conn net.Conn, tokens []string) int {
func performDelete(tokens []string) int {
k := tokens[0]
logger.Println(tokens)
flag := 1
......@@ -519,8 +530,9 @@ func Debug() {
*arguments: Logger
*return: none
*/
func InitKVStore(log *log.Logger) {
func InitKVStore(log *log.Logger, id int) {
logger = log
sid = id
//initialize key value store
table = &KeyValueStore{dictionary: make(map[string]*Data)}
......
......@@ -22,9 +22,6 @@ var Info *log.Logger
// Global variable for generating unique log sequence numbers
var lsn Lsn
// Flag for enabling/disabling logging functionality
var DEBUG = true
// See Log.Append. Implements Error interface.
type ErrRedirect int
......
......@@ -172,7 +172,7 @@ func main() {
commitCh := make(chan raft.LogEntry)
rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info)
raft.InitKVStore(Info)
raft.InitKVStore(Info, sid)
go raft.MonitorCommitChannel(commitCh) //for kvstore
go initClientCommunication(server, rft, ch1)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment