Commit 86628649 authored by Sushant Mahajan

structured the code

parent 900862b0
package raft

import (
    "log"
    "net"
    "net/rpc"
    "strconv"
    "sync"
    "time"
)
// Constant values used
const (
    CLIENT_PORT = 9000  // base port for client connections (server id is added)
    LOG_PORT    = 20000 // base port for inter-replica RPC (server id is added)
    ACK_TIMEOUT = 5     // seconds to wait for follower acks before timing out
)
// Logger
var Info *log.Logger

// Global variable for generating unique log sequence numbers
var lsn Lsn

// Flag for enabling/disabling logging functionality
var DEBUG = true

// See Log.Append. Implements the error interface.
type ErrRedirect int

// Log sequence number, unique for all time.
type Lsn uint64
// Stores the server information
type ServerConfig struct {
    Id         int    // Id of server. Must be unique
    Hostname   string // name or ip of host
    ClientPort int    // port at which server listens to client messages
    LogPort    int    // tcp port for inter-replica protocol messages
}

// Stores the replica information of the cluster
type ClusterConfig struct {
    Path    string         // Directory for persistent log
    Servers []ServerConfig // All servers in this cluster
}
// SharedLog is the interface through which the key-value store appends
// client commands and hands entries to the commit channel.
type SharedLog interface {
    Append(data []byte, conn net.Conn) (LogEntry, error)
    AddToChannel(entry LogEntry)
}
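// A minimal usage sketch, assuming a leader-side caller (the kvstore layer
// that actually drives this interface lives elsewhere in the repository):
//
//	entry, err := rft.Append([]byte("set foo bar"), conn)
//	if err == nil {
//		Info.Println("replicating entry", entry.GetLsn())
//	}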
// Raft information
type Raft struct {
    LogArray      []*LogEntryData // In-memory store for log entries
    commitCh      chan LogEntry   // Commit channel
    clusterConfig *ClusterConfig  // Cluster
    id            int             // Server id
    sync.RWMutex
}
// Log entry interface
type LogEntry interface {
    GetLsn() Lsn              // Returns the Lsn
    GetData() []byte          // Returns the data
    GetCommitted() bool       // Returns committed status
    SetCommitted(status bool) // Sets committed status
}

type LogEntryData struct {
    Id        Lsn      // Unique identifier for the log entry
    Data      []byte   // Data bytes
    Committed bool     // Commit status
    conn      net.Conn // Connection for communicating with the client
}
// Structure for calling the commit RPC
type CommitData struct {
    Id Lsn
}

// Structure used for replying to the RPC calls
type Reply struct {
    X int
}

// Structure for registering RPC methods
type AppendEntries struct{}
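// The RPC handlers named in the client.Go calls below (AppendEntriesRPC and
// CommitRPC) are defined in another file of this package. To be dispatchable
// by net/rpc they must follow the standard method shape; a sketch, with the
// bodies assumed:
//
//	func (t *AppendEntries) AppendEntriesRPC(args *LogEntryData, reply *Reply) error {
//		// append the entry to the local log, then acknowledge via reply.X
//		return nil
//	}
//
//	func (t *AppendEntries) CommitRPC(args *CommitData, reply *Reply) error {
//		// mark the entry with Lsn args.Id as committed locally
//		return nil
//	}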
// Creates a raft object. This implements the SharedLog interface.
// commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed
// entries are recovered and replayed.
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) {
    rft := new(Raft)
    rft.commitCh = commitCh
    rft.clusterConfig = config
    rft.id = thisServerId
    Info = logger
    lsn = 0
    return rft, nil
}
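// A usage sketch, assuming a five-server cluster with this process acting as
// server 1 (server.go below wires things up the same way):
//
//	config, _ := NewClusterConfig(5)
//	commitCh := make(chan LogEntry)
//	rft, _ := NewRaft(config, 1, commitCh, Info)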
// Creates a log entry. This implements the LogEntry interface.
// data: data bytes, committed: commit status, conn: connection to client
// Returns the log entry. The global lsn counter is not synchronized here;
// callers are expected to hold the Raft lock (as Append does).
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
    entry := new(LogEntryData)
    entry.Id = lsn
    entry.Data = data
    entry.conn = conn
    entry.Committed = committed
    lsn++
    return entry
}
// Goroutine that monitors the ack channel to check whether a majority of servers have replied
func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) {
    acks_received := 0
    num_servers := len(rft.clusterConfig.Servers)
    required_acks := num_servers / 2 // the leader's own copy plus this many acks is a majority
    up := make(chan bool, 1)
    done := false

    // Fire a timeout so we never wait on acks forever.
    go func() {
        time.Sleep(ACK_TIMEOUT * time.Second)
        up <- true
    }()

    for {
        select {
        case temp := <-ack_ch:
            Info.Println("Ack Received:", temp)
            acks_received += temp
            if acks_received >= required_acks {
                Info.Println("Majority Achieved", log_entry.(*LogEntryData).Id)
                rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true
                rft.commitCh <- log_entry

                commitData := new(CommitData)
                commitData.Id = log_entry.(*LogEntryData).Id
                for _, server := range rft.clusterConfig.Servers[1:] {
                    go doCommitRPCCall(server.Hostname, server.LogPort, commitData)
                }

                majCh <- true
                done = true
                break
            }
        case <-up:
            Info.Println("Timed out waiting for a majority of acks")
            majCh <- false // unblock Append on timeout
            done = true
            break
        }
        if done {
            break
        }
    }
}
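// Worked example of the majority arithmetic above: with num_servers = 5,
// required_acks = 5/2 = 2, so the leader's own copy plus two follower acks
// puts the entry on 3 of 5 replicas, a majority.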
// Gets the Lsn
func (entry *LogEntryData) GetLsn() Lsn {
    return entry.Id
}

// Gets the data
func (entry *LogEntryData) GetData() []byte {
    return entry.Data
}

// Gets the committed status
func (entry *LogEntryData) GetCommitted() bool {
    return entry.Committed
}

// Sets the committed status
func (entry *LogEntryData) SetCommitted(committed bool) {
    entry.Committed = committed
}
// Call CommitRPC to inform a follower of a newly committed log entry
func doCommitRPCCall(hostname string, logPort int, commitData *CommitData) {
    Info.Println("Commit RPC")
    client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
    if err != nil {
        Info.Fatal("Dialing:", err) // note: Fatal exits the whole process on a failed dial
    }
    reply := new(Reply)
    args := commitData
    Info.Println("Calling Commit RPC", logPort)
    commitCall := client.Go("AppendEntries.CommitRPC", args, reply, nil) // let net/rpc allocate the done channel
    commitCall = <-commitCall.Done
    Info.Println("Reply", commitCall, reply.X)
}
// Make the append entries RPC call to a follower and push its ack onto ackChan
func doRPCCall(ackChan chan int, hostname string, logPort int, entry *LogEntryData) {
    client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
    if err != nil {
        Info.Fatal("Dialing:", err)
    }
    reply := new(Reply)
    args := entry
    Info.Println("RPC Called", logPort)
    appendCall := client.Go("AppendEntries.AppendEntriesRPC", args, reply, nil) // let net/rpc allocate the done channel
    appendCall = <-appendCall.Done
    Info.Println("Reply", appendCall, reply.X)
    ackChan <- reply.X
}
// Make Raft implement the Append function of the SharedLog interface
func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
    Info.Println("Append Called")
    if rft.id != 1 {
        return nil, ErrRedirect(1) // only server 1 acts as leader; redirect everyone else
    }
    rft.Lock()
    defer rft.Unlock()
    temp := NewLogEntry(data, false, conn)
    rft.LogArray = append(rft.LogArray, temp)
    ackChan := make(chan int)
    majChan := make(chan bool)
    go monitorAckChannel(rft, ackChan, temp, majChan)
    for _, server := range rft.clusterConfig.Servers[1:] {
        go doRPCCall(ackChan, server.Hostname, server.LogPort, temp)
    }
    <-majChan // wait until a majority has acked, or the ack monitor times out
    return temp, nil
}
// AddToChannel pushes a log entry onto the commit channel
func (rft *Raft) AddToChannel(entry LogEntry) {
    Info.Println("Adding to commit", entry)
    rft.commitCh <- entry
}
// NewServerConfig builds the configuration for a single server; its ports are
// derived by adding the server id to the base port constants.
func NewServerConfig(serverId int) (*ServerConfig, error) {
    server := new(ServerConfig)
    server.Id = serverId
    server.Hostname = "127.0.0.1"
    server.ClientPort = CLIENT_PORT + serverId
    server.LogPort = LOG_PORT + serverId
    return server, nil
}
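// For example, serverId 3 yields ClientPort 9003 and LogPort 20003 given the
// base constants above.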
// NewClusterConfig builds a cluster of num_servers servers with ids 1..num_servers
func NewClusterConfig(num_servers int) (*ClusterConfig, error) {
    config := new(ClusterConfig)
    config.Path = ""
    config.Servers = make([]ServerConfig, num_servers)
    for i := 1; i <= num_servers; i++ {
        curr_server, _ := NewServerConfig(i)
        config.Servers[i-1] = *curr_server
    }
    return config, nil
}
// Error reports which server the client should be redirected to
func (e ErrRedirect) Error() string {
    return "Redirect to server " + strconv.Itoa(int(e))
}
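// A sketch of how a caller might react to the redirect (the real client
// handling lives outside this file; the names below are illustrative):
//
//	if _, err := rft.Append(data, conn); err != nil {
//		if r, ok := err.(ErrRedirect); ok {
//			Info.Println("redirecting to server", int(r))
//		}
//	}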
// server.go
package main

import (
    "io/ioutil"
    "log"
    "os"
    "raft"
    "strconv"
)
// Logger
var Info *log.Logger

// Global raft object for each server instance
var rft *raft.Raft

// Simple logger that is enabled or disabled according to the command line
// arguments. In test cases it is redirected to a file per server {1..5}.
// arguments: current server id, toggle enable/disable
// return: none
// receiver: none
func initLogger(serverId int, toDebug bool) {
    // Logger initialization
    if !toDebug {
        Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
    } else {
        Info = log.New(os.Stdout, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
    }
    Info.Println("Initialized server.")
}
// Entry point for the application. Starts all major server goroutines and then waits forever.
func main() {
    sid, err := strconv.Atoi(os.Args[1])
    ch1 := make(chan bool)
    ch2 := make(chan bool)
    // Initialize the logger before reporting any argument errors.
    if len(os.Args) > 3 {
        initLogger(sid, true)
    } else {
        initLogger(sid, false)
    }
    if err != nil {
        Info.Println("argument", os.Args[1], "is not a valid server id")
    }
    Info.Println("Starting")
    serverCount, err2 := strconv.Atoi(os.Args[2])
    if err2 != nil {
        Info.Println("argument", os.Args[2], "is not a valid server count")
    }
    server, _ := raft.NewServerConfig(sid)
    clusterConfig, _ := raft.NewClusterConfig(serverCount)
    commitCh := make(chan raft.LogEntry)
    rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info)
    raft.InitKVStore(Info, sid)
    go raft.MonitorCommitChannel(commitCh) // for kvstore
    go initClientCommunication(server, rft, ch1)
    go initInterServerCommunication(server, rft, ch2)
    for <-ch1 && <-ch2 {
    }
}
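// initClientCommunication and initInterServerCommunication are defined in
// other files of this package. A minimal sketch of the inter-server side,
// assuming it only needs to serve the AppendEntries RPCs and that "net" and
// "net/rpc" are imported (the ch parameter would signal readiness or failure):
//
//	func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
//		rpc.Register(new(raft.AppendEntries))
//		listener, err := net.Listen("tcp", ":"+strconv.Itoa(server.LogPort))
//		if err != nil {
//			Info.Fatal("listen error:", err)
//		}
//		for {
//			conn, _ := listener.Accept()
//			go rpc.ServeConn(conn)
//		}
//	}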