Commit 2373e395 authored by Sushant Mahajan's avatar Sushant Mahajan

completed implementation, added test cases

parent e0747868
package main
import (
"fmt"
"raft"
)
const (
SERVERS = 5
)
func main() {
dummyCh := make(chan bool, 1)
fmt.Println("Started")
for i := 1; i <= 5; i++ {
go raft.Start(i, true)
}
if <-dummyCh {
fmt.Println("khattam")
}
}
......@@ -18,7 +18,6 @@ import (
const (
CLIENT_PORT = 9000
LOG_PORT = 20000
ACK_TIMEOUT = 5
MIN_TIMEOUT_ELEC = 300
MAX_TIMEOUT_ELEC = 500
HEARTBEAT_TIMEOUT = 100
......@@ -30,7 +29,7 @@ const (
LOG_PERSIST = "log"
FILE_WRITTEN = 0
FILE_ERR = -1
NULL_VOTE = 0
NULL_VOTE = -1
LOG_INVALID_INDEX = -1
LOG_INVALID_TERM = -1
)
......@@ -61,10 +60,12 @@ type ClusterConfig struct {
Servers []ServerConfig // All servers in this cluster
}
//structure used to identify a request from client
type ClientAppend struct {
logEntry *LogEntryData
}
//structure used to identify a request from candidate for votes
type VoteRequest struct {
Term int
CandidateId int
......@@ -72,11 +73,13 @@ type VoteRequest struct {
LastLogTerm int
}
//structure used to identify a reply from votes RPC
type VoteRequestReply struct {
CurrentTerm int
Reply bool
}
//structure used to identify a reply from Append RPC
type AppendReply struct {
CurrentTerm int
Reply bool
......@@ -84,6 +87,7 @@ type AppendReply struct {
LogLength int
}
//structure used to identify a req to perform Append Entries
type AppendRPC struct {
Term int
LeaderId int
......@@ -98,9 +102,7 @@ type Reply struct {
X int
}
type Timeout struct {
}
//interface used to encapsulate all structures composing the RPCs
type RaftEvent interface {
}
......@@ -118,19 +120,19 @@ type Raft struct {
sync.RWMutex
Info *log.Logger //log for raft instance
eventCh chan RaftEvent //receive events related to various states
votedFor int
votedFor int //which server have you voted for
currentTerm int
commitIndex int
voters int
shiftStatusCh chan int
voteReplyCh chan RaftEvent
appendReplyCh chan RaftEvent
et *time.Timer
voters int //number of servers who voted for you
shiftStatusCh chan int //used for switching candidate to follower or leader
voteReplyCh chan RaftEvent //used to receive votes from followers
appendReplyCh chan RaftEvent //used to receive the reply fot append entries sent
et *time.Timer //election timeout
IsLeader bool
lastApplied int
nextIndex []int
matchIndex []int
LeaderId int
LeaderId int //so everyone knows who the leader is and can reply to the client
}
// Log entry interface
......@@ -154,6 +156,7 @@ type CommitData struct {
Id Lsn
}
//persists log to the disk for later retrieval
func (rft *Raft) persistLog() {
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666); err != nil {
rft.Info.Println("error opening log persist file", err.Error())
......@@ -165,14 +168,10 @@ func (rft *Raft) persistLog() {
rft.Info.Println("error persisting log entry", err.Error())
}
}
if err := file.Sync(); err != nil {
rft.Info.Println("error synching log persist file", err.Error())
} else {
rft.Info.Println("log persist success!")
}
}
}
//used for reading the entire log from disk if it exists
func (rft *Raft) readLogFromDisk() {
rft.LogArray = []*LogEntryData{}
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_RDONLY, 0666); err != nil {
......@@ -192,12 +191,14 @@ func (rft *Raft) readLogFromDisk() {
}
}
//for reading votedfor and currentterm from disk
func getSingleDataFromFile(name string, serverId int, info *log.Logger) int {
filename := name + strconv.Itoa(serverId)
if file, err := os.Open(filename); err != nil {
defer file.Close()
ioutil.WriteFile(filename, []byte("0"), 0666)
ioutil.WriteFile(filename, []byte("-1"), 0666)
//file.Sync()
//info.Println("wrote in " + filename + " file")
return 0
} else {
......@@ -217,16 +218,12 @@ func getSingleDataFromFile(name string, serverId int, info *log.Logger) int {
}
}
func writeFile(name string, serverId int, data int, info *log.Logger) int {
//write data to some file, used to currentterm and votedfor
func writeFile(name string, serverId int, data int, Info *log.Logger) int {
filename := name + strconv.Itoa(serverId)
if file, err := os.Open(filename); err != nil {
defer file.Close()
return FILE_ERR
} else {
ioutil.WriteFile(filename, []byte(strconv.Itoa(data)), 0666)
//info.Println("wrote in " + filename + " file")
return FILE_WRITTEN //file written
}
ioutil.WriteFile(filename, []byte(strconv.Itoa(data)), 0666)
Info.Println("wrote in "+filename, data)
return FILE_WRITTEN //file written
}
// Creates a raft object. This implements the SharedLog interface.
......@@ -254,6 +251,7 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, In
rft.nextIndex = make([]int, len(config.Servers))
rft.matchIndex = make([]int, len(config.Servers))
rft.commitIndex = -1
rft.votedFor = -1
rft.lastApplied = -1
return rft, nil
}
......@@ -359,6 +357,7 @@ func (e ErrRedirect) Error() string {
//entry loop to raft
func (rft *Raft) Loop() {
//start moninoring the commitIndex so that required entries are sent to the followers
go rft.MonitorStateMachine()
state := FOLLOWER
for {
......@@ -373,16 +372,19 @@ func (rft *Raft) Loop() {
}
}
func getRandTime(log *log.Logger) time.Duration {
//returns a new integer which is used to reset the timer
func getRandTime(log *log.Logger) int {
rand.Seed(time.Now().UnixNano())
t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT_ELEC-MIN_TIMEOUT_ELEC)+MIN_TIMEOUT_ELEC)
t := rand.Intn(MAX_TIMEOUT_ELEC-MIN_TIMEOUT_ELEC) + MIN_TIMEOUT_ELEC
log.Println("New rand time", t)
return t
}
//check for majority for a candidate.
//if someone replies with higher currentterm, revert to follower status
func (rft *Raft) handleMajority(reply *VoteRequestReply) {
majority := len(rft.clusterConfig.Servers) / 2
rft.Info.Println("[C]: favorable vote")
rft.Info.Println("[C]: vote received", reply)
if reply.Reply {
rft.voters++
rft.Info.Println("[C]: count", rft.voters)
......@@ -392,6 +394,7 @@ func (rft *Raft) handleMajority(reply *VoteRequestReply) {
}
} else {
if rft.currentTerm < reply.CurrentTerm {
rft.Info.Println("Candidate to follower", rft.voters)
rft.updateTermAndVote(reply.CurrentTerm)
rft.shiftStatusCh <- FOLLOWER
}
......@@ -435,7 +438,7 @@ func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, rft *Raft
}
reply := new(VoteRequestReply)
args := temp
//rft.Info.Println("Calling vote request RPC", logPort)
rft.Info.Println("Calling vote request RPC", logPort)
voteReqCall := client.Go("RaftRPCService.VoteRequestRPC", args, reply, nil) //let go allocate done channel
voteReqCall = <-voteReqCall.Done
rft.handleMajority(reply)
......@@ -457,16 +460,17 @@ func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, rft *Raft) {
rft.handleAppendReply(reply)
}
//set currentterm to latest value and reinitialize votedfor
func (rft *Raft) updateTermAndVote(term int) {
writeFile(CURRENT_TERM, rft.id, term, rft.Info)
rft.currentTerm = term
writeFile(VOTED_FOR, rft.id, NULL_VOTE, rft.Info)
writeFile(CURRENT_TERM, rft.id, term, rft.Info)
rft.votedFor = NULL_VOTE
writeFile(VOTED_FOR, rft.id, NULL_VOTE, rft.Info)
}
func (rft *Raft) follower() int {
//start candidate timeout
rft.et = time.NewTimer(getRandTime(rft.Info))
rft.et = time.NewTimer(time.Millisecond * time.Duration(getRandTime(rft.Info)))
SetIsLeader(false)
for {
//wrap in select
......@@ -484,37 +488,36 @@ func (rft *Raft) follower() int {
rft.eventCh <- event.(*ClientAppend).logEntry
case *VoteRequest:
rft.Info.Println("[F]: got vote request")
req := event.(*VoteRequest)
rft.Info.Println("[F]: got vote request", req)
reply := false
if req.Term < rft.currentTerm {
rft.Info.Println("[F]: req.Term < rft.currentTerm")
reply = false
}
if req.Term > rft.currentTerm ||
req.LastLogTerm > rft.currentTerm ||
(req.LastLogTerm == rft.currentTerm && req.LastLogIndex >= len(rft.LogArray)) {
rft.Info.Println("[F]: updating term and vote", req.Term, NULL_VOTE)
rft.updateTermAndVote(req.Term)
reply = true
}
if reply && rft.votedFor == NULL_VOTE {
rft.et.Reset(getRandTime(rft.Info))
rft.et.Reset(time.Millisecond * 300)
rft.Info.Println("[F]: timer reset, after vote")
writeFile(VOTED_FOR, rft.id, req.CandidateId, rft.Info)
rft.Info.Println("[F]: voted for ", strconv.Itoa(req.CandidateId))
rft.Info.Println("[F]: voted for ", req.CandidateId)
rft.votedFor = req.CandidateId
writeFile(VOTED_FOR, rft.id, req.CandidateId, rft.Info)
}
//let the asker know about the vote
voteReply := &VoteRequestReply{rft.currentTerm, reply}
//server := rft.clusterConfig.Servers[req.CandidateId]
//doCastVoteRPC(server.Hostname, server.LogPort, voteReply, rft.Info)
rft.voteReplyCh <- voteReply
case *AppendRPC:
//rft.LogF("got append rpc")
rft.et.Reset(getRandTime(rft.Info))
//rft.LogF("reset timer on appendRPC")
rft.et.Reset(time.Millisecond * time.Duration(getRandTime(rft.Info)))
rft.Info.Println("[F]:", "Timer reset on AppendRPC")
req := event.(*AppendRPC)
if len(req.Entries) == 0 { //heartbeat
rft.Info.Println("[F]: got hearbeat from " + strconv.Itoa(req.LeaderId))
......@@ -528,7 +531,7 @@ func (rft *Raft) follower() int {
reply := true
if req.PrevLogIndex == LOG_INVALID_INDEX || req.PrevLogIndex == LOG_INVALID_TERM {
rft.updateTermAndVote(req.Term)
//rft.updateTermAndVote(req.Term)
reply = true
} else if req.Term < rft.currentTerm {
reply = false
......@@ -556,7 +559,6 @@ func (rft *Raft) follower() int {
} else {
rft.LogArray = append(rft.LogArray[0:i], req.Entries[i-req.PrevLogIndex-1:]...)
}
//todo:also add to log
if req.LeaderCommit > rft.commitIndex {
if req.LeaderCommit > len(rft.LogArray)-1 {
......@@ -569,7 +571,6 @@ func (rft *Raft) follower() int {
temp := &AppendReply{rft.currentTerm, reply, rft.id, len(rft.LogArray)}
rft.appendReplyCh <- temp
//doAppendReplyRPC(rft.clusterConfig.Servers[req.LeaderId].Hostname, rft.clusterConfig.Servers[req.LeaderId].LogPort, temp, rft.Info)
if reply {
rft.persistLog()
}
......@@ -583,14 +584,16 @@ func (rft *Raft) candidate() int {
//increment current term
rft.Info.Println("[C]: became candidate")
SetIsLeader(false)
writeFile(CURRENT_TERM, rft.id, rft.currentTerm+1, rft.Info)
rft.currentTerm++
writeFile(CURRENT_TERM, rft.id, rft.currentTerm, rft.Info)
//vote for self
rft.voters = 1
writeFile(VOTED_FOR, rft.id, rft.id, rft.Info)
rft.votedFor = rft.id
writeFile(VOTED_FOR, rft.id, rft.id, rft.Info)
//reset timer
rft.et = time.NewTimer(getRandTime(rft.Info))
rft.et.Reset(time.Millisecond * time.Duration(getRandTime(rft.Info)))
rft.Info.Println("[C]:", rft.id, "candidate got new timer")
//create a vote request object
req := &VoteRequest{
......@@ -605,19 +608,11 @@ func (rft *Raft) candidate() int {
req.LastLogTerm = rft.LogArray[req.LastLogIndex].Term
}
//reinitialize rft.monitorVotesCh
//rft.monitorVotesCh = make(chan RaftEvent)
//killCh := make(chan bool)
//go monitorVotesChannelRoutine(rft, killCh)
//time.Sleep(time.Millisecond * 10)
//send vote request to all servers
rft.Info.Println("[C]:", "asking for votes from all servers")
for _, server := range rft.clusterConfig.Servers {
//rft.Info.Println(server.Id)
if server.Id != rft.id {
//rft.Info.Println("[C]: Vote request to " + strconv.Itoa(server.Id))
go doVoteRequestRPC(server.Hostname, server.LogPort, req, rft)
//rft.handleMajority(reply)
}
}
......@@ -638,13 +633,14 @@ func (rft *Raft) candidate() int {
switch event.(type) {
case (*AppendRPC):
rft.Info.Println("[Switch]: C to F")
rft.et.Reset(getRandTime(rft.Info))
rft.et.Reset(time.Millisecond * time.Duration(getRandTime(rft.Info)))
return FOLLOWER
}
}
}
}
//enforce the leaders log on all followers
func enforceLog(rft *Raft) {
for {
for _, server := range rft.clusterConfig.Servers {
......@@ -662,12 +658,23 @@ func enforceLog(rft *Raft) {
}
//appendRPC call
rft.Info.Println("[L]: XX append rpc enforce", req)
//rft.Info.Println("[L]: XX append rpc enforce", req)
doAppendRPCCall(server.Hostname, server.LogPort, req, rft)
rft.Info.Println("[L]: Sent append entries", strconv.Itoa(server.Id))
//rft.Info.Println("[L]: Sent append entries", strconv.Itoa(server.Id))
}
}
time.Sleep(time.Millisecond * 20)
time.Sleep(time.Millisecond)
}
}
//simple function to execute send heartbeats to all servers
func sendHeartbeats(rft *Raft, heartbeatReq *AppendRPC) {
for _, server := range rft.clusterConfig.Servers {
if server.Id != rft.id {
//doRPCCall for hearbeat
go doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq, rft)
//rft.Info.Println("[L]: Sent heartbeat", strconv.Itoa(server.Id))
}
}
}
......@@ -679,7 +686,6 @@ func (rft *Raft) leader() int {
heartbeatReq := new(AppendRPC)
heartbeatReq.Entries = []*LogEntryData{}
heartbeatReq.LeaderId = rft.id
rft.currentTerm++
//build nextIndex and matchIndex
for i := 0; i < len(rft.nextIndex); i++ {
......@@ -687,18 +693,14 @@ func (rft *Raft) leader() int {
rft.matchIndex[i] = 0
}
//send first heartbeat
sendHeartbeats(rft, heartbeatReq)
go enforceLog(rft)
for {
select {
case <-heartbeat.C:
for _, server := range rft.clusterConfig.Servers {
if server.Id != rft.id {
//doRPCCall for hearbeat
go doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq, rft)
rft.Info.Println("[L]: Sent heartbeat", strconv.Itoa(server.Id))
}
}
sendHeartbeats(rft, heartbeatReq)
heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT)
case event := <-rft.eventCh:
......@@ -721,15 +723,15 @@ func (rft *Raft) leader() int {
}
}
//run as go routine to monitor the commit index and execute on kvstore
func (rft *Raft) MonitorStateMachine() {
rft.Info.Println("MonitorStateMachine initialized")
for {
rft.Info.Println("[L]: C,L", rft.commitIndex, rft.lastApplied)
if rft.commitIndex > rft.lastApplied {
rft.lastApplied++
rft.Info.Println("data put on commit channel")
rft.commitCh <- rft.LogArray[rft.lastApplied]
}
time.Sleep(time.Second)
//to avoid cpu intensive go routine
time.Sleep(time.Millisecond)
}
}
......@@ -216,7 +216,7 @@ func main() {
go initInterServerCommunication(server, rft, ch2)
time.Sleep(100 * time.Millisecond)
rft.Loop()
go rft.Loop()
for <-ch1 && <-ch2 {
......
......@@ -20,34 +20,37 @@ const (
)
type Testpair struct {
to_server []byte
from_server []byte
test []byte
expected []byte
}
var LeaderId int
var Info *log.Logger
var logger *log.Logger
var keyPrefix int
func getPrefix() int {
keyPrefix++
return keyPrefix
}
//
func TestAll(t *testing.T) {
//start the servers
initLogger()
initTestLogger()
for i := 0; i < NUM_SERVERS; i++ {
go startServers(i, t)
}
//wait for some time so that servers are ready
time.Sleep(5 * time.Second)
time.Sleep(8 * time.Second)
testPerformClientConnect(t)
testPerformSet(t, 1)
testPerformSet(t, 2)
testPerformSet(t, 3)
testPerformSet(t, 4)
//testKillLeader(t)
testCommands(t)
killServers()
}
//kill all servers, cleanup
func killServers() {
Info.Println("killing servers")
logger.Println("killing servers")
cmd := exec.Command("sh", "-c", "for i in `netstat -ntlp|grep server|awk '{print $7}'`; do kill -9 ${i%%/*}; done")
cmd.Run()
}
......@@ -66,6 +69,7 @@ func startServers(i int, t *testing.T) {
cmd.Run()
}
//check which server is the leader
func probeLeader(t *testing.T) (int, error) {
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000)); err != nil {
t.Errorf("Could not connect")
......@@ -87,6 +91,7 @@ func probeLeader(t *testing.T) (int, error) {
}
}
//returns a connection to the leader server
func getLeaderConn(t *testing.T) net.Conn {
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000+LeaderId)); err != nil {
t.Errorf("Could not connect")
......@@ -96,10 +101,10 @@ func getLeaderConn(t *testing.T) net.Conn {
}
}
func initLogger() {
// Logger Initializaion
//initialize logger meant for test cases
func initTestLogger() {
f, _ := os.OpenFile("test.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
Info = log.New(f, "INFO: ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
logger = log.New(f, "INFO: ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
}
func testPerformClientConnect(t *testing.T) {
......@@ -109,24 +114,96 @@ func testPerformClientConnect(t *testing.T) {
} else if id < 0 || id > 4 {
t.Errorf("Invalid leader id")
}
Info.Println("Leader Id:", id)
logger.Println("Leader Id:", id)
}
func testCommands(t *testing.T) {
testPerformMultipleSet(t, getPrefix(), 1) //check single set
testPerformMultipleSet(t, getPrefix(), 200)
testPerformCas(t)
testPerformMultipleCas(t, 200)
testPerformMultipleGet(t, 200)
testPerformMultipleGetm(t, 200)
testPerformMultipleDelete(t, 100)
}
func doTest(conn net.Conn, t *testing.T, test *Testpair) {
conn.Write(test.test)
buf := make([]byte, 1024)
time.Sleep(time.Millisecond * 20)
n, _ := conn.Read(buf)
if !bytes.Equal(test.expected, buf[:n]) {
logger.Println("test:", string(test.test), "got:", string(buf[:n]), "expected:", string(test.expected))
t.Errorf("invalid reply received", string(buf[:n]))
}
}
func testPerformSet(t *testing.T, i int) {
func testPerformMultipleSet(t *testing.T, start int, times int) {
if conn := getLeaderConn(t); conn != nil {
sending := []byte("set mykey" + strconv.Itoa(i) + " 100 3\r\nlul\r\n")
conn.Write(sending)
buffer := make([]byte, 1024)
time.Sleep(time.Millisecond * 50)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
str := string(buffer[:n])
if strings.TrimSpace(str) != "OK "+strconv.Itoa(1) {
t.Errorf("invalid reply received", str)
} else {
Info.Println(str)
defer conn.Close()
for i := start; i < start+times; i++ {
test := &Testpair{[]byte("set mykey" + strconv.Itoa(i) + " 0 3\r\nlul\r\n"), []byte("OK 1\r\n")}
doTest(conn, t, test)
}
} else {
t.Errorf("could not get leader connection")
}
}
func testPerformCas(t *testing.T) {
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
test := &Testpair{[]byte("cas mykey1 1000 1 3\r\nlul\r\n"), []byte("OK 2\r\n")}
doTest(conn, t, test)
} else {
t.Errorf("could not get leader connection")
}
}
func testPerformMultipleCas(t *testing.T, end int) {
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("cas mykey2 1000 " + strconv.Itoa(i+1) + " 3\r\nlul\r\n"), []byte("OK " + strconv.Itoa(i+2) + "\r\n")}
doTest(conn, t, test)
}
} else {
t.Errorf("could not get leader connection")
}
}
func testPerformMultipleGet(t *testing.T, end int) {
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("get mykey3\r\n"), []byte("VALUE 3\r\nlul\r\n")}
doTest(conn, t, test)
}
} else {
t.Errorf("could not get leader connection")
}
}
func testPerformMultipleGetm(t *testing.T, end int) {
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("getm mykey4\r\n"), []byte("VALUE 1 0 3\r\nlul\r\n")}
doTest(conn, t, test)
}
} else {
t.Errorf("could not get leader connection")
}
}
func testPerformMultipleDelete(t *testing.T, end int) {
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("delete mykey" + strconv.Itoa(i+1) + "\r\n"), []byte("DELETED\r\n")}
doTest(conn, t, test)
}
conn.Close()
} else {
t.Errorf("could not get leader connection")
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment