Commit 92709071 authored by Harshit Pande's avatar Harshit Pande

raft constructor

parents 10833484 79fb8fc8
...@@ -3,8 +3,8 @@ package main ...@@ -3,8 +3,8 @@ package main
import ( import (
//"log" //"log"
"fmt" "fmt"
"net" //"net"
"net/rpc" //"net/rpc"
"os" "os"
"reflect" "reflect"
"strconv" "strconv"
...@@ -48,15 +48,15 @@ type SharedLog interface { ...@@ -48,15 +48,15 @@ type SharedLog interface {
} }
type Raft struct { type Raft struct {
log_array []LogEntryData log_array []*LogEntryData
commitCh chan *LogEntryData commitCh chan LogEntry
cluster_config *ClusterConfig //cluster cluster_config *ClusterConfig //cluster
id uint64 //this server id int //this server id
} }
var cluster_config *ClusterConfig var cluster_config *ClusterConfig
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan *LogEntry) (*Raft, error) { func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*Raft, error) {
rft := new(Raft) rft := new(Raft)
rft.commitCh = commitCh rft.commitCh = commitCh
rft.cluster_config = config rft.cluster_config = config
...@@ -65,12 +65,11 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan *LogEntry) ( ...@@ -65,12 +65,11 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan *LogEntry) (
} }
//goroutine that monitors channel for commiting log entry //goroutine that monitors channel for commiting log entry
func monitor_commitCh(c <-chan *LogEntryData) { //unidirectional -- can only read from the channel func monitor_commitCh(c <-chan LogEntry) { //unidirectional -- can only read from the channel
for { for {
var temp *LogEntryData //var temp LogEntry
temp = <-c //receive from the channel temp := <-c //receive from the channel
temp.committed = true temp.(*LogEntryData).committed = true
//now update key value store here //now update key value store here
} }
} }
...@@ -90,15 +89,20 @@ func (entry *LogEntryData) Committed() bool { ...@@ -90,15 +89,20 @@ func (entry *LogEntryData) Committed() bool {
//make raft implement the append function //make raft implement the append function
func (raft *Raft) Append(data []byte) (LogEntry, error) { func (raft *Raft) Append(data []byte) (LogEntry, error) {
if raft.server.Id != 0 { if raft.id != 0 {
return nil, raft.Id return nil, ErrRedirect(0)
} }
temp := LogEntry{1, data, false} temp := new(LogEntryData)
temp.id = 1
temp.committed = false
temp.data = data
raft.log_array = append(raft.log_array, temp) raft.log_array = append(raft.log_array, temp)
//broadcast to other servers //broadcast to other servers
//wait for acks //wait for acks
//send commit on channel //send commit on channel
raft.commitCh <- temp raft.commitCh <- temp
return temp, nil
} }
type RPChandle struct { type RPChandle struct {
...@@ -132,11 +136,11 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) { ...@@ -132,11 +136,11 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) {
} }
func (e ErrRedirect) Error() string { func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(cluster_config.Servers[0].Id) return "Redirect to server " + strconv.Itoa(0)
} }
func start_rpc(this_server *ServerConfig) { func start_rpc(this_server *ServerConfig) {
rpc.Register() //rpc.Register()
} }
func main() { func main() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment