Commit d0235343 authored by Sushant Mahajan's avatar Sushant Mahajan

complete append code

parent 567b3b54
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
//constant values used //constant values used
const ( const (
CLIENT_PORT = 9000 CLIENT_PORT = 9000
ACK_TIMEOUT = 5
) )
type Lsn uint64 //Log sequence number, unique for all time. type Lsn uint64 //Log sequence number, unique for all time.
...@@ -75,26 +76,44 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (* ...@@ -75,26 +76,44 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*
} }
//goroutine that monitors channel for commiting log entry //goroutine that monitors channel for commiting log entry
func monitor_commitCh(raft *Raft, c <-chan LogEntry) { //unidirectional -- can only read from the channel func monitorCommitChannel(raft *Raft) { //unidirectional -- can only read from the channel
for { for {
//var temp LogEntry //var temp LogEntry
temp := <-c //receive from the channel temp := <-raft.commitCh //receive from the channel
raft.log_array[temp.(*LogEntryData).id].committed = true //commit the value raft.log_array[temp.(*LogEntryData).id].committed = true //commit the value
//update the kv store here //update the kv store here
} }
} }
//goroutine that monitors channel to check if the majority of servers have replied //goroutine that monitors channel to check if the majority of servers have replied
func monitor_ackCh(rft *Raft, ack_ch <-chan int, log_entry LogEntry) { func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majEventCh chan int) {
acks_received := 0 acks_received := 0
num_servers := len(rft.cluster_config.Servers) num_servers := len(rft.cluster_config.Servers)
required_acks := num_servers / 2 required_acks := num_servers / 2
up := make(chan bool, 1)
err := false
go func() {
time.Sleep(ACK_TIMEOUT * time.Second)
up <- true
}()
for { for {
temp := <-ack_ch select {
case temp := <-ack_ch:
acks_received += temp acks_received += temp
if acks_received == required_acks { if acks_received == required_acks {
rft.commitCh <- log_entry rft.commitCh <- log_entry
majEventCh <- 1
err = true
break
}
case <-up:
err = true
break
}
if err {
break break
} }
} }
...@@ -125,7 +144,9 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) { ...@@ -125,7 +144,9 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) {
raft.log_array = append(raft.log_array, temp) raft.log_array = append(raft.log_array, temp)
ackChan := make(chan int) ackChan := make(chan int)
go monitor_ackCh(raft, ackChan, temp) majEventCh := make(chan int)
go monitorAckChannel(raft, ackChan, temp, majEventCh)
go monitorCommitChannel(raft)
for _, server := range cluster_config.Servers[1:] { for _, server := range cluster_config.Servers[1:] {
go func(ackChan chan int) { go func(ackChan chan int) {
...@@ -140,9 +161,10 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) { ...@@ -140,9 +161,10 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) {
ackChan <- reply.X ackChan <- reply.X
}(ackChan) }(ackChan)
} }
//wait for acks //channel will return 1 if majority
//send commit on channel if <-majEventCh == 1 {
raft.commitCh <- temp raft.commitCh <- temp
}
return temp, nil return temp, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment