Commit faa29612 authored by Sushant Mahajan's avatar Sushant Mahajan

added preliminery code for follower loop

parent 86628649
...@@ -2,6 +2,7 @@ package raft ...@@ -2,6 +2,7 @@ package raft
import ( import (
"log" "log"
"math/rand"
"net" "net"
"net/rpc" "net/rpc"
"strconv" "strconv"
...@@ -13,7 +14,12 @@ import ( ...@@ -13,7 +14,12 @@ import (
const ( const (
CLIENT_PORT = 9000 CLIENT_PORT = 9000
LOG_PORT = 20000 LOG_PORT = 20000
ACK_TIMEOUT = 5 ACK_TIMEOUT = 5
MIN_TIMEOUT = 300
MAX_TIMEOUT = 500
LEADER = iota
CANDIDATE = iota
FOLLOWER = iota
) )
// Logger // Logger
...@@ -45,6 +51,26 @@ type ClusterConfig struct { ...@@ -45,6 +51,26 @@ type ClusterConfig struct {
Servers []ServerConfig // All servers in this cluster Servers []ServerConfig // All servers in this cluster
} }
type ClientAppend struct{
}
type VoteRequest struct{
}
type AppendRPC struct{
}
type Timeout struct{
}
type RaftEvent interface{
}
type SharedLog interface { type SharedLog interface {
Append(data []byte, conn net.Conn) (LogEntry, error) Append(data []byte, conn net.Conn) (LogEntry, error)
AddToChannel(entry LogEntry) AddToChannel(entry LogEntry)
...@@ -57,6 +83,7 @@ type Raft struct { ...@@ -57,6 +83,7 @@ type Raft struct {
clusterConfig *ClusterConfig // Cluster clusterConfig *ClusterConfig // Cluster
id int // Server id id int // Server id
sync.RWMutex sync.RWMutex
eventCh chan RaftEvent //receive events related to various states
} }
// Log entry interface // Log entry interface
...@@ -91,13 +118,14 @@ type AppendEntries struct{} ...@@ -91,13 +118,14 @@ type AppendEntries struct{}
// commitCh is the channel that the kvstore waits on for committed messages. // commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed // When the process starts, the local disk log is read and all committed
// entries are recovered and replayed // entries are recovered and replayed
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) { func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, eventCh, chan RaftEvent, logger *log.Logger) (*Raft, error) {
rft := new(Raft) rft := new(Raft)
rft.commitCh = commitCh rft.commitCh = commitCh
rft.clusterConfig = config rft.clusterConfig = config
rft.id = thisServerId rft.id = thisServerId
Info = logger Info = logger
lsn = 0 lsn = 0
rft.eventCh = eventCh
return rft, nil return rft, nil
} }
...@@ -268,3 +296,55 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) { ...@@ -268,3 +296,55 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) {
func (e ErrRedirect) Error() string { func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(0) return "Redirect to server " + strconv.Itoa(0)
} }
//entry loop to raft
func (raft *Raft) loop() {
state := FOLLOWER
for {
switch state {
case FOLLOWER:
state = follower()
case CANDIDATE:
state = candidate()
case LEADER:
state = leader()
default:
return
}
}
}
func (raft *Raft) follower() {
//start candidate timeout
canTimeout = time.After((randGen.Intn(MAX_TIMEOUT) + MIN_TIMEOUT) % MAX_TIMEOUT)
for {
//wrap in select
event := <- raft.eventCh
switch event.(type) {
case ClientAppend:
// Do not handle clients in follower mode. Send it back up the
// pipe with committed = false
ev.logEntry.commited = false
commitCh <- ev.logentry
case VoteRequest:
msg = event.msg
if msg.term < currentterm, respond with
if msg.term > currentterm, upgrade currentterm
if not already voted in my term
reset timer
reply ok to event.msg.serverid
remember term, leader id (either in log or in separate file)
case AppendRPC:
reset timer
if msg.term < currentterm, ignore
reset heartbeat timer
upgrade to event.msg.term if necessary
if prev entries of my log and event.msg match
add to disk log
flush disk log
respond ok to event.msg.serverid
else
respond err.
case Timeout : return candidate // new state back to loop()
}
}
\ No newline at end of file
...@@ -4,9 +4,11 @@ package main ...@@ -4,9 +4,11 @@ package main
import ( import (
"io/ioutil" "io/ioutil"
"log" "log"
"math/rand"
"os" "os"
"raft" "raft"
"strconv" "strconv"
"time"
) )
// Logger // Logger
...@@ -33,6 +35,7 @@ func initLogger(serverId int, toDebug bool) { ...@@ -33,6 +35,7 @@ func initLogger(serverId int, toDebug bool) {
//Entry point for application. Starts all major server go routines and then waits for ever //Entry point for application. Starts all major server go routines and then waits for ever
func main() { func main() {
rand.Seed(time.Now().UnixNano())
sid, err := strconv.Atoi(os.Args[1]) sid, err := strconv.Atoi(os.Args[1])
ch1 := make(chan bool) ch1 := make(chan bool)
ch2 := make(chan bool) ch2 := make(chan bool)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment