Commit e6e02994 authored by Sushant Mahajan's avatar Sushant Mahajan

remove useless RPCs, use reply for further communication

parent f18eef0f
......@@ -115,20 +115,20 @@ type Raft struct {
clusterConfig *ClusterConfig // Cluster
id int // Server id
sync.RWMutex
Info *log.Logger //log for raft instance
eventCh chan RaftEvent //receive events related to various states
votedFor int
currentTerm int
commitIndex int
voters int
monitorVotesCh chan RaftEvent
shiftStatusCh chan int
ackCh chan RaftEvent
et *time.Timer
isLeader bool
lastApplied int
nextIndex []int
matchIndex []int
Info *log.Logger //log for raft instance
eventCh chan RaftEvent //receive events related to various states
votedFor int
currentTerm int
commitIndex int
voters int
shiftStatusCh chan int
voteReplyCh chan RaftEvent
appendReplyCh chan RaftEvent
et *time.Timer
isLeader bool
lastApplied int
nextIndex []int
matchIndex []int
}
// Log entry interface
......@@ -242,10 +242,11 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, In
} else {
rft.currentTerm = 0
}
rft.monitorVotesCh = make(chan RaftEvent)
rft.ackCh = make(chan RaftEvent)
rft.eventCh = make(chan RaftEvent)
rft.shiftStatusCh = make(chan int)
rft.appendReplyCh = make(chan RaftEvent)
rft.voteReplyCh = make(chan RaftEvent)
getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file.
rft.isLeader = false
rft.nextIndex = make([]int, len(config.Servers))
......@@ -313,10 +314,20 @@ func (rft *Raft) AddToEventChannel(entry RaftEvent) {
rft.eventCh <- entry
}
//AddToMonitorVotesChannel
func (rft *Raft) AddToMonitorVotesChannel(entry RaftEvent) {
rft.Info.Println("Adding to monitor votes", entry)
rft.monitorVotesCh <- entry
//to be executed by follower
func (rft *Raft) FetchVoteReply() RaftEvent {
//follower puts the reply here after computations
//we need to send this as reply back to candidate
temp := <-rft.voteReplyCh
return temp
}
//to be executed by follower
func (rft *Raft) FetchAppendReply() RaftEvent {
//follower puts the reply here after computations
//we need to send this as reply back to candidate
temp := <-rft.appendReplyCh
return temp
}
func NewServerConfig(serverId int) (*ServerConfig, error) {
......@@ -345,72 +356,6 @@ func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(0)
}
func monitorVotesChannelRoutine(rft *Raft, killCh chan bool) {
majority := len(rft.clusterConfig.Servers) / 2
flag := false
for {
select {
case temp1 := <-rft.monitorVotesCh:
temp := temp1.(*VoteRequestReply)
rft.Info.Println("favorable vote")
if temp.Reply {
rft.voters++
if !rft.isLeader && rft.voters >= majority {
rft.shiftStatusCh <- LEADER
rft.isLeader = true
}
} else {
if rft.currentTerm < temp.CurrentTerm {
rft.updateTermAndVote(temp.CurrentTerm)
rft.shiftStatusCh <- FOLLOWER
}
}
case <-killCh:
flag = true
break
}
if flag {
break
}
}
}
func monitorAckChannel(rft *Raft, killCh chan bool) {
flag := false
for {
select {
case temp1 := <-rft.ackCh:
temp := temp1.(*AppendReply)
rft.Info.Println("Ack received")
if temp.Reply {
rft.nextIndex[temp.Fid] = temp.LogLength
rft.matchIndex[temp.Fid] = temp.LogLength
//update commitindex
for n := rft.commitIndex + 1; n < len(rft.LogArray); n++ {
maj := 0
for _, server := range rft.clusterConfig.Servers {
if rft.matchIndex[server.Id] >= n {
maj++
}
}
if maj > len(rft.clusterConfig.Servers)/2 && rft.LogArray[n].Term == rft.currentTerm {
rft.commitIndex = n
}
}
} else {
rft.nextIndex[temp.Fid]--
}
case <-killCh:
flag = true
break
}
if flag {
break
}
}
}
//entry loop to raft
func (rft *Raft) loop() {
state := FOLLOWER
......@@ -433,19 +378,48 @@ func getRandTime(log *log.Logger) time.Duration {
return t
}
func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply, Info *log.Logger) {
Info.Println("Cast vote RPC")
//rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
func (rft *Raft) handleMajority(reply *VoteRequestReply) {
majority := len(rft.clusterConfig.Servers) / 2
rft.Info.Println("[C]: favorable vote")
if reply.Reply {
rft.voters++
rft.Info.Println("[C]: count", rft.voters)
if !rft.isLeader && rft.voters > majority {
rft.shiftStatusCh <- LEADER
rft.isLeader = true
}
} else {
if rft.currentTerm < reply.CurrentTerm {
rft.updateTermAndVote(reply.CurrentTerm)
rft.shiftStatusCh <- FOLLOWER
}
}
}
//leader will call this
func (rft *Raft) handleAppendReply(temp *AppendReply) {
//this is reply to heartbeat, ignore it
if temp.Fid == -1 {
return
}
if temp.Reply {
rft.nextIndex[temp.Fid] = temp.LogLength
rft.matchIndex[temp.Fid] = temp.LogLength
//update commitindex
for n := rft.commitIndex + 1; n < len(rft.LogArray); n++ {
maj := 0
for _, server := range rft.clusterConfig.Servers {
if rft.matchIndex[server.Id] >= n {
maj++
}
}
if maj > len(rft.clusterConfig.Servers)/2 && rft.LogArray[n].Term == rft.currentTerm {
rft.commitIndex = n
}
}
} else {
rft.nextIndex[temp.Fid]--
}
reply := new(Reply)
args := temp
Info.Println("Calling cast vote RPC", logPort)
castVoteCall := client.Go("RaftRPCService.CastVoteRPC", args, reply, nil) //let go allocate done channel
Info.Println("Reply", castVoteCall, reply.X)
castVoteCall = <-castVoteCall.Done
}
func doAppendReplyRPC(hostname string, logPort int, temp *AppendReply, Info *log.Logger) {
......@@ -457,39 +431,39 @@ func doAppendReplyRPC(hostname string, logPort int, temp *AppendReply, Info *log
}
reply := new(Reply)
args := temp
Info.Println("Calling AppendReply RPC", logPort)
Info.Println("[F]: Calling AppendReply RPC", logPort)
appendReplyCall := client.Go("RaftRPCService.AppendReplyRPC", args, reply, nil) //let go allocate done channel
appendReplyCall = <-appendReplyCall.Done
Info.Println("Reply", appendReplyCall, reply.X)
}
func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, Info *log.Logger) {
Info.Println("Vote request RPC")
func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, rft *Raft) {
rft.Info.Println("[C]:Vote request RPC")
//rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
rft.Info.Fatal("Dialing:", err)
}
reply := new(Reply)
reply := new(VoteRequestReply)
args := temp
Info.Println("Calling vote request RPC", logPort)
//rft.Info.Println("Calling vote request RPC", logPort)
voteReqCall := client.Go("RaftRPCService.VoteRequestRPC", args, reply, nil) //let go allocate done channel
voteReqCall = <-voteReqCall.Done
Info.Println("Reply", voteReqCall, reply.X)
rft.handleMajority(reply)
}
//make append entries rpc call to followers
func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, Info *log.Logger) {
func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, rft *Raft) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
rft.Info.Fatal("[L]: Dialing:", err)
}
reply := new(Reply)
reply := new(AppendReply)
args := temp
Info.Println("RPC Called", logPort)
rft.Info.Println("[L]: RPC Called", logPort)
appendCall := client.Go("RaftRPCService.AppendRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
Info.Println("Reply", appendCall, reply.X)
rft.handleAppendReply(reply)
}
func (rft *Raft) updateTermAndVote(term int) {
......@@ -499,18 +473,6 @@ func (rft *Raft) updateTermAndVote(term int) {
rft.votedFor = NULL_VOTE
}
func (rft *Raft) LogF(msg string) {
rft.Info.Println("F:", rft.id, msg)
}
func (rft *Raft) LogC(msg string) {
rft.Info.Println("C:", rft.id, msg)
}
func (rft *Raft) LogL(msg string) {
rft.Info.Println("L:", rft.id, msg)
}
func (rft *Raft) follower() int {
//start candidate timeout
rft.et = time.NewTimer(getRandTime(rft.Info))
......@@ -518,19 +480,19 @@ func (rft *Raft) follower() int {
//wrap in select
select {
case <-rft.et.C:
rft.LogF("election timeout")
rft.Info.Println("[F]: election timeout")
return CANDIDATE
case event := <-rft.eventCh:
switch event.(type) {
case *ClientAppend:
rft.LogF("got client append")
rft.Info.Println("[F]: got client append")
//Do not handle clients in follower mode.
//Send it back up the pipeline.
event.(*ClientAppend).logEntry.SetCommitted(false)
rft.eventCh <- event.(*ClientAppend).logEntry
case *VoteRequest:
rft.LogF("got vote request")
rft.Info.Println("[F]: got vote request")
req := event.(*VoteRequest)
reply := false
if req.Term < rft.currentTerm {
......@@ -546,15 +508,16 @@ func (rft *Raft) follower() int {
if reply && rft.votedFor == NULL_VOTE {
rft.et.Reset(getRandTime(rft.Info))
rft.LogF("reset timer after voting")
rft.Info.Println("[F]: timer reset, after vote")
writeFile(VOTED_FOR, rft.id, req.CandidateId, rft.Info)
rft.LogF("voted for " + strconv.Itoa(req.CandidateId))
rft.Info.Println("[F]: voted for ", strconv.Itoa(req.CandidateId))
rft.votedFor = req.CandidateId
}
//let the asker know about the vote
voteReply := &VoteRequestReply{rft.currentTerm, reply}
server := rft.clusterConfig.Servers[req.CandidateId]
doCastVoteRPC(server.Hostname, server.LogPort, voteReply, rft.Info)
//server := rft.clusterConfig.Servers[req.CandidateId]
//doCastVoteRPC(server.Hostname, server.LogPort, voteReply, rft.Info)
rft.voteReplyCh <- voteReply
case *AppendRPC:
//rft.LogF("got append rpc")
......@@ -562,7 +525,10 @@ func (rft *Raft) follower() int {
//rft.LogF("reset timer on appendRPC")
req := event.(*AppendRPC)
if len(req.Entries) == 0 { //heartbeat
//rft.LogF("got hearbeat from " + strconv.Itoa(req.leaderId))
rft.Info.Println("[F]: got hearbeat from " + strconv.Itoa(req.LeaderId))
temp := &AppendReply{-1, true, -1, -1}
rft.Info.Println("[F]: sending dummy reply to " + strconv.Itoa(req.LeaderId))
rft.appendReplyCh <- temp
continue
}
......@@ -580,7 +546,7 @@ func (rft *Raft) follower() int {
//first condition to prevent out of bounds except
if !(req.PrevLogIndex == LOG_INVALID_INDEX) && rft.LogArray[req.PrevLogIndex].Term != req.PrevLogTerm {
rft.LogF("terms unequal")
rft.Info.Println("[F]: terms unequal")
reply = false
}
......@@ -609,11 +575,12 @@ func (rft *Raft) follower() int {
}
temp := &AppendReply{rft.currentTerm, reply, rft.id, len(rft.LogArray)}
doAppendReplyRPC(rft.clusterConfig.Servers[req.LeaderId].Hostname, rft.clusterConfig.Servers[req.LeaderId].LogPort, temp, rft.Info)
rft.appendReplyCh <- temp
//doAppendReplyRPC(rft.clusterConfig.Servers[req.LeaderId].Hostname, rft.clusterConfig.Servers[req.LeaderId].LogPort, temp, rft.Info)
if reply {
rft.persistLog()
}
rft.Info.Println("F: log is size", len(rft.LogArray))
rft.Info.Println("[F]: log is size", len(rft.LogArray))
}
}
}
......@@ -621,7 +588,7 @@ func (rft *Raft) follower() int {
func (rft *Raft) candidate() int {
//increment current term
rft.LogC("became candidate")
rft.Info.Println("[C]: became candidate")
writeFile(CURRENT_TERM, rft.id, rft.currentTerm+1, rft.Info)
rft.currentTerm++
//vote for self
......@@ -630,7 +597,7 @@ func (rft *Raft) candidate() int {
rft.votedFor = rft.id
//reset timer
rft.et = time.NewTimer(getRandTime(rft.Info))
rft.Info.Println(rft.id, "candidate got new timer")
rft.Info.Println("[C]:", rft.id, "candidate got new timer")
//create a vote request object
req := &VoteRequest{
Term: rft.currentTerm,
......@@ -645,17 +612,18 @@ func (rft *Raft) candidate() int {
}
//reinitialize rft.monitorVotesCh
rft.monitorVotesCh = make(chan RaftEvent)
killCh := make(chan bool)
go monitorVotesChannelRoutine(rft, killCh)
//rft.monitorVotesCh = make(chan RaftEvent)
//killCh := make(chan bool)
//go monitorVotesChannelRoutine(rft, killCh)
//time.Sleep(time.Millisecond * 10)
//send vote request to all servers
for _, server := range rft.clusterConfig.Servers {
rft.Info.Println(server.Id)
//rft.Info.Println(server.Id)
if server.Id != rft.id {
rft.LogC("Sent vote request to " + strconv.Itoa(server.Id))
doVoteRequestRPC(server.Hostname, server.LogPort, req, rft.Info)
//rft.Info.Println("[C]: Vote request to " + strconv.Itoa(server.Id))
go doVoteRequestRPC(server.Hostname, server.LogPort, req, rft)
//rft.handleMajority(reply)
}
}
......@@ -663,24 +631,20 @@ func (rft *Raft) candidate() int {
select {
case status := <-rft.shiftStatusCh:
if status == LEADER {
rft.LogC("C to L")
killCh <- true
rft.Info.Println("[Switch]: C to L")
return LEADER
} else {
rft.LogC("C to F")
killCh <- true
rft.Info.Println("[Switch]: C to F")
return FOLLOWER
}
case <-rft.et.C:
rft.LogC("C to C")
killCh <- true
rft.Info.Println("[Switch]: C to C")
return CANDIDATE
case event := <-rft.eventCh:
switch event.(type) {
case (*AppendRPC):
rft.LogC("C to F")
rft.Info.Println("[Switch]: C to F")
rft.et.Reset(getRandTime(rft.Info))
killCh <- true
return FOLLOWER
}
}
......@@ -704,8 +668,8 @@ func enforceLog(rft *Raft) {
}
//appendRPC call
doAppendRPCCall(server.Hostname, server.LogPort, req, rft.Info)
rft.LogL("sent append entries to " + strconv.Itoa(server.Id))
doAppendRPCCall(server.Hostname, server.LogPort, req, rft)
rft.Info.Println("[L]: Sent append entries", strconv.Itoa(server.Id))
}
time.Sleep(time.Millisecond * 2)
}
......@@ -713,7 +677,7 @@ func enforceLog(rft *Raft) {
}
func (rft *Raft) leader() int {
rft.LogL("became leader")
rft.Info.Println("[L]: became leader")
heartbeat := time.NewTimer(time.Millisecond * HEARTBEAT_TIMEOUT)
heartbeatReq := new(AppendRPC)
heartbeatReq.Entries = []*LogEntryData{}
......@@ -735,6 +699,13 @@ func (rft *Raft) leader() int {
Term: rft.currentTerm,
})
newEntry := &LogEntryData{
Id: 3,
Data: []byte("goodbye"),
Committed: false,
Term: rft.currentTerm,
}
//build nextIndex and matchIndex
for i := 0; i < len(rft.nextIndex); i++ {
rft.nextIndex[i] = 0
......@@ -742,6 +713,10 @@ func (rft *Raft) leader() int {
}
go enforceLog(rft)
go func() {
time.Sleep(time.Second * 2)
rft.LogArray = append(rft.LogArray, newEntry)
}()
for {
select {
......@@ -749,7 +724,8 @@ func (rft *Raft) leader() int {
for _, server := range rft.clusterConfig.Servers {
if server.Id != rft.id {
//doRPCCall for hearbeat
doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq, rft.Info)
go doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq, rft)
rft.Info.Println("[L]: Sent heartbeat", strconv.Itoa(server.Id))
}
}
heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT)
......@@ -758,7 +734,7 @@ func (rft *Raft) leader() int {
switch event.(type) {
case *ClientAppend:
//write data to log
rft.LogL("got client data")
rft.Info.Println("[L]: got client data")
entry := event.(*ClientAppend).logEntry
rft.LogArray = append(rft.LogArray, entry)
//todo:apply to state machine
......
......@@ -73,27 +73,27 @@ type Reply struct {
//arguments: pointer to argument struct (has LogEntryData), pointer to reply struct
//returns: error
//receiver: pointer to AppendEntries
func (t *RaftRPCService) AppendRPC(args *raft.AppendRPC, reply *Reply) error {
func (t *RaftRPCService) AppendRPC(args *raft.AppendRPC, reply *raft.AppendReply) error {
Info.Println("append RPC invoked")
rft.AddToEventChannel(args)
reply.X = 1
temp := rft.FetchAppendReply().(*raft.AppendReply)
reply.CurrentTerm = temp.CurrentTerm
reply.Reply = temp.Reply
reply.Fid = temp.Fid
reply.LogLength = temp.LogLength
return nil
/*Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
reply.X = 1
return nil*/
}
func (t *RaftRPCService) AppendReplyRPC(args *raft.AppendReply, reply *Reply) error {
Info.Println("append reply to leader RPC invoked")
rft.AddToEventChannel(args)
reply.X = 1
return nil
/*Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
reply.X = 1
return nil*/
}
//func (t *RaftRPCService) AppendReplyRPC(args *raft.AppendReply, reply *Reply) error {
// Info.Println("append reply to leader RPC invoked")
// rft.AddToEventChannel(args)
// reply.X = 1
// return nil
// /*Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
// rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
// reply.X = 1
// return nil*/
//}
//RPC for follower server. To let followers know that and entry can be committed.
//arguments: pointer to argument struct (has LogEntry), pointer to reply struct
......@@ -107,19 +107,21 @@ func (t *RaftRPCService) CommitRPC(args *raft.CommitData, reply *Reply) error {
return nil
}
func (t *RaftRPCService) VoteRequestRPC(args *raft.VoteRequest, reply *Reply) error {
func (t *RaftRPCService) VoteRequestRPC(args *raft.VoteRequest, reply *raft.VoteRequestReply) error {
Info.Println("Request Vote RPC received")
rft.AddToEventChannel(args)
reply.X = 1
temp := rft.FetchVoteReply().(*raft.VoteRequestReply)
reply.CurrentTerm = temp.CurrentTerm
reply.Reply = temp.Reply
return nil
}
func (t *RaftRPCService) CastVoteRPC(args *raft.VoteRequestReply, reply *Reply) error {
Info.Println("Cast Vote RPC received")
rft.AddToMonitorVotesChannel(args)
reply.X = 1
return nil
}
//func (t *RaftRPCService) CastVoteRPC(args *raft.VoteRequestReply, reply *Reply) error {
// Info.Println("Cast Vote RPC received")
// rft.AddToMonitorVotesChannel(args)
// reply.X = 1
// return nil
//}
//Initialize all the things necessary for start the server for inter raft communication.
//The servers are running on ports 20000+serverId. {0..4}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment