Commit 9e2876e7 authored by Bharath Radhakrishnan's avatar Bharath Radhakrishnan

Replication achieved

parent cbafd0dc
...@@ -216,7 +216,7 @@ func MonitorCommitChannel(ch chan LogEntry) { ...@@ -216,7 +216,7 @@ func MonitorCommitChannel(ch chan LogEntry) {
log.Fatal("Error decoding command!", err) log.Fatal("Error decoding command!", err)
} }
ParseInput(conn, cmd) ParseInput(conn, cmd)
//Debug() Debug()
} }
} }
......
...@@ -22,6 +22,9 @@ var Info *log.Logger ...@@ -22,6 +22,9 @@ var Info *log.Logger
// Global variable for generating unique log sequence numbers // Global variable for generating unique log sequence numbers
var lsn Lsn var lsn Lsn
// Flag for enabling/disabling logging functionality
var DEBUG = true
// See Log.Append. Implements Error interface. // See Log.Append. Implements Error interface.
type ErrRedirect int type ErrRedirect int
...@@ -73,7 +76,7 @@ type LogEntryData struct { ...@@ -73,7 +76,7 @@ type LogEntryData struct {
// Structure for calling commit RPC // Structure for calling commit RPC
type CommitData struct { type CommitData struct {
Id uint64 Id Lsn
} }
// Structure used for replying to the RPC calls // Structure used for replying to the RPC calls
...@@ -145,7 +148,7 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c ...@@ -145,7 +148,7 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
temp := new(CommitData) temp := new(CommitData)
temp.Id = log_entry.(*LogEntryData).Id temp.Id = log_entry.(*LogEntryData).Id
for _, server := range rft.clusterConfig.Servers[1:] { for _, server := range rft.clusterConfig.Servers[1:] {
doCommitRPCCall(ackChan, server.Hostname, server.LogPort, temp) go doCommitRPCCall(server.Hostname, server.LogPort, temp)
} }
majCh <- true majCh <- true
...@@ -186,6 +189,7 @@ func (entry *LogEntryData) SetCommitted(committed bool) { ...@@ -186,6 +189,7 @@ func (entry *LogEntryData) SetCommitted(committed bool) {
// Call CommitRPC to inform the followers of newly committed log entry // Call CommitRPC to inform the followers of newly committed log entry
func doCommitRPCCall(hostname string, logPort int, temp *CommitData) { func doCommitRPCCall(hostname string, logPort int, temp *CommitData) {
Info.Println("Commit RPC")
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil { if err != nil {
Info.Fatal("Dialing:", err) Info.Fatal("Dialing:", err)
...@@ -230,7 +234,7 @@ func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) { ...@@ -230,7 +234,7 @@ func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
go monitorAckChannel(rft, ackChan, temp, majChan) go monitorAckChannel(rft, ackChan, temp, majChan)
for _, server := range rft.clusterConfig.Servers[1:] { for _, server := range rft.clusterConfig.Servers[1:] {
doRPCCall(ackChan, server.Hostname, server.LogPort, temp) go doRPCCall(ackChan, server.Hostname, server.LogPort, temp)
} }
if <-majChan { if <-majChan {
...@@ -242,6 +246,7 @@ func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) { ...@@ -242,6 +246,7 @@ func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
//AddToChannel //AddToChannel
func (rft *Raft) AddToChannel(entry LogEntry) { func (rft *Raft) AddToChannel(entry LogEntry) {
Info.Println("Adding to commit", entry)
rft.commitCh <- entry rft.commitCh <- entry
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment