Commit f18eef0f authored by Sushant Mahajan

leader election working

parent cbd36253
#! /bin/bash #! /bin/bash
rm {1..5} currentTerm* votedFor* log* rm {0..4} currentTerm* votedFor* log*
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"log" "log"
"math/rand" "math/rand"
"net" "net"
"net/rpc"
"os" "os"
"strconv" "strconv"
"sync" "sync"
...@@ -64,31 +65,31 @@ type ClientAppend struct { ...@@ -64,31 +65,31 @@ type ClientAppend struct {
} }
type VoteRequest struct { type VoteRequest struct {
term int Term int
candidateId int CandidateId int
lastLogIndex int LastLogIndex int
lastLogTerm int LastLogTerm int
} }
type VoteRequestReply struct { type VoteRequestReply struct {
currentTerm int CurrentTerm int
reply bool Reply bool
} }
type AppendReply struct { type AppendReply struct {
currentTerm int CurrentTerm int
reply bool Reply bool
fid int Fid int
logLength int LogLength int
} }
type AppendRPC struct { type AppendRPC struct {
term int Term int
leaderId int LeaderId int
prevLogIndex int PrevLogIndex int
prevLogTerm int PrevLogTerm int
leaderCommit int LeaderCommit int
entries []*LogEntryData Entries []*LogEntryData
} }
// Structure used for replying to the RPC calls // Structure used for replying to the RPC calls
...@@ -120,9 +121,9 @@ type Raft struct { ...@@ -120,9 +121,9 @@ type Raft struct {
currentTerm int currentTerm int
commitIndex int commitIndex int
voters int voters int
monitorVotesCh chan VoteRequestReply monitorVotesCh chan RaftEvent
shiftStatusCh chan int shiftStatusCh chan int
ackCh chan AppendReply ackCh chan RaftEvent
et *time.Timer et *time.Timer
isLeader bool isLeader bool
lastApplied int lastApplied int
...@@ -195,7 +196,7 @@ func getSingleDataFromFile(name string, serverId int, info *log.Logger) int { ...@@ -195,7 +196,7 @@ func getSingleDataFromFile(name string, serverId int, info *log.Logger) int {
if file, err := os.Open(filename); err != nil { if file, err := os.Open(filename); err != nil {
defer file.Close() defer file.Close()
ioutil.WriteFile(filename, []byte("0"), 0666) ioutil.WriteFile(filename, []byte("0"), 0666)
info.Println("wrote in " + filename + " file") //info.Println("wrote in " + filename + " file")
return 0 return 0
} else { } else {
if data, err := ioutil.ReadFile(file.Name()); err != nil { if data, err := ioutil.ReadFile(file.Name()); err != nil {
...@@ -207,7 +208,7 @@ func getSingleDataFromFile(name string, serverId int, info *log.Logger) int { ...@@ -207,7 +208,7 @@ func getSingleDataFromFile(name string, serverId int, info *log.Logger) int {
info.Println("error converting") info.Println("error converting")
return FILE_ERR return FILE_ERR
} else { } else {
info.Println("Converted success "+filename, t) //info.Println("Converted success "+filename, t)
return t return t
} }
} }
...@@ -221,7 +222,7 @@ func writeFile(name string, serverId int, data int, info *log.Logger) int { ...@@ -221,7 +222,7 @@ func writeFile(name string, serverId int, data int, info *log.Logger) int {
return FILE_ERR return FILE_ERR
} else { } else {
ioutil.WriteFile(filename, []byte(strconv.Itoa(data)), 0666) ioutil.WriteFile(filename, []byte(strconv.Itoa(data)), 0666)
info.Println("wrote in " + filename + " file") //info.Println("wrote in " + filename + " file")
return FILE_WRITTEN //file written return FILE_WRITTEN //file written
} }
} }
...@@ -230,19 +231,21 @@ func writeFile(name string, serverId int, data int, info *log.Logger) int { ...@@ -230,19 +231,21 @@ func writeFile(name string, serverId int, data int, info *log.Logger) int {
// commitCh is the channel that the kvstore waits on for committed messages. // commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed // When the process starts, the local disk log is read and all committed
// entries are recovered and replayed // entries are recovered and replayed
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, eventCh chan RaftEvent, monitorVotesCh chan bool, toDebug bool) (*Raft, error) { func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, Info *log.Logger) (*Raft, error) {
rft := new(Raft) rft := new(Raft)
rft.commitCh = commitCh rft.commitCh = commitCh
rft.clusterConfig = config rft.clusterConfig = config
rft.id = thisServerId rft.id = thisServerId
rft.eventCh = eventCh rft.Info = Info
rft.Info = getLogger(thisServerId, toDebug)
if v := getSingleDataFromFile(CURRENT_TERM, thisServerId, rft.Info); v != FILE_ERR { if v := getSingleDataFromFile(CURRENT_TERM, thisServerId, rft.Info); v != FILE_ERR {
rft.currentTerm = v rft.currentTerm = v
} else { } else {
rft.currentTerm = 0 rft.currentTerm = 0
} }
rft.monitorVotesCh = monitorVotesCh rft.monitorVotesCh = make(chan RaftEvent)
rft.ackCh = make(chan RaftEvent)
rft.eventCh = make(chan RaftEvent)
rft.shiftStatusCh = make(chan int)
getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file. getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file.
rft.isLeader = false rft.isLeader = false
rft.nextIndex = make([]int, len(config.Servers)) rft.nextIndex = make([]int, len(config.Servers))
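Note on the commitCh parameter documented above NewRaft: the kvstore is expected to block on this channel and apply each committed entry in order. A minimal, self-contained sketch of such a consumer (the Entry type and apply function here are stand-ins for this sketch, not the project's raft.LogEntry or kvstore code):
package main

import "fmt"

// Entry is a stand-in for raft.LogEntry in this sketch.
type Entry struct {
	Term int
	Data []byte
}

// apply is a placeholder for the kvstore mutation performed per committed entry.
func apply(e Entry) { fmt.Printf("applied term=%d data=%q\n", e.Term, e.Data) }

func main() {
	commitCh := make(chan Entry)
	done := make(chan struct{})
	// The kvstore goroutine waits on commitCh, as the NewRaft comment describes.
	go func() {
		for e := range commitCh {
			apply(e)
		}
		close(done)
	}()
	commitCh <- Entry{Term: 1, Data: []byte("set x 1")}
	close(commitCh)
	<-done
}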
...@@ -305,14 +308,14 @@ func (rft *Raft) AddToChannel(entry LogEntry) { ...@@ -305,14 +308,14 @@ func (rft *Raft) AddToChannel(entry LogEntry) {
} }
//AddToEventChannel //AddToEventChannel
func (rft *Raft) AddToEventChannel(entry Entry) { func (rft *Raft) AddToEventChannel(entry RaftEvent) {
rft.Info.Println("Adding to event channel", entry) rft.Info.Println("Adding to event channel", entry)
rft.eventCh <- entry rft.eventCh <- entry
} }
//AddToMonitorVotesChannel //AddToMonitorVotesChannel
func (rft *Raft) AddToMonitorVotesChannel(entry Entry) { func (rft *Raft) AddToMonitorVotesChannel(entry RaftEvent) {
rft.Info.Println("Adding to montor votes", entry) rft.Info.Println("Adding to monitor votes", entry)
rft.monitorVotesCh <- entry rft.monitorVotesCh <- entry
} }
...@@ -330,9 +333,9 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) { ...@@ -330,9 +333,9 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) {
config.Path = "" config.Path = ""
config.Servers = make([]ServerConfig, num_servers) config.Servers = make([]ServerConfig, num_servers)
for i := 1; i <= num_servers; i++ { for i := 0; i < num_servers; i++ {
curr_server, _ := NewServerConfig(i) curr_server, _ := NewServerConfig(i)
config.Servers[i-1] = *(curr_server) config.Servers[i] = *(curr_server)
} }
return config, nil return config, nil
...@@ -342,21 +345,23 @@ func (e ErrRedirect) Error() string { ...@@ -342,21 +345,23 @@ func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(0) return "Redirect to server " + strconv.Itoa(0)
} }
func monitorVotesChannelRoutine(rft *Raft) { func monitorVotesChannelRoutine(rft *Raft, killCh chan bool) {
majority := len(rft.clusterConfig.Servers) / 2 majority := len(rft.clusterConfig.Servers) / 2
flag := false flag := false
for { for {
select { select {
case temp := <-rft.monitorVotesCh: case temp1 := <-rft.monitorVotesCh:
if temp.reply { temp := temp1.(*VoteRequestReply)
rft.Info.Println("favorable vote")
if temp.Reply {
rft.voters++ rft.voters++
if !rft.isLeader && rft.voters >= majority { if !rft.isLeader && rft.voters >= majority {
rft.shiftStatusCh <- LEADER rft.shiftStatusCh <- LEADER
rft.isLeader = true rft.isLeader = true
} }
} else { } else {
if rft.currentTerm < temp.currentTerm { if rft.currentTerm < temp.CurrentTerm {
rft.updateTermAndVote(temp.currentTerm) rft.updateTermAndVote(temp.CurrentTerm)
rft.shiftStatusCh <- FOLLOWER rft.shiftStatusCh <- FOLLOWER
} }
} }
...@@ -375,11 +380,12 @@ func monitorAckChannel(rft *Raft, killCh chan bool) { ...@@ -375,11 +380,12 @@ func monitorAckChannel(rft *Raft, killCh chan bool) {
flag := false flag := false
for { for {
select { select {
case temp := <-rft.ackCh: case temp1 := <-rft.ackCh:
temp := temp1.(*AppendReply)
rft.Info.Println("Ack received") rft.Info.Println("Ack received")
if temp.reply { if temp.Reply {
rft.nextIndex[temp.fid] = temp.logLength rft.nextIndex[temp.Fid] = temp.LogLength
rft.matchIndex[temp.fid] = temp.logLength rft.matchIndex[temp.Fid] = temp.LogLength
//update commitindex //update commitindex
for n := rft.commitIndex + 1; n < len(rft.LogArray); n++ { for n := rft.commitIndex + 1; n < len(rft.LogArray); n++ {
maj := 0 maj := 0
...@@ -388,12 +394,12 @@ func monitorAckChannel(rft *Raft, killCh chan bool) { ...@@ -388,12 +394,12 @@ func monitorAckChannel(rft *Raft, killCh chan bool) {
maj++ maj++
} }
} }
if maj > len(rft.clusterConfig.Servers)/2 && rft.LogArray[n].Term == currentTerm { if maj > len(rft.clusterConfig.Servers)/2 && rft.LogArray[n].Term == rft.currentTerm {
rft.commitIndex = n rft.commitIndex = n
} }
} }
} else { } else {
rft.nextIndex[temp.fid]-- rft.nextIndex[temp.Fid]--
} }
case <-killCh: case <-killCh:
flag = true flag = true
...@@ -423,11 +429,11 @@ func (rft *Raft) loop() { ...@@ -423,11 +429,11 @@ func (rft *Raft) loop() {
func getRandTime(log *log.Logger) time.Duration { func getRandTime(log *log.Logger) time.Duration {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT_ELEC-MIN_TIMEOUT_ELEC)+MIN_TIMEOUT_ELEC) t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT_ELEC-MIN_TIMEOUT_ELEC)+MIN_TIMEOUT_ELEC)
//log.Println("New rand time", t) log.Println("New rand time", t)
return t return t
} }
func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply) { func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply, Info *log.Logger) {
Info.Println("Cast vote RPC") Info.Println("Cast vote RPC")
//rpc call to the caller //rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
...@@ -437,12 +443,12 @@ func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply) { ...@@ -437,12 +443,12 @@ func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply) {
reply := new(Reply) reply := new(Reply)
args := temp args := temp
Info.Println("Calling cast vote RPC", logPort) Info.Println("Calling cast vote RPC", logPort)
castVoteCall := client.Go("RequestVoteReply.CastVoteRPC", args, reply, nil) //let go allocate done channel castVoteCall := client.Go("RaftRPCService.CastVoteRPC", args, reply, nil) //let go allocate done channel
castVoteCall = <-castVoteCall.Done
Info.Println("Reply", castVoteCall, reply.X) Info.Println("Reply", castVoteCall, reply.X)
castVoteCall = <-castVoteCall.Done
} }
func doAppendReplyRPC(hostname string, logPort int, temp *AppendReply) { func doAppendReplyRPC(hostname string, logPort int, temp *AppendReply, Info *log.Logger) {
Info.Println("append reply RPC") Info.Println("append reply RPC")
//rpc call to the caller //rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
...@@ -452,12 +458,12 @@ func doAppendReplyRPC(hostname string, logPort int, temp *AppendReply) { ...@@ -452,12 +458,12 @@ func doAppendReplyRPC(hostname string, logPort int, temp *AppendReply) {
reply := new(Reply) reply := new(Reply)
args := temp args := temp
Info.Println("Calling AppendReply RPC", logPort) Info.Println("Calling AppendReply RPC", logPort)
appendReplyCall := client.Go("AppendEntries.AppendReplyRPC", args, reply, nil) //let go allocate done channel appendReplyCall := client.Go("RaftRPCService.AppendReplyRPC", args, reply, nil) //let go allocate done channel
appendReplyCall = <-appendReplyCall.Done appendReplyCall = <-appendReplyCall.Done
Info.Println("Reply", appendReplyCall, reply.X) Info.Println("Reply", appendReplyCall, reply.X)
} }
func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest) { func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, Info *log.Logger) {
Info.Println("Vote request RPC") Info.Println("Vote request RPC")
//rpc call to the caller //rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
...@@ -466,14 +472,14 @@ func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest) { ...@@ -466,14 +472,14 @@ func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest) {
} }
reply := new(Reply) reply := new(Reply)
args := temp args := temp
Info.Println("Calling vote requesr RPC", logPort) Info.Println("Calling vote request RPC", logPort)
voteReqCall := client.Go("VoteRequest.VoteRequestRPC", args, reply, nil) //let go allocate done channel voteReqCall := client.Go("RaftRPCService.VoteRequestRPC", args, reply, nil) //let go allocate done channel
voteReqCall = <-voteReqCall.Done voteReqCall = <-voteReqCall.Done
Info.Println("Reply", voteReqCall, reply.X) Info.Println("Reply", voteReqCall, reply.X)
} }
//make append entries rpc call to followers //make append entries rpc call to followers
func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC) { func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, Info *log.Logger) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil { if err != nil {
Info.Fatal("Dialing:", err) Info.Fatal("Dialing:", err)
...@@ -481,7 +487,7 @@ func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC) { ...@@ -481,7 +487,7 @@ func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC) {
reply := new(Reply) reply := new(Reply)
args := temp args := temp
Info.Println("RPC Called", logPort) Info.Println("RPC Called", logPort)
appendCall := client.Go("AppendEntries.AppendRPC", args, reply, nil) //let go allocate done channel appendCall := client.Go("RaftRPCService.AppendRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done appendCall = <-appendCall.Done
Info.Println("Reply", appendCall, reply.X) Info.Println("Reply", appendCall, reply.X)
} }
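A note on the client-side pattern shared by doCastVoteRPC, doAppendReplyRPC, doVoteRequestRPC and doAppendRPCCall: each issues client.Go and immediately receives on Done, which makes the call effectively synchronous; net/rpc's client.Call does the same thing in one step. An illustrative sketch with placeholder names (Echo.Ping, Args and the port are assumptions, not this project's RPCs):
package main

import (
	"log"
	"net/rpc"
)

type Args struct{ N int }

func main() {
	// Dialing a placeholder address; in the project this is hostname:LogPort.
	client, err := rpc.Dial("tcp", "localhost:20000")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	var reply int
	// Asynchronous form, as used in the helpers above.
	call := client.Go("Echo.Ping", &Args{N: 7}, &reply, nil)
	<-call.Done // block until the reply (or error) arrives
	if call.Error != nil {
		log.Println("rpc error:", call.Error)
	}
	// Equivalent synchronous form.
	if err := client.Call("Echo.Ping", &Args{N: 7}, &reply); err != nil {
		log.Println("rpc error:", err)
	}
}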
...@@ -527,83 +533,83 @@ func (rft *Raft) follower() int { ...@@ -527,83 +533,83 @@ func (rft *Raft) follower() int {
rft.LogF("got vote request") rft.LogF("got vote request")
req := event.(*VoteRequest) req := event.(*VoteRequest)
reply := false reply := false
if req.term < rft.currentTerm { if req.Term < rft.currentTerm {
reply = false reply = false
} }
if req.term > rft.currentTerm || if req.Term > rft.currentTerm ||
req.lastLogTerm > rft.currentTerm || req.LastLogTerm > rft.currentTerm ||
(req.lastLogTerm == rft.currentTerm && req.lastLogIndex >= len(rft.LogArray)) { (req.LastLogTerm == rft.currentTerm && req.LastLogIndex >= len(rft.LogArray)) {
rft.updateTermAndVote(req.term) rft.updateTermAndVote(req.Term)
reply = true reply = true
} }
if reply && rft.votedFor == NULL_VOTE { if reply && rft.votedFor == NULL_VOTE {
rft.et.Reset(getRandTime(rft.Info)) rft.et.Reset(getRandTime(rft.Info))
rft.LogF("reset timer after voting") rft.LogF("reset timer after voting")
writeFile(VOTED_FOR, rft.id, req.candidateId, rft.Info) writeFile(VOTED_FOR, rft.id, req.CandidateId, rft.Info)
rft.LogF("voted for " + strconv.Itoa(req.candidateId)) rft.LogF("voted for " + strconv.Itoa(req.CandidateId))
rft.votedFor = req.candidateId rft.votedFor = req.CandidateId
} }
//let the asker know about the vote //let the asker know about the vote
voteReply := &VoteRequestReply{rft.currentTerm, reply} voteReply := &VoteRequestReply{rft.currentTerm, reply}
server := rft.clusterConfig[req.candidateId] server := rft.clusterConfig.Servers[req.CandidateId]
doCastVoteRPC(server.Hostname, server.LogPort, voteReply) doCastVoteRPC(server.Hostname, server.LogPort, voteReply, rft.Info)
case *AppendRPC: case *AppendRPC:
//rft.LogF("got append rpc") //rft.LogF("got append rpc")
rft.et.Reset(getRandTime(rft.Info)) rft.et.Reset(getRandTime(rft.Info))
//rft.LogF("reset timer on appendRPC") //rft.LogF("reset timer on appendRPC")
req := event.(*AppendRPC) req := event.(*AppendRPC)
if len(req.entries) == 0 { //heartbeat if len(req.Entries) == 0 { //heartbeat
//rft.LogF("got hearbeat from " + strconv.Itoa(req.leaderId)) //rft.LogF("got hearbeat from " + strconv.Itoa(req.leaderId))
continue continue
} }
reply := true reply := true
if req.prevLogIndex == LOG_INVALID_INDEX || req.prevLogIndex == LOG_INVALID_TERM { if req.PrevLogIndex == LOG_INVALID_INDEX || req.PrevLogIndex == LOG_INVALID_TERM {
rft.updateTermAndVote(req.term) rft.updateTermAndVote(req.Term)
reply = true reply = true
} else if req.term < rft.currentTerm { } else if req.Term < rft.currentTerm {
reply = false reply = false
} else if req.term > rft.currentTerm { } else if req.Term > rft.currentTerm {
rft.updateTermAndVote(req.term) rft.updateTermAndVote(req.Term)
reply = true reply = true
} }
//first condition to prevent out of bounds exception //first condition to prevent out of bounds exception
if !(req.prevLogIndex == LOG_INVALID_INDEX) && rft.LogArray[req.prevLogIndex].Term != req.prevLogTerm { if !(req.PrevLogIndex == LOG_INVALID_INDEX) && rft.LogArray[req.PrevLogIndex].Term != req.PrevLogTerm {
rft.LogF("terms unequal") rft.LogF("terms unequal")
reply = false reply = false
} }
if reply { if reply {
i := req.prevLogIndex + 1 i := req.PrevLogIndex + 1
for ; i < len(rft.LogArray); i++ { for ; i < len(rft.LogArray); i++ {
if req.prevLogIndex == LOG_INVALID_INDEX || req.entries[i-req.prevLogIndex-1].Term != rft.LogArray[i].Term { if req.PrevLogIndex == LOG_INVALID_INDEX || req.Entries[i-req.PrevLogIndex-1].Term != rft.LogArray[i].Term {
break break
} }
} }
if req.prevLogIndex == LOG_INVALID_INDEX { if req.PrevLogIndex == LOG_INVALID_INDEX {
rft.LogArray = append(rft.LogArray, req.entries...) rft.LogArray = append(rft.LogArray, req.Entries...)
} else { } else {
rft.LogArray = append(rft.LogArray[0:i], req.entries[i-req.prevLogIndex-1:]...) rft.LogArray = append(rft.LogArray[0:i], req.Entries[i-req.PrevLogIndex-1:]...)
} }
//todo:also add to log //todo:also add to log
if req.leaderCommit > rft.commitIndex { if req.LeaderCommit > rft.commitIndex {
if req.leaderCommit > len(rft.LogArray)-1 { if req.LeaderCommit > len(rft.LogArray)-1 {
rft.commitIndex = len(rft.LogArray) - 1 rft.commitIndex = len(rft.LogArray) - 1
} else { } else {
rft.commitIndex = req.leaderCommit rft.commitIndex = req.LeaderCommit
} }
} }
} }
temp := &AppendReply{rft.currentTerm, reply, rft.id, len(rft.LogArray)} temp := &AppendReply{rft.currentTerm, reply, rft.id, len(rft.LogArray)}
doAppendReplyRPC(rft.clusterConfig.Servers[req.leaderId].Hostname, rft.clusterConfig.Servers[req.leaderId].LogPort, temp) doAppendReplyRPC(rft.clusterConfig.Servers[req.LeaderId].Hostname, rft.clusterConfig.Servers[req.LeaderId].LogPort, temp, rft.Info)
if reply { if reply {
rft.persistLog() rft.persistLog()
} }
...@@ -627,27 +633,29 @@ func (rft *Raft) candidate() int { ...@@ -627,27 +633,29 @@ func (rft *Raft) candidate() int {
rft.Info.Println(rft.id, "candidate got new timer") rft.Info.Println(rft.id, "candidate got new timer")
//create a vote request object //create a vote request object
req := &VoteRequest{ req := &VoteRequest{
term: rft.currentTerm, Term: rft.currentTerm,
candidateId: rft.id, CandidateId: rft.id,
} }
if len(rft.LogArray) == 0 { if len(rft.LogArray) == 0 {
req.lastLogIndex = LOG_INVALID_INDEX req.LastLogIndex = LOG_INVALID_INDEX
req.lastLogTerm = LOG_INVALID_TERM req.LastLogTerm = LOG_INVALID_TERM
} else { } else {
req.lastLogIndex = len(rft.LogArray) - 1 req.LastLogIndex = len(rft.LogArray) - 1
req.lastLogTerm = rft.LogArray[req.lastLogIndex].Term req.LastLogTerm = rft.LogArray[req.LastLogIndex].Term
} }
//reinitialize rft.monitorVotesCh //reinitialize rft.monitorVotesCh
rft.monitorVotesCh = make(chan *VoteRequestReply) rft.monitorVotesCh = make(chan RaftEvent)
killCh := make(chan bool) killCh := make(chan bool)
go monitorVotesChannelRoutine(rft, killCh) go monitorVotesChannelRoutine(rft, killCh)
//time.Sleep(time.Millisecond * 10)
//send vote request to all servers //send vote request to all servers
for _, server := range rft.clusterConfig.Servers { for _, server := range rft.clusterConfig.Servers {
rft.Info.Println(server.Id)
if server.Id != rft.id { if server.Id != rft.id {
rft.LogC("sent vote request to " + strconv.Itoa(server.Id)) rft.LogC("Sent vote request to " + strconv.Itoa(server.Id))
doVoteRequestRPC(server.Hostname, server.LogPort, req) doVoteRequestRPC(server.Hostname, server.LogPort, req, rft.Info)
} }
} }
...@@ -684,37 +692,21 @@ func enforceLog(rft *Raft) { ...@@ -684,37 +692,21 @@ func enforceLog(rft *Raft) {
for _, server := range rft.clusterConfig.Servers { for _, server := range rft.clusterConfig.Servers {
if rft.id != server.Id && len(rft.LogArray)-1 >= rft.nextIndex[server.Id] { if rft.id != server.Id && len(rft.LogArray)-1 >= rft.nextIndex[server.Id] {
req := &AppendRPC{} req := &AppendRPC{}
req.term = rft.currentTerm req.Term = rft.currentTerm
req.leaderId = rft.id req.LeaderId = rft.id
req.leaderCommit = rft.commitIndex req.LeaderCommit = rft.commitIndex
req.entries = rft.LogArray[rft.nextIndex[server.Id]:len(rft.LogArray)] req.Entries = rft.LogArray[rft.nextIndex[server.Id]:len(rft.LogArray)]
req.prevLogIndex = rft.nextIndex[server.Id] - 1 req.PrevLogIndex = rft.nextIndex[server.Id] - 1
if req.prevLogIndex <= 0 { if req.PrevLogIndex <= 0 {
req.prevLogTerm = LOG_INVALID_TERM req.PrevLogTerm = LOG_INVALID_TERM
} else { } else {
req.prevLogTerm = rft.LogArray[rft.nextIndex[server.Id]-1].Term req.PrevLogTerm = rft.LogArray[rft.nextIndex[server.Id]-1].Term
} }
//appendRPC call //appendRPC call
doAppendRPCCall(server.Hostname, server.LogPort, req) doAppendRPCCall(server.Hostname, server.LogPort, req, rft.Info)
rft.LogL("sent append entries to " + strconv.Itoa(i+1)) rft.LogL("sent append entries to " + strconv.Itoa(server.Id))
} }
/*if !rafts[i+1].isLeader && len(rft.LogArray)-1 >= rft.nextIndex[i] {
req := &AppendRPC{}
req.term = rft.currentTerm
req.leaderId = rft.id
req.leaderCommit = rft.commitIndex
req.entries = rft.LogArray[rft.nextIndex[i]:len(rft.LogArray)]
req.prevLogIndex = rft.nextIndex[i] - 1
if req.prevLogIndex <= 0 {
req.prevLogTerm = LOG_INVALID_TERM
} else {
req.prevLogTerm = rft.LogArray[rft.nextIndex[i]-1].Term
}
//send to other rafts
rafts[i+1].eventCh <- req
rft.LogL("sent append entries to " + strconv.Itoa(i+1))
}*/
time.Sleep(time.Millisecond * 2) time.Sleep(time.Millisecond * 2)
} }
} }
...@@ -724,8 +716,8 @@ func (rft *Raft) leader() int { ...@@ -724,8 +716,8 @@ func (rft *Raft) leader() int {
rft.LogL("became leader") rft.LogL("became leader")
heartbeat := time.NewTimer(time.Millisecond * HEARTBEAT_TIMEOUT) heartbeat := time.NewTimer(time.Millisecond * HEARTBEAT_TIMEOUT)
heartbeatReq := new(AppendRPC) heartbeatReq := new(AppendRPC)
heartbeatReq.entries = []*LogEntryData{} heartbeatReq.Entries = []*LogEntryData{}
heartbeatReq.leaderId = rft.id heartbeatReq.LeaderId = rft.id
rft.currentTerm++ rft.currentTerm++
rft.LogArray = append( rft.LogArray = append(
...@@ -757,7 +749,7 @@ func (rft *Raft) leader() int { ...@@ -757,7 +749,7 @@ func (rft *Raft) leader() int {
for _, server := range rft.clusterConfig.Servers { for _, server := range rft.clusterConfig.Servers {
if server.Id != rft.id { if server.Id != rft.id {
//doRPCCall for heartbeat //doRPCCall for heartbeat
doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq) doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq, rft.Info)
} }
} }
heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT) heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT)
...@@ -780,3 +772,7 @@ func (rft *Raft) leader() int { ...@@ -780,3 +772,7 @@ func (rft *Raft) leader() int {
} }
} }
} }
func StartRaft(rft *Raft) {
rft.loop()
}
// server.go
package raft
import (
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
)
var rafts map[int]*Raft
func getLogger(serverId int, toDebug bool) (l *log.Logger) {
if !toDebug {
l = log.New(ioutil.Discard, "INFO: ", log.Ltime|log.Lshortfile)
} else {
logf, _ := os.OpenFile(strconv.Itoa(serverId), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
l = log.New(logf, "INFO: ", log.Ltime|log.Lmicroseconds|log.Lshortfile)
}
l.Println("Initialized server.")
return l
}
func Start(serverId int, toDebug bool) {
eventCh := make(chan RaftEvent)
commitCh := make(chan LogEntry)
monitorVotesCh := make(chan bool)
clusterConfig, _ := NewClusterConfig(5)
rft, _ := NewRaft(clusterConfig, serverId, commitCh, eventCh, monitorVotesCh, true)
if rafts == nil {
rafts = make(map[int]*Raft)
}
rafts[serverId] = rft
fmt.Println(len(rafts))
rft.loop()
}
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"os" "os"
"raft" "raft"
"strconv" "strconv"
"time"
) )
// Logger // Logger
...@@ -20,10 +21,10 @@ var Info *log.Logger ...@@ -20,10 +21,10 @@ var Info *log.Logger
var rft *raft.Raft var rft *raft.Raft
//Receiver for RPC //Receiver for RPC
type AppendEntries struct{} type RaftRPCService struct{}
//Receiver for voting related RPC //Receiver for voting related RPC
type Voting struct{} //type Voting struct{}
//receiver for testing RPC //receiver for testing RPC
//only for testing purpose //only for testing purpose
...@@ -72,7 +73,7 @@ type Reply struct { ...@@ -72,7 +73,7 @@ type Reply struct {
//arguments: pointer to argument struct (has LogEntryData), pointer to reply struct //arguments: pointer to argument struct (has LogEntryData), pointer to reply struct
//returns: error //returns: error
//receiver: pointer to AppendEntries //receiver: pointer to AppendEntries
func (t *AppendEntries) AppendRPC(args *raft.AppendRPC, reply *Reply) error { func (t *RaftRPCService) AppendRPC(args *raft.AppendRPC, reply *Reply) error {
Info.Println("append RPC invoked") Info.Println("append RPC invoked")
rft.AddToEventChannel(args) rft.AddToEventChannel(args)
reply.X = 1 reply.X = 1
...@@ -83,7 +84,7 @@ func (t *AppendEntries) AppendRPC(args *raft.AppendRPC, reply *Reply) error { ...@@ -83,7 +84,7 @@ func (t *AppendEntries) AppendRPC(args *raft.AppendRPC, reply *Reply) error {
return nil*/ return nil*/
} }
func (t *AppendEntries) AppendReplyRPC(args *raft.AppendReplyRPC, reply *Reply) error { func (t *RaftRPCService) AppendReplyRPC(args *raft.AppendReply, reply *Reply) error {
Info.Println("append reply to leader RPC invoked") Info.Println("append reply to leader RPC invoked")
rft.AddToEventChannel(args) rft.AddToEventChannel(args)
reply.X = 1 reply.X = 1
...@@ -98,7 +99,7 @@ func (t *AppendEntries) AppendReplyRPC(args *raft.AppendReplyRPC, reply *Reply) ...@@ -98,7 +99,7 @@ func (t *AppendEntries) AppendReplyRPC(args *raft.AppendReplyRPC, reply *Reply)
//arguments: pointer to argument struct (has LogEntry), pointer to reply struct //arguments: pointer to argument struct (has LogEntry), pointer to reply struct
//returns: error //returns: error
//receiver: pointer to AppendEntries //receiver: pointer to AppendEntries
func (t *AppendEntries) CommitRPC(args *raft.CommitData, reply *Reply) error { func (t *RaftRPCService) CommitRPC(args *raft.CommitData, reply *Reply) error {
Info.Println("Commit RPC invoked") Info.Println("Commit RPC invoked")
rft.LogArray[(*args).Id].SetCommitted(true) rft.LogArray[(*args).Id].SetCommitted(true)
rft.AddToChannel(rft.LogArray[(*args).Id]) rft.AddToChannel(rft.LogArray[(*args).Id])
...@@ -106,29 +107,30 @@ func (t *AppendEntries) CommitRPC(args *raft.CommitData, reply *Reply) error { ...@@ -106,29 +107,30 @@ func (t *AppendEntries) CommitRPC(args *raft.CommitData, reply *Reply) error {
return nil return nil
} }
func (t *Voting) VoteRequestRPC(args *raft.VoteRequest, reply *Reply) { func (t *RaftRPCService) VoteRequestRPC(args *raft.VoteRequest, reply *Reply) error {
Info.Println("Request Vote RPC received from server", id) Info.Println("Request Vote RPC received")
rft.AddToEventChannel(args) rft.AddToEventChannel(args)
reply.X = 1 reply.X = 1
return nil return nil
} }
func (t *Voting) CastVoteRPC(args *raft.VoteRequestReply, reply *Reply) { func (t *RaftRPCService) CastVoteRPC(args *raft.VoteRequestReply, reply *Reply) error {
Info.Println("Request Vote RPC received from server", id) Info.Println("Cast Vote RPC received")
rft.AddToMonitorVotesChannel(args) rft.AddToMonitorVotesChannel(args)
reply.X = 1 reply.X = 1
return nil return nil
} }
//Initialize all the things necessary to start the server for inter raft communication. //Initialize all the things necessary to start the server for inter raft communication.
//The servers are running on ports 20000+serverId. {1..5} //The servers are running on ports 20000+serverId. {0..4}
//arguments: pointer to current server config, pointer to raft object, a bool channel to set to true to let //arguments: pointer to current server config, pointer to raft object, a bool channel to set to true to let
//the invoker know that the proc ended. //the invoker know that the proc ended.
//returns: none //returns: none
//receiver: none //receiver: none
func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) { func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
appendRpc := new(AppendEntries) raftRPC := new(RaftRPCService)
rpc.Register(appendRpc) rpc.Register(raftRPC)
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.LogPort)) listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.LogPort))
if e != nil { if e != nil {
Info.Fatal("listen error:", e) Info.Fatal("listen error:", e)
...@@ -152,9 +154,9 @@ func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch ...@@ -152,9 +154,9 @@ func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch
func initLogger(serverId int, toDebug bool) { func initLogger(serverId int, toDebug bool) {
// Logger Initialization // Logger Initialization
if !toDebug { if !toDebug {
Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile) Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
} else { } else {
Info = log.New(os.Stdout, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile) Info = log.New(os.Stdout, "INFO: ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
} }
Info.Println("Initialized server.") Info.Println("Initialized server.")
...@@ -184,12 +186,9 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan ...@@ -184,12 +186,9 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan
//Entry point for application. Starts all major server go routines and then waits forever //Entry point for application. Starts all major server go routines and then waits forever
func main() { func main() {
sid, err := strconv.Atoi(os.Args[1]) sid, _ := strconv.Atoi(os.Args[1])
ch1 := make(chan bool) ch1 := make(chan bool)
ch2 := make(chan bool) ch2 := make(chan bool)
if err != nil {
Info.Println("argument ", os.Args[1], "is not string")
}
if len(os.Args) > 3 { if len(os.Args) > 3 {
initLogger(sid, true) initLogger(sid, true)
...@@ -214,6 +213,9 @@ func main() { ...@@ -214,6 +213,9 @@ func main() {
go initClientCommunication(server, rft, ch1) go initClientCommunication(server, rft, ch1)
go initInterServerCommunication(server, rft, ch2) go initInterServerCommunication(server, rft, ch2)
time.Sleep(100 * time.Millisecond)
raft.StartRaft(rft)
for <-ch1 && <-ch2 { for <-ch1 && <-ch2 {
} }
......
...@@ -3,13 +3,9 @@ ...@@ -3,13 +3,9 @@
package main package main
import ( import (
"bytes"
//"fmt" //"fmt"
"net"
"net/rpc"
"os" "os"
"os/exec" "os/exec"
"raft"
"strconv" "strconv"
"testing" "testing"
"time" "time"
...@@ -27,16 +23,21 @@ type Testpair struct { ...@@ -27,16 +23,21 @@ type Testpair struct {
// //
func TestAll(t *testing.T) { func TestAll(t *testing.T) {
dummy := make(chan bool)
//start the servers //start the servers
for i := 1; i <= NUM_SERVERS; i++ { for i := 0; i < NUM_SERVERS; i++ {
go startServers(i, t) go startServers(i, t, dummy)
} }
//wait for some time so that servers are ready //wait for some time so that servers are ready
time.Sleep(4 * time.Second)
time.Sleep(1 * time.Second)
if <-dummy {
}
} }
//run servers //run servers
func startServers(i int, t *testing.T) { func startServers(i int, t *testing.T, dummy chan bool) {
cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i), strconv.Itoa(NUM_SERVERS), "x") cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i), strconv.Itoa(NUM_SERVERS), "x")
f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil { if err != nil {
......