Commit 6a376dbb authored by Sushant Mahajan's avatar Sushant Mahajan

added some persistance information and made entries in raft data structure

parent 9bbecccb
package raft
import (
"io/ioutil"
"log"
"math/rand"
"net"
"net/rpc"
"os"
"strconv"
"sync"
"time"
......@@ -14,17 +15,14 @@ import (
const (
CLIENT_PORT = 9000
LOG_PORT = 20000
ACK_TIMEOUT = 5
ACK_TIMEOUT = 5
MIN_TIMEOUT = 300
MAX_TIMEOUT = 500
LEADER = iota
CANDIDATE = iota
FOLLOWER = iota
CANDIDATE
FOLLOWER
)
// Logger
var Info *log.Logger
// Global variable for generating unique log sequence numbers
var lsn Lsn
......@@ -51,24 +49,24 @@ type ClusterConfig struct {
Servers []ServerConfig // All servers in this cluster
}
type ClientAppend struct{
type ClientAppend struct {
logEntry LogEntry
}
type VoteRequest struct{
type VoteRequest struct {
term int
candidateId int
lastLogIndex int
lastLogTerm int
}
type AppendRPC struct{
type AppendRPC struct {
}
type Timeout struct{
type Timeout struct {
}
type RaftEvent interface{
type RaftEvent interface {
}
type SharedLog interface {
......@@ -83,7 +81,10 @@ type Raft struct {
clusterConfig *ClusterConfig // Cluster
id int // Server id
sync.RWMutex
eventCh chan RaftEvent //receive events related to various states
Info *log.Logger //log for raft instance
eventCh chan RaftEvent //receive events related to various states
votedFor int
currentTerm int
}
// Log entry interface
......@@ -101,31 +102,41 @@ type LogEntryData struct {
conn net.Conn // Connection for communicating with client
}
// Structure for calling commit RPC
type CommitData struct {
Id Lsn
}
// Structure used for replying to the RPC calls
type Reply struct {
X int
func getCurrentTerm(serverId int, info *log.Logger) int {
if file, err := os.Open("currentTerm" + strconv.Itoa(serverId)); err != nil {
ioutil.WriteFile("currentTerm"+strconv.Itoa(serverId), []byte("0"), 0666)
info.Println("wrote in term file:0")
return 0
} else {
if data, err := ioutil.ReadFile(file.Name()); err != nil {
info.Println("error reading file")
return -1
} else {
info.Println("read from file")
if t, err2 := strconv.Atoi(string(data)); err2 != nil {
info.Println("error converting")
return -1
} else {
info.Println("Converted success", t)
return t
}
}
return -1
}
}
// Structure for registering RPC methods
type AppendEntries struct{}
// Creates a raft object. This implements the SharedLog interface.
// commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed
// entries are recovered and replayed
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, eventCh, chan RaftEvent, logger *log.Logger) (*Raft, error) {
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, eventCh chan RaftEvent, toDebug bool) (*Raft, error) {
rft := new(Raft)
rft.commitCh = commitCh
rft.clusterConfig = config
rft.id = thisServerId
Info = logger
lsn = 0
rft.eventCh = eventCh
rft.Info = getLogger(thisServerId, toDebug)
rft.currentTerm = getCurrentTerm(thisServerId, rft.Info)
return rft, nil
}
......@@ -142,52 +153,6 @@ func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
return entry
}
// Goroutine that monitors channel to check if the majority of servers have replied
func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) {
acks_received := 0
num_servers := len(rft.clusterConfig.Servers)
required_acks := num_servers / 2
up := make(chan bool, 1)
err := false
go func() {
time.Sleep(ACK_TIMEOUT * time.Second)
up <- true
}()
for {
select {
case temp := <-ack_ch:
Info.Println("Ack Received:", temp)
acks_received += temp
if acks_received == required_acks {
Info.Println("Majority Achieved", log_entry.(*LogEntryData).Id)
rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true
//Info.Println(rft.LogArray)
rft.commitCh <- log_entry
temp := new(CommitData)
temp.Id = log_entry.(*LogEntryData).Id
for _, server := range rft.clusterConfig.Servers[1:] {
go doCommitRPCCall(server.Hostname, server.LogPort, temp)
}
majCh <- true
err = true
break
}
case <-up:
Info.Println("Error")
err = true
break
}
if err {
break
}
}
}
// Gets the Lsn
func (entry *LogEntryData) GetLsn() Lsn {
return entry.Id
......@@ -208,39 +173,9 @@ func (entry *LogEntryData) SetCommitted(committed bool) {
entry.Committed = committed
}
// Call CommitRPC to inform the followers of newly committed log entry
func doCommitRPCCall(hostname string, logPort int, temp *CommitData) {
Info.Println("Commit RPC")
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("Calling Commit RPC", logPort)
commitCall := client.Go("AppendEntries.CommitRPC", args, reply, nil) //let go allocate done channel
commitCall = <-commitCall.Done
Info.Println("Reply", commitCall, reply.X)
}
//make rpc call to followers
func doRPCCall(ackChan chan int, hostname string, logPort int, temp *LogEntryData) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("RPC Called", logPort)
appendCall := client.Go("AppendEntries.AppendEntriesRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
Info.Println("Reply", appendCall, reply.X)
ackChan <- reply.X
}
//make raft implement the append function
func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
Info.Println("Append Called")
rft.Info.Println("Append Called")
if rft.id != 1 {
return nil, ErrRedirect(1)
}
......@@ -250,24 +185,12 @@ func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
rft.LogArray = append(rft.LogArray, temp)
ackChan := make(chan int)
majChan := make(chan bool)
go monitorAckChannel(rft, ackChan, temp, majChan)
for _, server := range rft.clusterConfig.Servers[1:] {
go doRPCCall(ackChan, server.Hostname, server.LogPort, temp)
}
if <-majChan {
//
}
return temp, nil
}
//AddToChannel
func (rft *Raft) AddToChannel(entry LogEntry) {
Info.Println("Adding to commit", entry)
rft.Info.Println("Adding to commit", entry)
rft.commitCh <- entry
}
......@@ -298,53 +221,53 @@ func (e ErrRedirect) Error() string {
}
//entry loop to raft
func (raft *Raft) loop() {
func (rft *Raft) loop() {
state := FOLLOWER
for {
rft.Info.Println("hello")
switch state {
case FOLLOWER:
state = follower()
case CANDIDATE:
state = candidate()
case LEADER:
state = leader()
// case CANDIDATE:
// state = candidate()
// case LEADER:
// state = leader()
default:
return
}
}
}
func (raft *Raft) follower() {
func getTimer() *time.Timer {
return time.NewTimer(time.Millisecond * time.Duration((rand.Intn(MAX_TIMEOUT)+MIN_TIMEOUT)%MAX_TIMEOUT))
}
func (rft *Raft) follower() int {
//start candidate timeout
isCandidateChan = time.After((rand.Intn(MAX_TIMEOUT) + MIN_TIMEOUT) % MAX_TIMEOUT)
candTimer := getTimer()
for {
//wrap in select
event := <- raft.eventCh
switch event.(type) {
case *ClientAppend:
// Do not handle clients in follower mode. Send it back up the
// pipe with committed = false
event.(*LogEntry).SetCommitted(false)
raft.commitCh <- event.(*LogEntry)
case *VoteRequest:
msg = event.msg
if msg.term < currentterm, respond with
if msg.term > currentterm, upgrade currentterm
if not already voted in my term
reset timer
reply ok to event.msg.serverid
remember term, leader id (either in log or in separate file)
case *AppendRPC:
reset timer
if msg.term < currentterm, ignore
reset heartbeat timer
upgrade to event.msg.term if necessary
if prev entries of my log and event.msg match
add to disk log
flush disk log
respond ok to event.msg.serverid
else
respond err.
case *Timeout : return candidate // new state back to loop()
}
}
\ No newline at end of file
select {
case <-candTimer.C:
return CANDIDATE
case event := <-rft.eventCh:
switch event.(type) {
case *ClientAppend:
// Do not handle clients in follower mode. Send it back up the
// pipe with committed = false
event.(*ClientAppend).logEntry.SetCommitted(false)
rft.commitCh <- event.(*ClientAppend).logEntry
case *VoteRequest:
req := event.(*VoteRequest)
if req.term < rft.currentTerm {
//reply as - not accepted as leader
}
if req.term > rft.currentTerm {
//update currentTerm
}
//condition for - if not voted in current term
}
}
}
}
// server.go
package main
package raft
import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"raft"
"strconv"
"time"
)
// Logger
var Info *log.Logger
var rafts map[int]*Raft
//global raft object for each server instance
var rft *raft.Raft
//Simple logger that is enabled or disabled according to the command line arguments. In test cases
//it is redirected to a file per server {1..5}.
//arguments: current server id, toggle enable/disable
//return: none
//receiver: none
func initLogger(serverId int, toDebug bool) {
// Logger Initializaion
func getLogger(serverId int, toDebug bool) (l *log.Logger) {
if !toDebug {
Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
l = log.New(ioutil.Discard, "INFO: ", log.Ltime|log.Lshortfile)
} else {
Info = log.New(os.Stdout, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
logf, _ := os.OpenFile(strconv.Itoa(serverId), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
l = log.New(logf, "INFO: ", log.Ltime|log.Lshortfile)
}
Info.Println("Initialized server.")
l.Println("Initialized server.")
return l
}
//Entry point for application. Starts all major server go routines and then waits for ever
func main() {
rand.Seed(time.Now().UnixNano())
sid, err := strconv.Atoi(os.Args[1])
if err != nil {
Info.Println("argument ", os.Args[1], "is not string")
}
if len(os.Args) > 3 {
initLogger(sid, true)
} else {
initLogger(sid, false)
func Start(serverId int, commitCh chan LogEntry, eventCh chan RaftEvent, dummyCh chan bool, toDebug bool) {
clusterConfig, _ := NewClusterConfig(5)
rft, _ := NewRaft(clusterConfig, serverId, commitCh, eventCh, true)
if rafts == nil {
rafts = make(map[int]*Raft)
}
Info.Println("Starting")
serverCount, err2 := strconv.Atoi((os.Args[2]))
if err2 != nil {
Info.Println("argument ", os.Args[2], "is not string")
}
server, _ := raft.NewServerConfig(sid)
clusterConfig, _ := raft.NewClusterConfig(serverCount)
commitCh := make(chan raft.LogEntry)
rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info)
rafts[serverId] = rft
fmt.Println(len(rafts))
rft.loop()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment