Commit fe009bef authored by Sushant Mahajan's avatar Sushant Mahajan

added code for follower

parent 495baaba
package main
import (
"fmt"
"raft"
)
const (
SERVERS = 5
)
func main() {
dummyCh := make(chan bool)
commitCh := make(chan raft.LogEntry)
fmt.Println("Started")
for i := 1; i <= 5; i++ {
go raft.Start(i, commitCh, dummyCh, true)
}
if <-dummyCh {
fmt.Println("khattam")
}
}
...@@ -21,6 +21,11 @@ const ( ...@@ -21,6 +21,11 @@ const (
LEADER = iota LEADER = iota
CANDIDATE CANDIDATE
FOLLOWER FOLLOWER
VOTED_FOR = "votedFor"
CURRENT_TERM = "currentTerm"
FILE_WRITTEN = 0
FILE_ERR = -1
NULL_VOTE = 0
) )
// Global variable for generating unique log sequence numbers // Global variable for generating unique log sequence numbers
...@@ -61,6 +66,12 @@ type VoteRequest struct { ...@@ -61,6 +66,12 @@ type VoteRequest struct {
} }
type AppendRPC struct { type AppendRPC struct {
term int
leaderId int
prevLogIndex int
prevLogTerm int
leaderCommit int
entries []*LogEntryData
} }
type Timeout struct { type Timeout struct {
...@@ -85,6 +96,8 @@ type Raft struct { ...@@ -85,6 +96,8 @@ type Raft struct {
eventCh chan RaftEvent //receive events related to various states eventCh chan RaftEvent //receive events related to various states
votedFor int votedFor int
currentTerm int currentTerm int
commitIndex int
voters int
} }
// Log entry interface // Log entry interface
...@@ -99,29 +112,52 @@ type LogEntryData struct { ...@@ -99,29 +112,52 @@ type LogEntryData struct {
Id Lsn // Unique identifier for log entry Id Lsn // Unique identifier for log entry
Data []byte // Data bytes Data []byte // Data bytes
Committed bool // Commit status Committed bool // Commit status
Term int //term number
conn net.Conn // Connection for communicating with client conn net.Conn // Connection for communicating with client
} }
func getCurrentTerm(serverId int, info *log.Logger) int { func (rft *Raft) persistLog() {
if file, err := os.Open("currentTerm" + strconv.Itoa(serverId)); err != nil {
ioutil.WriteFile("currentTerm"+strconv.Itoa(serverId), []byte("0"), 0666) }
info.Println("wrote in term file:0")
func (rft *Raft) readLogFromDisk() {
}
func getSingleDataFromFile(name string, serverId int, info *log.Logger) int {
filename := name + strconv.Itoa(serverId)
if file, err := os.Open(filename); err != nil {
defer file.Close()
ioutil.WriteFile(filename, []byte("0"), 0666)
info.Println("wrote in " + filename + " file")
return 0 return 0
} else { } else {
if data, err := ioutil.ReadFile(file.Name()); err != nil { if data, err := ioutil.ReadFile(file.Name()); err != nil {
info.Println("error reading file") info.Println("error reading file " + filename)
return -1 return FILE_ERR
} else { } else {
info.Println("read from file") info.Println("read from file " + filename)
if t, err2 := strconv.Atoi(string(data)); err2 != nil { if t, err2 := strconv.Atoi(string(data)); err2 != nil {
info.Println("error converting") info.Println("error converting")
return -1 return FILE_ERR
} else { } else {
info.Println("Converted success", t) info.Println("Converted success "+filename, t)
return t return t
} }
} }
return -1 }
}
func writeFile(name string, serverId int, data int, info *log.Logger) int {
filename := name + strconv.Itoa(serverId)
if file, err := os.Open(filename); err != nil {
defer file.Close()
return FILE_ERR
} else {
ioutil.WriteFile(filename, []byte(strconv.Itoa(data)), 0666)
info.Println("wrote in " + filename + " file")
return FILE_WRITTEN //file written
} }
} }
...@@ -136,7 +172,8 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, ev ...@@ -136,7 +172,8 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, ev
rft.id = thisServerId rft.id = thisServerId
rft.eventCh = eventCh rft.eventCh = eventCh
rft.Info = getLogger(thisServerId, toDebug) rft.Info = getLogger(thisServerId, toDebug)
rft.currentTerm = getCurrentTerm(thisServerId, rft.Info) rft.currentTerm = getSingleDataFromFile(CURRENT_TERM, thisServerId, rft.Info)
getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file.
return rft, nil return rft, nil
} }
...@@ -224,10 +261,9 @@ func (e ErrRedirect) Error() string { ...@@ -224,10 +261,9 @@ func (e ErrRedirect) Error() string {
func (rft *Raft) loop() { func (rft *Raft) loop() {
state := FOLLOWER state := FOLLOWER
for { for {
rft.Info.Println("hello")
switch state { switch state {
case FOLLOWER: case FOLLOWER:
state = follower() state = rft.follower()
// case CANDIDATE: // case CANDIDATE:
// state = candidate() // state = candidate()
// case LEADER: // case LEADER:
...@@ -242,31 +278,102 @@ func getTimer() *time.Timer { ...@@ -242,31 +278,102 @@ func getTimer() *time.Timer {
return time.NewTimer(time.Millisecond * time.Duration((rand.Intn(MAX_TIMEOUT)+MIN_TIMEOUT)%MAX_TIMEOUT)) return time.NewTimer(time.Millisecond * time.Duration((rand.Intn(MAX_TIMEOUT)+MIN_TIMEOUT)%MAX_TIMEOUT))
} }
func reInitializeTimer(t *time.Timer) *time.Timer {
t.Stop()
return getTimer()
}
func (rft *Raft) grantVote(reply bool, currentTerm int) {
if reply {
rft.voters++
}
}
func (rft *Raft) replyAppendRPC(reply bool, currentTerm int) {
//
}
func (rft *Raft) updateTermAndVote(term int) {
writeFile(CURRENT_TERM, rft.id, term, rft.Info)
rft.currentTerm = term
writeFile(VOTED_FOR, rft.id, NULL_VOTE, rft.Info)
rft.votedFor = NULL_VOTE
}
func (rft *Raft) follower() int { func (rft *Raft) follower() int {
//start candidate timeout //start candidate timeout
candTimer := getTimer() electionTimeout := getTimer()
for { for {
//wrap in select //wrap in select
select { select {
case <-candTimer.C: case <-electionTimeout.C:
return CANDIDATE return CANDIDATE
case event := <-rft.eventCh: case event := <-rft.eventCh:
switch event.(type) { switch event.(type) {
case *ClientAppend: case *ClientAppend:
// Do not handle clients in follower mode. Send it back up the //Do not handle clients in follower mode.
// pipe with committed = false //Send it back up the pipeline.
event.(*ClientAppend).logEntry.SetCommitted(false) event.(*ClientAppend).logEntry.SetCommitted(false)
rft.commitCh <- event.(*ClientAppend).logEntry rft.eventCh <- event.(*ClientAppend).logEntry
case *VoteRequest: case *VoteRequest:
req := event.(*VoteRequest) req := event.(*VoteRequest)
reply := false
if req.term < rft.currentTerm {
reply = false
}
if req.term > rft.currentTerm ||
req.lastLogTerm > rft.currentTerm ||
(req.lastLogTerm == rft.currentTerm && req.lastLogIndex >= len(rft.LogArray)) {
rft.updateTermAndVote(req.term)
reply = true
}
if reply && rft.votedFor == NULL_VOTE {
electionTimeout = reInitializeTimer(electionTimeout)
writeFile(VOTED_FOR, rft.id, req.candidateId, rft.Info)
rft.votedFor = req.candidateId
rafts[req.candidateId].grantVote(reply, rft.currentTerm)
}
case *AppendRPC:
electionTimeout = reInitializeTimer(electionTimeout)
req := event.(*AppendRPC)
reply := true
if req.term < rft.currentTerm { if req.term < rft.currentTerm {
//reply as - not accepted as leader reply = false
} }
if req.term > rft.currentTerm { if req.term > rft.currentTerm {
//update currentTerm rft.updateTermAndVote(req.term)
reply = true
}
//first condition to prevent out of bounds except
if len(rft.LogArray) < req.prevLogIndex || rft.LogArray[req.prevLogIndex].Term != req.prevLogTerm {
reply = false
}
if reply {
i := req.prevLogIndex + 1
for ; i < len(rft.LogArray); i++ {
if req.entries[i-req.prevLogIndex-1].Term != rft.LogArray[i].Term {
break
}
}
rft.LogArray = append(rft.LogArray[0:i], req.entries[i-req.prevLogIndex-1:]...)
//todo:also add to log
if req.leaderCommit > rft.commitIndex {
if req.leaderCommit > len(rft.LogArray)-1 {
rft.commitIndex = len(rft.LogArray) - 1
} else {
rft.commitIndex = req.leaderCommit
}
}
} }
//condition for - if not voted in current term rafts[req.leaderId].replyAppendRPC(reply, rft.currentTerm)
} }
} }
} }
......
...@@ -23,7 +23,8 @@ func getLogger(serverId int, toDebug bool) (l *log.Logger) { ...@@ -23,7 +23,8 @@ func getLogger(serverId int, toDebug bool) (l *log.Logger) {
return l return l
} }
func Start(serverId int, commitCh chan LogEntry, eventCh chan RaftEvent, dummyCh chan bool, toDebug bool) { func Start(serverId int, commitCh chan LogEntry, dummyCh chan bool, toDebug bool) {
eventCh := make(chan RaftEvent)
clusterConfig, _ := NewClusterConfig(5) clusterConfig, _ := NewClusterConfig(5)
rft, _ := NewRaft(clusterConfig, serverId, commitCh, eventCh, true) rft, _ := NewRaft(clusterConfig, serverId, commitCh, eventCh, true)
if rafts == nil { if rafts == nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment