Commit 5da07e4f authored by Bharath Radhakrishnan's avatar Bharath Radhakrishnan

Commit RPC

parent 7a0f540c
...@@ -74,6 +74,11 @@ type LogEntryData struct { ...@@ -74,6 +74,11 @@ type LogEntryData struct {
conn net.Conn // Connection for communicating with client conn net.Conn // Connection for communicating with client
} }
// Structure for calling commit RPC
type CommitData struct {
Id uint64
}
// Structure used for replying to the RPC calls // Structure used for replying to the RPC calls
type Reply struct { type Reply struct {
X int X int
...@@ -139,6 +144,13 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c ...@@ -139,6 +144,13 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true
//Info.Println(rft.LogArray) //Info.Println(rft.LogArray)
rft.commitCh <- log_entry rft.commitCh <- log_entry
temp := new(CommitData)
temp.Id = log_entry.(*LogEntryData).Id
for _, server := range rft.clusterConfig.Servers[1:] {
doCommitRPCCall(ackChan, server.Hostname, server.LogPort, temp)
}
majCh <- true majCh <- true
err = true err = true
break break
...@@ -175,6 +187,20 @@ func (entry *LogEntryData) SetCommitted(committed bool) { ...@@ -175,6 +187,20 @@ func (entry *LogEntryData) SetCommitted(committed bool) {
entry.Committed = committed entry.Committed = committed
} }
// Call CommitRPC to inform the followers of newly committed log entry
func doCommitRPCCall(hostname string, logPort int, temp *CommitData) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("Calling Commit RPC", logPort)
commitCall := client.Go("AppendEntries.CommitRPC", args, reply, nil) //let go allocate done channel
commitCall = <-commitCall.Done
Info.Println("Reply", commitCall, reply.X)
}
//make rpc call to followers //make rpc call to followers
func doRPCCall(ackChan chan int, hostname string, logPort int, temp *LogEntryData) { func doRPCCall(ackChan chan int, hostname string, logPort int, temp *LogEntryData) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
......
...@@ -76,10 +76,10 @@ func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntryData, reply *Reply) ...@@ -76,10 +76,10 @@ func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntryData, reply *Reply)
//arguments: pointer to argument struct (has LogEntry), pointer to reply struct //arguments: pointer to argument struct (has LogEntry), pointer to reply struct
//returns: error //returns: error
//receiver: pointer to AppendEntries //receiver: pointer to AppendEntries
func (t *AppendEntries) CommitRPC(args *raft.LogEntryData, reply *Reply) error { func (t *AppendEntries) CommitRPC(args *raft.CommitData, reply *Reply) error {
Info.Println("Commit RPC invoked") Info.Println("Commit RPC invoked")
rft.LogArray[(*args).GetLsn()].SetCommitted(true) rft.LogArray[(*args).Id].SetCommitted(true)
rft.AddToChannel(args) rft.AddToChannel(rft.LogArray[(*args).Id])
reply.X = 1 reply.X = 1
return nil return nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment