Commit 3ec2d21b authored by Bharath Radhakrishnan's avatar Bharath Radhakrishnan

RPC skeleton

parents 71bae140 92709071
...@@ -48,11 +48,32 @@ type SharedLog interface { ...@@ -48,11 +48,32 @@ type SharedLog interface {
} }
type Raft struct { type Raft struct {
//some good stuff needs to go here log_array []*LogEntryData
commitCh chan LogEntry
cluster_config *ClusterConfig //cluster
id int //this server id
} }
var cluster_config *ClusterConfig var cluster_config *ClusterConfig
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*Raft, error) {
rft := new(Raft)
rft.commitCh = commitCh
rft.cluster_config = config
rft.id = thisServerId
return rft, nil
}
//goroutine that monitors channel for commiting log entry
func monitor_commitCh(c <-chan LogEntry) { //unidirectional -- can only read from the channel
for {
//var temp LogEntry
temp := <-c //receive from the channel
temp.(*LogEntryData).committed = true
//now update key value store here
}
}
//make LogEntryData implement the //make LogEntryData implement the
func (entry *LogEntryData) Lsn() Lsn { func (entry *LogEntryData) Lsn() Lsn {
return entry.id return entry.id
...@@ -67,8 +88,30 @@ func (entry *LogEntryData) Committed() bool { ...@@ -67,8 +88,30 @@ func (entry *LogEntryData) Committed() bool {
} }
//make raft implement the append function //make raft implement the append function
//func (raft *Raft) Append(data []byte) (LogEntry, error) { func (raft *Raft) Append(data []byte) (LogEntry, error) {
//} if raft.id != 0 {
return nil, ErrRedirect(0)
}
temp := new(LogEntryData)
temp.id = 1
temp.committed = false
temp.data = data
raft.log_array = append(raft.log_array, temp)
//broadcast to other servers
//wait for acks
//send commit on channel
raft.commitCh <- temp
return temp, nil
}
type RPChandle struct {
}
func (r *RPChandle) AppendEntriesRPC(log_entry LogEntryData) bool {
return true
}
func NewServerConfig(server_id int) (*ServerConfig, error) { func NewServerConfig(server_id int) (*ServerConfig, error) {
server := new(ServerConfig) server := new(ServerConfig)
...@@ -93,7 +136,11 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) { ...@@ -93,7 +136,11 @@ func NewClusterConfig(num_servers int) (*ClusterConfig, error) {
} }
func (e ErrRedirect) Error() string { func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(cluster_config.Servers[0].Id) return "Redirect to server " + strconv.Itoa(0)
}
func start_rpc(this_server *ServerConfig) {
//rpc.Register()
} }
type Args struct { type Args struct {
...@@ -140,4 +187,7 @@ func main() { ...@@ -140,4 +187,7 @@ func main() {
fmt.Println(reflect.TypeOf(this_server)) fmt.Println(reflect.TypeOf(this_server))
fmt.Println(reflect.TypeOf(cluster_config)) fmt.Println(reflect.TypeOf(cluster_config))
initializeInterServerCommunication(this_server) initializeInterServerCommunication(this_server)
var dummy_input string
fmt.Scanln(&dummy_input)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment