Commit 9bbecccb authored by Sushant Mahajan's avatar Sushant Mahajan

moved server.go in the raft package

parent faa29612
...@@ -52,7 +52,7 @@ type ClusterConfig struct { ...@@ -52,7 +52,7 @@ type ClusterConfig struct {
} }
type ClientAppend struct{ type ClientAppend struct{
logEntry LogEntry
} }
type VoteRequest struct{ type VoteRequest struct{
...@@ -316,17 +316,17 @@ func (raft *Raft) loop() { ...@@ -316,17 +316,17 @@ func (raft *Raft) loop() {
func (raft *Raft) follower() { func (raft *Raft) follower() {
//start candidate timeout //start candidate timeout
canTimeout = time.After((randGen.Intn(MAX_TIMEOUT) + MIN_TIMEOUT) % MAX_TIMEOUT) isCandidateChan = time.After((rand.Intn(MAX_TIMEOUT) + MIN_TIMEOUT) % MAX_TIMEOUT)
for { for {
//wrap in select //wrap in select
event := <- raft.eventCh event := <- raft.eventCh
switch event.(type) { switch event.(type) {
case ClientAppend: case *ClientAppend:
// Do not handle clients in follower mode. Send it back up the // Do not handle clients in follower mode. Send it back up the
// pipe with committed = false // pipe with committed = false
ev.logEntry.commited = false event.(*LogEntry).SetCommitted(false)
commitCh <- ev.logentry raft.commitCh <- event.(*LogEntry)
case VoteRequest: case *VoteRequest:
msg = event.msg msg = event.msg
if msg.term < currentterm, respond with if msg.term < currentterm, respond with
if msg.term > currentterm, upgrade currentterm if msg.term > currentterm, upgrade currentterm
...@@ -334,7 +334,7 @@ func (raft *Raft) follower() { ...@@ -334,7 +334,7 @@ func (raft *Raft) follower() {
reset timer reset timer
reply ok to event.msg.serverid reply ok to event.msg.serverid
remember term, leader id (either in log or in separate file) remember term, leader id (either in log or in separate file)
case AppendRPC: case *AppendRPC:
reset timer reset timer
if msg.term < currentterm, ignore if msg.term < currentterm, ignore
reset heartbeat timer reset heartbeat timer
...@@ -345,6 +345,6 @@ func (raft *Raft) follower() { ...@@ -345,6 +345,6 @@ func (raft *Raft) follower() {
respond ok to event.msg.serverid respond ok to event.msg.serverid
else else
respond err. respond err.
case Timeout : return candidate // new state back to loop() case *Timeout : return candidate // new state back to loop()
} }
} }
\ No newline at end of file
...@@ -37,8 +37,7 @@ func initLogger(serverId int, toDebug bool) { ...@@ -37,8 +37,7 @@ func initLogger(serverId int, toDebug bool) {
func main() { func main() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
sid, err := strconv.Atoi(os.Args[1]) sid, err := strconv.Atoi(os.Args[1])
ch1 := make(chan bool)
ch2 := make(chan bool)
if err != nil { if err != nil {
Info.Println("argument ", os.Args[1], "is not string") Info.Println("argument ", os.Args[1], "is not string")
} }
...@@ -60,13 +59,4 @@ func main() { ...@@ -60,13 +59,4 @@ func main() {
commitCh := make(chan raft.LogEntry) commitCh := make(chan raft.LogEntry)
rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info) rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info)
raft.InitKVStore(Info, sid)
go raft.MonitorCommitChannel(commitCh) //for kvstore
go initClientCommunication(server, rft, ch1)
go initInterServerCommunication(server, rft, ch2)
for <-ch1 && <-ch2 {
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment