Commit 8051e299 authored by Sushant Mahajan's avatar Sushant Mahajan

added rpc calls to followers

parent a212f611
......@@ -58,6 +58,10 @@ type Args struct {
X int
}
type Reply struct {
X int
}
type AppendEntries struct{}
var cluster_config *ClusterConfig
......@@ -95,8 +99,8 @@ func (entry *LogEntryData) Committed() bool {
//make raft implement the append function
func (raft *Raft) Append(data []byte) (LogEntry, error) {
if raft.id != 0 {
return nil, ErrRedirect(0)
if raft.id != 1 {
return nil, ErrRedirect(1)
}
temp := new(LogEntryData)
temp.id = 1
......@@ -104,7 +108,20 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) {
temp.data = data
raft.log_array = append(raft.log_array, temp)
//broadcast to other servers
ackChan := make(chan int)
for _, server := range cluster_config.Servers[1:] {
go func(ackChan chan int) {
client, err := rpc.Dial("tcp", server.Hostname+":"+strconv.Itoa(server.LogPort))
if err != nil {
log.Fatal("Dialing:", err)
}
reply := new(Reply)
args := &Args{7}
appendCall := client.Go("AppendEntries.AppendEntriesRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
ackChan <- reply.X
}(ackChan)
}
//wait for acks
//send commit on channel
raft.commitCh <- temp
......@@ -141,8 +158,8 @@ func start_rpc(this_server *ServerConfig) {
//rpc.Register()
}
func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *int) error {
*reply = args.X
func (t *AppendEntries) AppendEntriesRPC(args *Args, reply *Reply) error {
reply.X = args.X
return nil
}
......@@ -164,6 +181,7 @@ func initializeInterServerCommunication(this_server *ServerConfig) {
}
func main() {
log.Println("Start")
server_id, err := strconv.Atoi(os.Args[1])
if err != nil {
fmt.Println("argument ", os.Args[1], "is not string")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment