Commit 49dd3884 authored by Sushant Mahajan's avatar Sushant Mahajan

added documentation and test case for killing leader

parent 0a6c5d11
#! /bin/bash #! /bin/bash
export GOPATH=/home/sushant/gocode/src/cs733/assignment4
export GOMAXPROCS=2
rm {0..4} currentTerm* votedFor* log* test.log rm {0..4} currentTerm* votedFor* log* test.log
...@@ -428,36 +428,41 @@ func (rft *Raft) handleAppendReply(temp *AppendReply) { ...@@ -428,36 +428,41 @@ func (rft *Raft) handleAppendReply(temp *AppendReply) {
} }
func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, rft *Raft) { func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, rft *Raft) {
rft.Info.Println("[C]:Vote request RPC") rft.Info.Println("[C]:Vote request RPC", logPort)
//rpc call to the caller //rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
defer client.Close()
if err != nil { if err != nil {
rft.Info.Println("Dialing:", err, "returning") rft.Info.Println("Dialing:", err, "returning")
return //time.Sleep(time.Millisecond * 5)
//return
} else {
defer client.Close()
reply := new(VoteRequestReply)
args := temp
rft.Info.Println("Calling vote request RPC", logPort)
voteReqCall := client.Go("RaftRPCService.VoteRequestRPC", args, reply, nil) //let go allocate done channel
voteReqCall = <-voteReqCall.Done
rft.handleMajority(reply)
} }
reply := new(VoteRequestReply)
args := temp
rft.Info.Println("Calling vote request RPC", logPort)
voteReqCall := client.Go("RaftRPCService.VoteRequestRPC", args, reply, nil) //let go allocate done channel
voteReqCall = <-voteReqCall.Done
rft.handleMajority(reply)
} }
//make append entries rpc call to followers //make append entries rpc call to followers
func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, rft *Raft) { func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, rft *Raft, whichAppend string) {
rft.Info.Println("Making", whichAppend)
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
defer client.Close()
if err != nil { if err != nil {
rft.Info.Println("[L]: Dialing:", err, "returning") rft.Info.Println("[L]: Dialing:", err, "returning")
return //time.Sleep(time.Millisecond * 5)
//return
} else {
defer client.Close()
reply := new(AppendReply)
args := temp
rft.Info.Println("[L]: RPC Called", logPort)
appendCall := client.Go("RaftRPCService.AppendRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
rft.handleAppendReply(reply)
} }
reply := new(AppendReply)
args := temp
rft.Info.Println("[L]: RPC Called", logPort)
appendCall := client.Go("RaftRPCService.AppendRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
rft.handleAppendReply(reply)
} }
//set currentterm to latest value and reinitialize votedfor //set currentterm to latest value and reinitialize votedfor
...@@ -468,6 +473,9 @@ func (rft *Raft) updateTermAndVote(term int) { ...@@ -468,6 +473,9 @@ func (rft *Raft) updateTermAndVote(term int) {
writeFile(VOTED_FOR, rft.id, NULL_VOTE, rft.Info) writeFile(VOTED_FOR, rft.id, NULL_VOTE, rft.Info)
} }
//follower function keeps running until the election timeout runs out. Then a shift to
//candidate will be made. The leader will keep sending heartbeats in much smaller intervals
//than election timeout to make sure followers do not switch.
func (rft *Raft) follower() int { func (rft *Raft) follower() int {
//start candidate timeout //start candidate timeout
rft.et = time.NewTimer(time.Millisecond * time.Duration(getRandTime(rft.Info))) rft.et = time.NewTimer(time.Millisecond * time.Duration(getRandTime(rft.Info)))
...@@ -519,7 +527,8 @@ func (rft *Raft) follower() int { ...@@ -519,7 +527,8 @@ func (rft *Raft) follower() int {
rft.et.Reset(time.Millisecond * time.Duration(getRandTime(rft.Info))) rft.et.Reset(time.Millisecond * time.Duration(getRandTime(rft.Info)))
rft.Info.Println("[F]:", "Timer reset on AppendRPC") rft.Info.Println("[F]:", "Timer reset on AppendRPC")
req := event.(*AppendRPC) req := event.(*AppendRPC)
if len(req.Entries) == 0 { //heartbeat //heartbeat
if len(req.Entries) == 0 {
rft.Info.Println("[F]: got hearbeat from " + strconv.Itoa(req.LeaderId)) rft.Info.Println("[F]: got hearbeat from " + strconv.Itoa(req.LeaderId))
temp := &AppendReply{-1, true, -1, -1} temp := &AppendReply{-1, true, -1, -1}
rft.Info.Println("[F]: sending dummy reply to " + strconv.Itoa(req.LeaderId)) rft.Info.Println("[F]: sending dummy reply to " + strconv.Itoa(req.LeaderId))
...@@ -658,9 +667,7 @@ func enforceLog(rft *Raft) { ...@@ -658,9 +667,7 @@ func enforceLog(rft *Raft) {
} }
//appendRPC call //appendRPC call
//rft.Info.Println("[L]: XX append rpc enforce", req) doAppendRPCCall(server.Hostname, server.LogPort, req, rft, "log enforce")
doAppendRPCCall(server.Hostname, server.LogPort, req, rft)
//rft.Info.Println("[L]: Sent append entries", strconv.Itoa(server.Id))
} }
} }
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
...@@ -672,8 +679,7 @@ func sendHeartbeats(rft *Raft, heartbeatReq *AppendRPC) { ...@@ -672,8 +679,7 @@ func sendHeartbeats(rft *Raft, heartbeatReq *AppendRPC) {
for _, server := range rft.clusterConfig.Servers { for _, server := range rft.clusterConfig.Servers {
if server.Id != rft.id { if server.Id != rft.id {
//doRPCCall for hearbeat //doRPCCall for hearbeat
go doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq, rft) go doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq, rft, "heartbeat")
//rft.Info.Println("[L]: Sent heartbeat", strconv.Itoa(server.Id))
} }
} }
} }
......
...@@ -77,18 +77,6 @@ func (t *RaftRPCService) AppendRPC(args *raft.AppendRPC, reply *raft.AppendReply ...@@ -77,18 +77,6 @@ func (t *RaftRPCService) AppendRPC(args *raft.AppendRPC, reply *raft.AppendReply
return nil return nil
} }
//RPC for follower server. To let followers know that and entry can be committed.
//arguments: pointer to argument struct (has LogEntry), pointer to reply struct
//returns: error
//receiver: pointer to RaftRPCService
func (t *RaftRPCService) CommitRPC(args *raft.CommitData, reply *Reply) error {
Info.Println("Commit RPC invoked")
rft.LogArray[(*args).Id].SetCommitted(true)
rft.AddToChannel(rft.LogArray[(*args).Id])
reply.X = 1
return nil
}
//RPC called by candidate server. To ask the follower for votes. //RPC called by candidate server. To ask the follower for votes.
//arguments: pointer to argument struct (has VoteRequest), pointer to reply struct VoteRequestReply //arguments: pointer to argument struct (has VoteRequest), pointer to reply struct VoteRequestReply
//returns: error //returns: error
......
...@@ -49,6 +49,7 @@ func TestAll(t *testing.T) { ...@@ -49,6 +49,7 @@ func TestAll(t *testing.T) {
testPerformClientConnect(t) testPerformClientConnect(t)
testCommands(t) testCommands(t)
testConcurrent(t, 10, 10) testConcurrent(t, 10, 10)
testLeaderChange(t)
killServers() killServers()
} }
...@@ -76,29 +77,32 @@ func startServers(i int, t *testing.T) { ...@@ -76,29 +77,32 @@ func startServers(i int, t *testing.T) {
//check which server is the leader //check which server is the leader
func probeLeader(t *testing.T) (int, error) { func probeLeader(t *testing.T) (int, error) {
logger.Println("Probing leader") logger.Println("Probing leader")
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000)); err != nil { for i := 0; i < NUM_SERVERS; i++ {
t.Errorf("Could not connect") if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000+i)); err != nil {
return -1, err logger.Println("could not connect to", strconv.Itoa(9000+i))
} else { continue
sending := []byte("set probe 100 3\r\nlul\r\n") } else {
conn.Write(sending) sending := []byte("set probe 100 3\r\nlul\r\n")
buffer := make([]byte, 1024) conn.Write(sending)
conn.Read(buffer) buffer := make([]byte, 1024)
n := bytes.Index(buffer, []byte{0}) conn.Read(buffer)
str := string(buffer[:n]) n := bytes.Index(buffer, []byte{0})
if strings.Contains(str, "ERR_REDIRECT") { str := string(buffer[:n])
str = strings.TrimSpace(str) if strings.Contains(str, "ERR_REDIRECT") {
id, _ := strconv.Atoi(strings.Fields(str)[1]) str = strings.TrimSpace(str)
LeaderId = id id, _ := strconv.Atoi(strings.Fields(str)[1])
return id, nil LeaderId = id
return id, nil
}
return 0, nil
} }
return 0, nil
} }
return -1, nil
} }
//returns a connection to the leader server //returns a connection to the leader server
func getLeaderConn(t *testing.T) net.Conn { func getLeaderConn(t *testing.T) net.Conn {
logger.Println("Getting connection to leader") logger.Println("Getting connection to leader", 9000+LeaderId)
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000+LeaderId)); err != nil { if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000+LeaderId)); err != nil {
t.Errorf("Could not connect") t.Errorf("Could not connect")
return nil return nil
...@@ -113,6 +117,7 @@ func initTestLogger() { ...@@ -113,6 +117,7 @@ func initTestLogger() {
logger = log.New(f, "INFO: ", log.Ldate|log.Lmicroseconds|log.Lshortfile) logger = log.New(f, "INFO: ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
} }
//test a connection to a leader and also probe for the leader.
func testPerformClientConnect(t *testing.T) { func testPerformClientConnect(t *testing.T) {
logger.Println("testPerformClientConnect") logger.Println("testPerformClientConnect")
id, _ := probeLeader(t) id, _ := probeLeader(t)
...@@ -124,6 +129,7 @@ func testPerformClientConnect(t *testing.T) { ...@@ -124,6 +129,7 @@ func testPerformClientConnect(t *testing.T) {
logger.Println("Leader Id:", id) logger.Println("Leader Id:", id)
} }
//wrapper function to call functions for all kvstore commands
func testCommands(t *testing.T) { func testCommands(t *testing.T) {
logger.Println("testCommands") logger.Println("testCommands")
testPerformMultipleSet(t, getPrefix(), 1) //check single set testPerformMultipleSet(t, getPrefix(), 1) //check single set
...@@ -135,12 +141,15 @@ func testCommands(t *testing.T) { ...@@ -135,12 +141,15 @@ func testCommands(t *testing.T) {
testPerformMultipleDelete(t, 100) testPerformMultipleDelete(t, 100)
} }
//generic function that takes as input a test case sends data to server, receives reply
//and checks whether the result is same expected value.
func doTest(conn net.Conn, t *testing.T, test *Testpair, delay int) { func doTest(conn net.Conn, t *testing.T, test *Testpair, delay int) {
conn.Write(test.test) conn.Write(test.test)
//logger.Println("wrote", test.test)
buf := make([]byte, 256) buf := make([]byte, 256)
time.Sleep(time.Millisecond * time.Duration(delay)) time.Sleep(time.Millisecond * time.Duration(delay))
n, _ := conn.Read(buf) n, _ := conn.Read(buf)
//logger.Println("read", buffer.Bytes()) //logger.Println("read", string(buf[:n]))
if !bytes.Equal(test.expected, buf[:n]) { if !bytes.Equal(test.expected, buf[:n]) {
logger.Println("test:", string(test.test), "got:", string(buf[:n]), "expected:", string(test.expected)) logger.Println("test:", string(test.test), "got:", string(buf[:n]), "expected:", string(test.expected))
...@@ -148,6 +157,7 @@ func doTest(conn net.Conn, t *testing.T, test *Testpair, delay int) { ...@@ -148,6 +157,7 @@ func doTest(conn net.Conn, t *testing.T, test *Testpair, delay int) {
} }
} }
//perform set operation on multiple keys
func testPerformMultipleSet(t *testing.T, start int, times int) { func testPerformMultipleSet(t *testing.T, start int, times int) {
logger.Println("testPerformMultipleSet") logger.Println("testPerformMultipleSet")
if conn := getLeaderConn(t); conn != nil { if conn := getLeaderConn(t); conn != nil {
...@@ -161,6 +171,7 @@ func testPerformMultipleSet(t *testing.T, start int, times int) { ...@@ -161,6 +171,7 @@ func testPerformMultipleSet(t *testing.T, start int, times int) {
} }
} }
//perform cas operation on single key
func testPerformCas(t *testing.T) { func testPerformCas(t *testing.T) {
logger.Println("testPerformCas") logger.Println("testPerformCas")
if conn := getLeaderConn(t); conn != nil { if conn := getLeaderConn(t); conn != nil {
...@@ -172,6 +183,7 @@ func testPerformCas(t *testing.T) { ...@@ -172,6 +183,7 @@ func testPerformCas(t *testing.T) {
} }
} }
//perform cas operation on multiple keys
func testPerformMultipleCas(t *testing.T, end int) { func testPerformMultipleCas(t *testing.T, end int) {
logger.Println("testPerformMultipleCas") logger.Println("testPerformMultipleCas")
if conn := getLeaderConn(t); conn != nil { if conn := getLeaderConn(t); conn != nil {
...@@ -185,6 +197,7 @@ func testPerformMultipleCas(t *testing.T, end int) { ...@@ -185,6 +197,7 @@ func testPerformMultipleCas(t *testing.T, end int) {
} }
} }
//queries the data for multiple keys.
func testPerformMultipleGet(t *testing.T, end int) { func testPerformMultipleGet(t *testing.T, end int) {
logger.Println("testPerformMultipleGet") logger.Println("testPerformMultipleGet")
if conn := getLeaderConn(t); conn != nil { if conn := getLeaderConn(t); conn != nil {
...@@ -198,6 +211,7 @@ func testPerformMultipleGet(t *testing.T, end int) { ...@@ -198,6 +211,7 @@ func testPerformMultipleGet(t *testing.T, end int) {
} }
} }
//queries the meta data for multiple keys.
func testPerformMultipleGetm(t *testing.T, end int) { func testPerformMultipleGetm(t *testing.T, end int) {
logger.Println("testPerformMultipleGetm") logger.Println("testPerformMultipleGetm")
if conn := getLeaderConn(t); conn != nil { if conn := getLeaderConn(t); conn != nil {
...@@ -211,6 +225,7 @@ func testPerformMultipleGetm(t *testing.T, end int) { ...@@ -211,6 +225,7 @@ func testPerformMultipleGetm(t *testing.T, end int) {
} }
} }
//performs deletion of multiple keys
func testPerformMultipleDelete(t *testing.T, end int) { func testPerformMultipleDelete(t *testing.T, end int) {
logger.Println("testPerformMultipleDelete") logger.Println("testPerformMultipleDelete")
if conn := getLeaderConn(t); conn != nil { if conn := getLeaderConn(t); conn != nil {
...@@ -224,6 +239,8 @@ func testPerformMultipleDelete(t *testing.T, end int) { ...@@ -224,6 +239,8 @@ func testPerformMultipleDelete(t *testing.T, end int) {
} }
} }
//create multiple connections to the leader and calls go routines on those connections
//to run commands on the server.
func testConcurrent(t *testing.T, clients int, commands int) { func testConcurrent(t *testing.T, clients int, commands int) {
logger.Println("testConcurrent") logger.Println("testConcurrent")
ch := make(chan int) ch := make(chan int)
...@@ -238,11 +255,12 @@ func testConcurrent(t *testing.T, clients int, commands int) { ...@@ -238,11 +255,12 @@ func testConcurrent(t *testing.T, clients int, commands int) {
} }
num := 0 num := 0
for num < clients { for num < clients {
//logger.Println("got", num)
num += <-ch num += <-ch
} }
} }
//for each connection to the leader, all KV commands will be run
//the number of times each commmand is run is passed as an argument.
func testCommandsRoutine(conn net.Conn, t *testing.T, commands int, ch chan int, off int) { func testCommandsRoutine(conn net.Conn, t *testing.T, commands int, ch chan int, off int) {
logger.Println("testing", commands) logger.Println("testing", commands)
...@@ -277,3 +295,22 @@ func testCommandsRoutine(conn net.Conn, t *testing.T, commands int, ch chan int, ...@@ -277,3 +295,22 @@ func testCommandsRoutine(conn net.Conn, t *testing.T, commands int, ch chan int,
time.Sleep(time.Millisecond * POST_TEST_DELAY) time.Sleep(time.Millisecond * POST_TEST_DELAY)
ch <- 1 ch <- 1
} }
//this test kills the current leader and waits for some time to let
//a new leader be electe. Then it probes for the new leader. If the new
//leader is still same as before, then the test fails. Otherwise a simple get
//operation is performed to check if clients can still connect.
func testLeaderChange(t *testing.T) {
logger.Println("killing the leader")
temp := LeaderId
cmd := exec.Command("sh", "-c", "kill -9 $(netstat -ntlp|grep server|grep "+strconv.Itoa(20000+LeaderId)+"|awk '{print $7}'|cut -d/ -f1)")
cmd.Run()
//give time to change leader
time.Sleep(time.Second * 5)
probeLeader(t)
if LeaderId == temp {
t.Errorf("Leader did not change")
} else {
logger.Println("Leader changed to", LeaderId)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment