Commit cbd36253 authored by Sushant Mahajan's avatar Sushant Mahajan

completed append rpc reply

parent fd164d82
...@@ -75,8 +75,12 @@ type VoteRequestReply struct { ...@@ -75,8 +75,12 @@ type VoteRequestReply struct {
reply bool reply bool
} }
//just alias the babe type AppendReply struct {
type AppendReply VoteRequestReply currentTerm int
reply bool
fid int
logLength int
}
type AppendRPC struct { type AppendRPC struct {
term int term int
...@@ -368,49 +372,35 @@ func monitorVotesChannelRoutine(rft *Raft) { ...@@ -368,49 +372,35 @@ func monitorVotesChannelRoutine(rft *Raft) {
} }
func monitorAckChannel(rft *Raft, killCh chan bool) { func monitorAckChannel(rft *Raft, killCh chan bool) {
/*func (rft *Raft) replyAppendRPC(reply bool, currentTerm int, fId int) { flag := false
if reply {
rft.nextIndex[fId-1] = len(rafts[fId].LogArray)
rft.matchIndex[fId-1] = len(rafts[fId].LogArray)
} else {
rft.nextIndex[fId-1]--
}
}
for { for {
select { select {
case temp := <-rft.: case temp := <-rft.ackCh:
Info.Println("Ack Received:", temp) rft.Info.Println("Ack received")
acks_received += temp if temp.reply {
if acks_received == required_acks { rft.nextIndex[temp.fid] = temp.logLength
Info.Println("Majority Achieved", log_entry.(*LogEntryData).Id) rft.matchIndex[temp.fid] = temp.logLength
rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true //update commitindex
//Info.Println(rft.LogArray) for n := rft.commitIndex + 1; n < len(rft.LogArray); n++ {
rft.commitCh <- log_entry maj := 0
for _, server := range rft.clusterConfig.Servers {
temp := new(CommitData) if rft.matchIndex[server.Id] >= n {
temp.Id = log_entry.(*LogEntryData).Id maj++
for _, server := range rft.clusterConfig.Servers[1:] {
go doCommitRPCCall(server.Hostname, server.LogPort, temp)
}
majCh <- true
err = true
break
} }
case <-up:
Info.Println("Error")
err = true
break
} }
if err { if maj > len(rft.clusterConfig.Servers)/2 && rft.LogArray[n].Term == currentTerm {
rft.commitIndex = n
}
}
} else {
rft.nextIndex[temp.fid]--
}
case <-killCh:
flag = true
break break
} }
}*/ if flag {
for{ break
select{
case temp:=rft.ackCh
} }
} }
} }
...@@ -452,6 +442,21 @@ func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply) { ...@@ -452,6 +442,21 @@ func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply) {
Info.Println("Reply", castVoteCall, reply.X) Info.Println("Reply", castVoteCall, reply.X)
} }
func doAppendReplyRPC(hostname string, logPort int, temp *AppendReply) {
Info.Println("append reply RPC")
//rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("Calling AppendReply RPC", logPort)
appendReplyCall := client.Go("AppendEntries.AppendReplyRPC", args, reply, nil) //let go allocate done channel
appendReplyCall = <-appendReplyCall.Done
Info.Println("Reply", appendReplyCall, reply.X)
}
func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest) { func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest) {
Info.Println("Vote request RPC") Info.Println("Vote request RPC")
//rpc call to the caller //rpc call to the caller
...@@ -481,16 +486,6 @@ func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC) { ...@@ -481,16 +486,6 @@ func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC) {
Info.Println("Reply", appendCall, reply.X) Info.Println("Reply", appendCall, reply.X)
} }
//receiver is leader
/*func (rft *Raft) replyAppendRPC(reply bool, currentTerm int, fId int) {
if reply {
rft.nextIndex[fId-1] = len(rafts[fId].LogArray)
rft.matchIndex[fId-1] = len(rafts[fId].LogArray)
} else {
rft.nextIndex[fId-1]--
}
}*/
func (rft *Raft) updateTermAndVote(term int) { func (rft *Raft) updateTermAndVote(term int) {
writeFile(CURRENT_TERM, rft.id, term, rft.Info) writeFile(CURRENT_TERM, rft.id, term, rft.Info)
rft.currentTerm = term rft.currentTerm = term
...@@ -606,7 +601,9 @@ func (rft *Raft) follower() int { ...@@ -606,7 +601,9 @@ func (rft *Raft) follower() int {
} }
} }
} }
rafts[req.leaderId].replyAppendRPC(reply, rft.currentTerm, rft.id)
temp := &AppendReply{rft.currentTerm, reply, rft.id, len(rft.LogArray)}
doAppendReplyRPC(rft.clusterConfig.Servers[req.leaderId].Hostname, rft.clusterConfig.Servers[req.leaderId].LogPort, temp)
if reply { if reply {
rft.persistLog() rft.persistLog()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment