Commit 567b3b54 authored by Sushant Mahajan's avatar Sushant Mahajan

added code to commit acked entry

parent a40a6241
......@@ -75,12 +75,12 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*
}
//goroutine that monitors channel for commiting log entry
func monitor_commitCh(c <-chan LogEntry) { //unidirectional -- can only read from the channel
func monitor_commitCh(raft *Raft, c <-chan LogEntry) { //unidirectional -- can only read from the channel
for {
//var temp LogEntry
temp := <-c //receive from the channel
temp.(*LogEntryData).committed = true
//now update key value store here
temp := <-c //receive from the channel
raft.log_array[temp.(*LogEntryData).id].committed = true //commit the value
//update the kv store here
}
}
......@@ -89,9 +89,6 @@ func monitor_ackCh(rft *Raft, ack_ch <-chan int, log_entry LogEntry) {
acks_received := 0
num_servers := len(rft.cluster_config.Servers)
required_acks := num_servers / 2
if num_servers%2 == 0 {
required_acks++
}
for {
temp := <-ack_ch
......@@ -128,6 +125,8 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) {
raft.log_array = append(raft.log_array, temp)
ackChan := make(chan int)
go monitor_ackCh(raft, ackChan, temp)
for _, server := range cluster_config.Servers[1:] {
go func(ackChan chan int) {
client, err := rpc.Dial("tcp", server.Hostname+":"+strconv.Itoa(server.LogPort))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment