Commit 10833484 authored by Harshit Pande's avatar Harshit Pande

raft constructor

parent a8826caf
...@@ -2,9 +2,9 @@ package main ...@@ -2,9 +2,9 @@ package main
import ( import (
//"log" //"log"
//"net"
//"net/rpc"
"fmt" "fmt"
"net"
"net/rpc"
"os" "os"
"reflect" "reflect"
"strconv" "strconv"
...@@ -48,15 +48,22 @@ type SharedLog interface { ...@@ -48,15 +48,22 @@ type SharedLog interface {
} }
type Raft struct { type Raft struct {
//some good stuff needs to go here log_array []LogEntryData
commitCh chan *LogEntryData commitCh chan *LogEntryData
cluster *ClusterConfig //cluster cluster_config *ClusterConfig //cluster
id uint64 //leader id uint64 //this server
commitCh chan LogEntry
} }
var cluster_config *ClusterConfig var cluster_config *ClusterConfig
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan *LogEntry) (*Raft, error) {
rft := new(Raft)
rft.commitCh = commitCh
rft.cluster_config = config
rft.id = thisServerId
return rft, nil
}
//goroutine that monitors channel for commiting log entry //goroutine that monitors channel for commiting log entry
func monitor_commitCh(c <-chan *LogEntryData) { //unidirectional -- can only read from the channel func monitor_commitCh(c <-chan *LogEntryData) { //unidirectional -- can only read from the channel
for { for {
...@@ -87,13 +94,20 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) { ...@@ -87,13 +94,20 @@ func (raft *Raft) Append(data []byte) (LogEntry, error) {
return nil, raft.Id return nil, raft.Id
} }
temp := LogEntry{1, data, false} temp := LogEntry{1, data, false}
raft.log_array = append(raft.log_array, temp)
//broadcast to other servers //broadcast to other servers
//wait for acks //wait for acks
//send commit on channel //send commit on channel
raft.commitCh <- temp raft.commitCh <- temp
} }
func AppendEntriesRPC() type RPChandle struct {
}
func (r *RPChandle) AppendEntriesRPC(log_entry LogEntryData) bool {
return true
}
func NewServerConfig(server_id int) (*ServerConfig, error) { func NewServerConfig(server_id int) (*ServerConfig, error) {
server := new(ServerConfig) server := new(ServerConfig)
...@@ -121,6 +135,10 @@ func (e ErrRedirect) Error() string { ...@@ -121,6 +135,10 @@ func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(cluster_config.Servers[0].Id) return "Redirect to server " + strconv.Itoa(cluster_config.Servers[0].Id)
} }
func start_rpc(this_server *ServerConfig) {
rpc.Register()
}
func main() { func main() {
server_id, err := strconv.Atoi(os.Args[1]) server_id, err := strconv.Atoi(os.Args[1])
if err != nil { if err != nil {
...@@ -136,4 +154,9 @@ func main() { ...@@ -136,4 +154,9 @@ func main() {
fmt.Println(reflect.TypeOf(this_server)) fmt.Println(reflect.TypeOf(this_server))
fmt.Println(reflect.TypeOf(cluster_config)) fmt.Println(reflect.TypeOf(cluster_config))
go start_rpc(this_server)
var dummy_input string
fmt.Scanln(&dummy_input)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment