Commit b045968a authored by Sushant Mahajan's avatar Sushant Mahajan

partial implementation of ClientAppend of leader

parent ab7ec028
...@@ -58,7 +58,7 @@ type ClusterConfig struct { ...@@ -58,7 +58,7 @@ type ClusterConfig struct {
} }
type ClientAppend struct { type ClientAppend struct {
logEntry LogEntry logEntry *LogEntry
} }
type VoteRequest struct { type VoteRequest struct {
...@@ -103,6 +103,10 @@ type Raft struct { ...@@ -103,6 +103,10 @@ type Raft struct {
voters int voters int
monitorVotesCh chan bool monitorVotesCh chan bool
et *time.Timer et *time.Timer
isLeader bool
lastApplied int
nextIndex []int
matchIndex []int
} }
// Log entry interface // Log entry interface
...@@ -184,6 +188,9 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, ev ...@@ -184,6 +188,9 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, ev
} }
rft.monitorVotesCh = monitorVotesCh rft.monitorVotesCh = monitorVotesCh
getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file. getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file.
rft.isLeader = false
rft.nextIndex = make([]int, len(config.Servers))
rft.matchIndex = make([]int, len(config.Servers))
return rft, nil return rft, nil
} }
...@@ -293,9 +300,10 @@ func (rft *Raft) grantVote(reply bool, currentTerm int) { ...@@ -293,9 +300,10 @@ func (rft *Raft) grantVote(reply bool, currentTerm int) {
if reply { if reply {
rft.voters++ rft.voters++
rft.Info.Println(rft.id, "got vote") rft.Info.Println(rft.id, "got vote")
if rft.voters > len(rafts)/2 { if !rft.isLeader && rft.voters > len(rafts)/2 {
rft.LogC("got majority") rft.LogC("got majority")
rft.monitorVotesCh <- true rft.monitorVotesCh <- true
rft.isLeader = true
} }
} }
} }
...@@ -474,6 +482,11 @@ func (rft *Raft) leader() int { ...@@ -474,6 +482,11 @@ func (rft *Raft) leader() int {
heartbeatReq := new(AppendRPC) heartbeatReq := new(AppendRPC)
heartbeatReq.entries = []*LogEntryData{} heartbeatReq.entries = []*LogEntryData{}
heartbeatReq.leaderId = rft.id heartbeatReq.leaderId = rft.id
//reintialize nextIndex and matchIndex
for i := 0; i < len(rft.nextIndex); i++ {
rft.nextIndex[i] = len(rft.LogArray)
rft.matchIndex[i] = 0
}
for { for {
select { select {
...@@ -489,6 +502,12 @@ func (rft *Raft) leader() int { ...@@ -489,6 +502,12 @@ func (rft *Raft) leader() int {
case event := <-rft.eventCh: case event := <-rft.eventCh:
switch event.(type) { switch event.(type) {
case *ClientAppend: case *ClientAppend:
//write data to log
rft.LogL("got client data")
entry := event.(*ClientAppend).logEntry
rft.LogArray = append(rft.LogArray, entry)
//todo:apply to state machine
//todo:respond to client
case *AppendRPC: case *AppendRPC:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment