Commit 90c89103 authored by Sushant Mahajan's avatar Sushant Mahajan

completed candidate implementation

parent 5c9f2304
#! /bin/bash
rm {1..5} currentTerm* votedFor*
...@@ -26,6 +26,8 @@ const ( ...@@ -26,6 +26,8 @@ const (
FILE_WRITTEN = 0 FILE_WRITTEN = 0
FILE_ERR = -1 FILE_ERR = -1
NULL_VOTE = 0 NULL_VOTE = 0
LOG_INVALID_INDEX = -1
LOG_INVALID_TERM = -1
) )
// Global variable for generating unique log sequence numbers // Global variable for generating unique log sequence numbers
...@@ -98,6 +100,8 @@ type Raft struct { ...@@ -98,6 +100,8 @@ type Raft struct {
currentTerm int currentTerm int
commitIndex int commitIndex int
voters int voters int
monitorVotesCh chan bool
et *time.Timer
} }
// Log entry interface // Log entry interface
...@@ -165,14 +169,19 @@ func writeFile(name string, serverId int, data int, info *log.Logger) int { ...@@ -165,14 +169,19 @@ func writeFile(name string, serverId int, data int, info *log.Logger) int {
// commitCh is the channel that the kvstore waits on for committed messages. // commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed // When the process starts, the local disk log is read and all committed
// entries are recovered and replayed // entries are recovered and replayed
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, eventCh chan RaftEvent, toDebug bool) (*Raft, error) { func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, eventCh chan RaftEvent, monitorVotesCh chan bool, toDebug bool) (*Raft, error) {
rft := new(Raft) rft := new(Raft)
rft.commitCh = commitCh rft.commitCh = commitCh
rft.clusterConfig = config rft.clusterConfig = config
rft.id = thisServerId rft.id = thisServerId
rft.eventCh = eventCh rft.eventCh = eventCh
rft.Info = getLogger(thisServerId, toDebug) rft.Info = getLogger(thisServerId, toDebug)
rft.currentTerm = getSingleDataFromFile(CURRENT_TERM, thisServerId, rft.Info) if v := getSingleDataFromFile(CURRENT_TERM, thisServerId, rft.Info); v != FILE_ERR {
rft.currentTerm = v
} else {
rft.currentTerm = 0
}
rft.monitorVotesCh = monitorVotesCh
getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file. getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file.
return rft, nil return rft, nil
} }
...@@ -261,7 +270,6 @@ func (e ErrRedirect) Error() string { ...@@ -261,7 +270,6 @@ func (e ErrRedirect) Error() string {
func (rft *Raft) loop() { func (rft *Raft) loop() {
state := FOLLOWER state := FOLLOWER
for { for {
rft.Info.Println("hello")
switch state { switch state {
case FOLLOWER: case FOLLOWER:
state = rft.follower() state = rft.follower()
...@@ -273,19 +281,20 @@ func (rft *Raft) loop() { ...@@ -273,19 +281,20 @@ func (rft *Raft) loop() {
} }
} }
func getTimer() *time.Timer { func getRandTime(log *log.Logger) time.Duration {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
return time.NewTimer(time.Millisecond * time.Duration((rand.Intn(MAX_TIMEOUT)+MIN_TIMEOUT)%MAX_TIMEOUT)) t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT-MIN_TIMEOUT)+MIN_TIMEOUT)
} log.Println("New rand time", t)
return t
func reInitializeTimer(t *time.Timer) *time.Timer {
t.Stop()
return getTimer()
} }
func (rft *Raft) grantVote(reply bool, currentTerm int) { func (rft *Raft) grantVote(reply bool, currentTerm int) {
if reply { if reply {
rft.voters++ rft.voters++
rft.Info.Println(rft.id, "got vote")
if rft.voters >= len(rafts)/2+1 {
rft.monitorVotesCh <- true
}
} }
} }
...@@ -300,24 +309,38 @@ func (rft *Raft) updateTermAndVote(term int) { ...@@ -300,24 +309,38 @@ func (rft *Raft) updateTermAndVote(term int) {
rft.votedFor = NULL_VOTE rft.votedFor = NULL_VOTE
} }
func (rft *Raft) LogF(msg string) {
rft.Info.Println("F:", rft.id, msg)
}
func (rft *Raft) LogC(msg string) {
rft.Info.Println("C:", rft.id, msg)
}
func (rft *Raft) LogL(msg string) {
rft.Info.Println("L:", rft.id, msg)
}
func (rft *Raft) follower() int { func (rft *Raft) follower() int {
//start candidate timeout //start candidate timeout
electionTimeout := getTimer() rft.et = time.NewTimer(getRandTime(rft.Info))
for { for {
rft.Info.Println("xyz")
//wrap in select //wrap in select
select { select {
case <-electionTimeout.C: case <-rft.et.C:
rft.LogF("follower election timeout")
return CANDIDATE return CANDIDATE
case event := <-rft.eventCh: case event := <-rft.eventCh:
switch event.(type) { switch event.(type) {
case *ClientAppend: case *ClientAppend:
rft.LogF("got client append")
//Do not handle clients in follower mode. //Do not handle clients in follower mode.
//Send it back up the pipeline. //Send it back up the pipeline.
event.(*ClientAppend).logEntry.SetCommitted(false) event.(*ClientAppend).logEntry.SetCommitted(false)
rft.eventCh <- event.(*ClientAppend).logEntry rft.eventCh <- event.(*ClientAppend).logEntry
case *VoteRequest: case *VoteRequest:
rft.LogF("got vote request")
req := event.(*VoteRequest) req := event.(*VoteRequest)
reply := false reply := false
if req.term < rft.currentTerm { if req.term < rft.currentTerm {
...@@ -332,14 +355,19 @@ func (rft *Raft) follower() int { ...@@ -332,14 +355,19 @@ func (rft *Raft) follower() int {
} }
if reply && rft.votedFor == NULL_VOTE { if reply && rft.votedFor == NULL_VOTE {
electionTimeout = reInitializeTimer(electionTimeout) rft.et.Reset(getRandTime(rft.Info))
rft.LogF("reset timer after voting")
writeFile(VOTED_FOR, rft.id, req.candidateId, rft.Info) writeFile(VOTED_FOR, rft.id, req.candidateId, rft.Info)
rft.LogF("voted for " + strconv.Itoa(req.candidateId))
rft.votedFor = req.candidateId rft.votedFor = req.candidateId
rafts[req.candidateId].grantVote(reply, rft.currentTerm)
} }
//let leader know about the vote
rafts[req.candidateId].grantVote(reply, rft.currentTerm)
case *AppendRPC: case *AppendRPC:
electionTimeout = reInitializeTimer(electionTimeout) rft.LogF("got append rpc")
rft.et.Reset(getRandTime(rft.Info))
rft.LogF("reset timer on appendRPC")
req := event.(*AppendRPC) req := event.(*AppendRPC)
reply := true reply := true
if req.term < rft.currentTerm { if req.term < rft.currentTerm {
...@@ -374,6 +402,7 @@ func (rft *Raft) follower() int { ...@@ -374,6 +402,7 @@ func (rft *Raft) follower() int {
} }
} }
} }
rft.LogF("AppendRPC")
rafts[req.leaderId].replyAppendRPC(reply, rft.currentTerm) rafts[req.leaderId].replyAppendRPC(reply, rft.currentTerm)
} }
} }
...@@ -381,9 +410,60 @@ func (rft *Raft) follower() int { ...@@ -381,9 +410,60 @@ func (rft *Raft) follower() int {
} }
func (rft *Raft) candidate() int { func (rft *Raft) candidate() int {
return 1 //increment current term
rft.LogC("became candidate")
writeFile(CURRENT_TERM, rft.id, rft.currentTerm+1, rft.Info)
rft.currentTerm++
//vote for self
rft.voters = 1
writeFile(VOTED_FOR, rft.id, rft.id, rft.Info)
rft.votedFor = rft.id
//reset timer
rft.et = time.NewTimer(getRandTime(rft.Info))
rft.Info.Println(rft.id, "candidate got new timer")
//create a vote request object
req := &VoteRequest{
term: rft.currentTerm,
candidateId: rft.id,
}
if len(rft.LogArray) == 0 {
req.lastLogIndex = LOG_INVALID_INDEX
req.lastLogTerm = LOG_INVALID_TERM
} else {
req.lastLogIndex = len(rft.LogArray) - 1
req.lastLogTerm = rft.LogArray[req.lastLogIndex].Term
}
//send vote request to all servers
for i := 1; i <= len(rafts); i++ {
if i != rft.id {
rft.LogC("sent vote request to " + strconv.Itoa(rafts[i].id))
rafts[i].eventCh <- req
}
}
for {
select {
case <-rft.monitorVotesCh:
rft.LogC("C to L")
return LEADER
case <-rft.et.C:
rft.LogC("C to C")
return CANDIDATE
case event := <-rft.eventCh:
switch event.(type) {
case (*AppendRPC):
rft.LogC("C to F")
return FOLLOWER
}
}
}
} }
func (rft *Raft) leader() int { func (rft *Raft) leader() int {
return 1 select {
case <-rft.commitCh:
rft.LogL("hello")
}
return LEADER
} }
...@@ -26,8 +26,9 @@ func getLogger(serverId int, toDebug bool) (l *log.Logger) { ...@@ -26,8 +26,9 @@ func getLogger(serverId int, toDebug bool) (l *log.Logger) {
func Start(serverId int, toDebug bool) { func Start(serverId int, toDebug bool) {
eventCh := make(chan RaftEvent) eventCh := make(chan RaftEvent)
commitCh := make(chan LogEntry) commitCh := make(chan LogEntry)
monitorVotesCh := make(chan bool)
clusterConfig, _ := NewClusterConfig(5) clusterConfig, _ := NewClusterConfig(5)
rft, _ := NewRaft(clusterConfig, serverId, commitCh, eventCh, true) rft, _ := NewRaft(clusterConfig, serverId, commitCh, eventCh, monitorVotesCh, true)
if rafts == nil { if rafts == nil {
rafts = make(map[int]*Raft) rafts = make(map[int]*Raft)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment