Commit 0a0bba45 authored by Sushant Mahajan's avatar Sushant Mahajan

leader impl complete, log replication working

parent 189cf137
......@@ -292,14 +292,14 @@ func (rft *Raft) loop() {
func getRandTime(log *log.Logger) time.Duration {
rand.Seed(time.Now().UnixNano())
t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT_ELEC-MIN_TIMEOUT_ELEC)+MIN_TIMEOUT_ELEC)
log.Println("New rand time", t)
//log.Println("New rand time", t)
return t
}
func (rft *Raft) grantVote(reply bool, currentTerm int) {
if reply {
rft.voters++
rft.Info.Println(rft.id, "got vote")
//rft.Info.Println(rft.id, "got vote")
if !rft.isLeader && rft.voters > len(rafts)/2 {
rft.LogC("got majority")
rft.monitorVotesCh <- true
......@@ -308,14 +308,13 @@ func (rft *Raft) grantVote(reply bool, currentTerm int) {
}
}
func (rft *Raft) replyAppendRPC(reply bool, currentTerm int, id int) {
defer rafts[id].Unlock()
rafts[id].Lock()
//receiver is leader
func (rft *Raft) replyAppendRPC(reply bool, currentTerm int, fId int) {
if reply {
rafts[id].nextIndex[id-1] = len(rft.LogArray) - 1
rafts[id].matchIndex[id-1] = len(rft.LogArray) - 1
rft.nextIndex[fId-1] = len(rafts[fId].LogArray)
rft.matchIndex[fId-1] = len(rafts[fId].LogArray)
} else {
rafts[id].nextIndex[id-1]--
rft.nextIndex[fId-1]--
}
}
......@@ -382,38 +381,46 @@ func (rft *Raft) follower() int {
rafts[req.candidateId].grantVote(reply, rft.currentTerm)
case *AppendRPC:
rft.LogF("got append rpc")
//rft.LogF("got append rpc")
rft.et.Reset(getRandTime(rft.Info))
rft.LogF("reset timer on appendRPC")
//rft.LogF("reset timer on appendRPC")
req := event.(*AppendRPC)
if len(req.entries) == 0 { //heartbeat
rft.LogF("got hearbeat from " + strconv.Itoa(req.leaderId))
//rft.LogF("got hearbeat from " + strconv.Itoa(req.leaderId))
continue
}
reply := true
if req.term < rft.currentTerm {
reply = false
}
if req.term > rft.currentTerm {
if req.prevLogIndex == LOG_INVALID_INDEX || req.prevLogIndex == LOG_INVALID_TERM {
rft.updateTermAndVote(req.term)
reply = true
} else if req.term < rft.currentTerm {
reply = false
} else if req.term > rft.currentTerm {
rft.updateTermAndVote(req.term)
reply = true
}
//first condition to prevent out of bounds except
if len(rft.LogArray) < req.prevLogIndex || rft.LogArray[req.prevLogIndex].Term != req.prevLogTerm {
if !(req.prevLogIndex == LOG_INVALID_INDEX) && rft.LogArray[req.prevLogIndex].Term != req.prevLogTerm {
rft.LogF("terms unequal")
reply = false
}
if reply {
i := req.prevLogIndex + 1
for ; i < len(rft.LogArray); i++ {
if req.entries[i-req.prevLogIndex-1].Term != rft.LogArray[i].Term {
if req.prevLogIndex == LOG_INVALID_INDEX || req.entries[i-req.prevLogIndex-1].Term != rft.LogArray[i].Term {
break
}
}
rft.LogArray = append(rft.LogArray[0:i], req.entries[i-req.prevLogIndex-1:]...)
if req.prevLogIndex == LOG_INVALID_INDEX {
rft.LogArray = append(rft.LogArray, req.entries...)
} else {
rft.LogArray = append(rft.LogArray[0:i], req.entries[i-req.prevLogIndex-1:]...)
}
//todo:also add to log
if req.leaderCommit > rft.commitIndex {
......@@ -424,8 +431,8 @@ func (rft *Raft) follower() int {
}
}
}
rft.LogF("AppendRPC")
rafts[req.leaderId].replyAppendRPC(reply, rft.currentTerm, rft.id)
rft.Info.Println("F: log is size", len(rft.LogArray))
}
}
}
......@@ -483,21 +490,26 @@ func (rft *Raft) candidate() int {
}
}
func (rft *Raft) enforceLog() {
req := &AppendRPC{
term: rft.currentTerm,
leaderId: rft.id,
leaderCommit: rft.commitIndex,
}
for i := 0; i < len(rft.nextIndex); i++ {
if len(rft.LogArray)-1 >= rft.nextIndex[i] {
req.entries = rft.LogArray[rft.nextIndex[i]:len(rft.LogArray)]
req.prevLogIndex = rft.nextIndex[i] - 1
req.prevLogTerm = rft.LogArray[rft.nextIndex[i]-1].Term
if !rafts[i+1].isLeader {
func enforceLog(rft *Raft) {
for {
for i := 0; i < len(rft.nextIndex); i++ {
if !rafts[i+1].isLeader && len(rft.LogArray)-1 >= rft.nextIndex[i] {
req := &AppendRPC{}
req.term = rft.currentTerm
req.leaderId = rft.id
req.leaderCommit = rft.commitIndex
req.entries = rft.LogArray[rft.nextIndex[i]:len(rft.LogArray)]
req.prevLogIndex = rft.nextIndex[i] - 1
if req.prevLogIndex <= 0 {
req.prevLogTerm = LOG_INVALID_TERM
} else {
req.prevLogTerm = rft.LogArray[rft.nextIndex[i]-1].Term
}
//send to other rafts
rafts[i+1].eventCh <- req
rft.LogL("sent append entries to " + strconv.Itoa(i+1))
}
time.Sleep(time.Millisecond * 2)
}
}
}
......@@ -508,20 +520,34 @@ func (rft *Raft) leader() int {
heartbeatReq := new(AppendRPC)
heartbeatReq.entries = []*LogEntryData{}
heartbeatReq.leaderId = rft.id
//reintialize nextIndex and matchIndex
rft.LogArray = append(
rft.LogArray,
&LogEntryData{
Id: 1,
Data: []byte("hello"),
Committed: false,
},
&LogEntryData{
Id: 2,
Data: []byte("world"),
Committed: false,
})
//build nextIndex and matchIndex
for i := 0; i < len(rft.nextIndex); i++ {
rft.nextIndex[i] = len(rft.LogArray)
rft.nextIndex[i] = 0
rft.matchIndex[i] = 0
}
//go rft.enforceLog()
go enforceLog(rft)
for {
select {
case <-heartbeat.C:
for k, v := range rafts {
if k != rft.id {
rft.LogL("sending AppendRPC " + strconv.Itoa(k))
//rft.LogL("sending AppendRPC " + strconv.Itoa(k))
v.eventCh <- heartbeatReq
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment