Commit ab7ec028 authored by Sushant Mahajan's avatar Sushant Mahajan

added partial leader code

parent 90c89103
......@@ -16,8 +16,9 @@ const (
CLIENT_PORT = 9000
LOG_PORT = 20000
ACK_TIMEOUT = 5
MIN_TIMEOUT = 300
MAX_TIMEOUT = 500
MIN_TIMEOUT_ELEC = 300
MAX_TIMEOUT_ELEC = 500
HEARTBEAT_TIMEOUT = 100
LEADER = 10
CANDIDATE = 20
FOLLOWER = 30
......@@ -283,7 +284,7 @@ func (rft *Raft) loop() {
func getRandTime(log *log.Logger) time.Duration {
rand.Seed(time.Now().UnixNano())
t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT-MIN_TIMEOUT)+MIN_TIMEOUT)
t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT_ELEC-MIN_TIMEOUT_ELEC)+MIN_TIMEOUT_ELEC)
log.Println("New rand time", t)
return t
}
......@@ -292,7 +293,8 @@ func (rft *Raft) grantVote(reply bool, currentTerm int) {
if reply {
rft.voters++
rft.Info.Println(rft.id, "got vote")
if rft.voters >= len(rafts)/2+1 {
if rft.voters > len(rafts)/2 {
rft.LogC("got majority")
rft.monitorVotesCh <- true
}
}
......@@ -328,7 +330,7 @@ func (rft *Raft) follower() int {
//wrap in select
select {
case <-rft.et.C:
rft.LogF("follower election timeout")
rft.LogF("election timeout")
return CANDIDATE
case event := <-rft.eventCh:
switch event.(type) {
......@@ -369,6 +371,11 @@ func (rft *Raft) follower() int {
rft.et.Reset(getRandTime(rft.Info))
rft.LogF("reset timer on appendRPC")
req := event.(*AppendRPC)
if len(req.entries) == 0 { //heartbeat
rft.LogF("got hearbeat from " + strconv.Itoa(req.leaderId))
continue
}
reply := true
if req.term < rft.currentTerm {
reply = false
......@@ -454,6 +461,7 @@ func (rft *Raft) candidate() int {
switch event.(type) {
case (*AppendRPC):
rft.LogC("C to F")
rft.et.Reset(getRandTime(rft.Info))
return FOLLOWER
}
}
......@@ -461,9 +469,32 @@ func (rft *Raft) candidate() int {
}
func (rft *Raft) leader() int {
rft.LogL("became leader")
heartbeat := time.NewTimer(time.Millisecond * HEARTBEAT_TIMEOUT)
heartbeatReq := new(AppendRPC)
heartbeatReq.entries = []*LogEntryData{}
heartbeatReq.leaderId = rft.id
for {
select {
case <-rft.commitCh:
rft.LogL("hello")
case <-heartbeat.C:
for k, v := range rafts {
if k != rft.id {
rft.LogL("sending AppendRPC " + strconv.Itoa(k))
v.eventCh <- heartbeatReq
}
}
heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT)
case event := <-rft.eventCh:
switch event.(type) {
case *ClientAppend:
case *AppendRPC:
case *VoteRequest:
}
}
}
return LEADER
}
......@@ -16,7 +16,7 @@ func getLogger(serverId int, toDebug bool) (l *log.Logger) {
l = log.New(ioutil.Discard, "INFO: ", log.Ltime|log.Lshortfile)
} else {
logf, _ := os.OpenFile(strconv.Itoa(serverId), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
l = log.New(logf, "INFO: ", log.Ltime|log.Lshortfile)
l = log.New(logf, "INFO: ", log.Ltime|log.Lmicroseconds|log.Lshortfile)
}
l.Println("Initialized server.")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment