Commit e0747868 authored by Sushant Mahajan's avatar Sushant Mahajan

begun test cases

parent dd6e640a
#! /bin/bash #! /bin/bash
rm {0..4} currentTerm* votedFor* log* rm {0..4} currentTerm* votedFor* log* test.log
...@@ -128,6 +128,8 @@ func Write(conn net.Conn, msg string) { ...@@ -128,6 +128,8 @@ func Write(conn net.Conn, msg string) {
func HandleClient(conn net.Conn, rft *raft.Raft, logger *log.Logger) { func HandleClient(conn net.Conn, rft *raft.Raft, logger *log.Logger) {
defer conn.Close() defer conn.Close()
//channel for every connection for every client //channel for every connection for every client
rft.Info.Println("new client connection")
ch := make(chan []byte) ch := make(chan []byte)
go MyRead(ch, conn) go MyRead(ch, conn)
...@@ -173,7 +175,7 @@ func HandleClient(conn net.Conn, rft *raft.Raft, logger *log.Logger) { ...@@ -173,7 +175,7 @@ func HandleClient(conn net.Conn, rft *raft.Raft, logger *log.Logger) {
} }
if _, err := rft.Append(buffer.Bytes(), conn); err != nil { if _, err := rft.Append(buffer.Bytes(), conn); err != nil {
Write(conn, "ERR_REDIRECT 127.0.0.1 "+strconv.Itoa(raft.CLIENT_PORT+1)) Write(conn, "ERR_REDIRECT "+strconv.Itoa(rft.LeaderId))
conn.Close() conn.Close()
break break
} }
......
...@@ -201,6 +201,7 @@ func isValid(cmd string, tokens []string, conn net.Conn) int { ...@@ -201,6 +201,7 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
func MonitorCommitChannel(ch chan LogEntry) { func MonitorCommitChannel(ch chan LogEntry) {
for { for {
temp := <-ch temp := <-ch
logger.Println("got entry to run")
var conn net.Conn var conn net.Conn
if isLeader { if isLeader {
conn = temp.(*LogEntryData).conn conn = temp.(*LogEntryData).conn
...@@ -359,6 +360,7 @@ func ParseInput(conn net.Conn, cmd *utils.Command) { ...@@ -359,6 +360,7 @@ func ParseInput(conn net.Conn, cmd *utils.Command) {
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client *return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
*/ */
func performSet(tokens []string, cmd *utils.Command) (uint64, bool, bool) { func performSet(tokens []string, cmd *utils.Command) (uint64, bool, bool) {
logger.Println("performSet called")
k := tokens[0] k := tokens[0]
//expiry time offset //expiry time offset
e, _ := strconv.ParseUint(tokens[1], 10, 64) e, _ := strconv.ParseUint(tokens[1], 10, 64)
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"net" "net"
"net/rpc" "net/rpc"
"os" "os"
"reflect"
"strconv" "strconv"
"sync" "sync"
"time" "time"
...@@ -125,10 +126,11 @@ type Raft struct { ...@@ -125,10 +126,11 @@ type Raft struct {
voteReplyCh chan RaftEvent voteReplyCh chan RaftEvent
appendReplyCh chan RaftEvent appendReplyCh chan RaftEvent
et *time.Timer et *time.Timer
isLeader bool IsLeader bool
lastApplied int lastApplied int
nextIndex []int nextIndex []int
matchIndex []int matchIndex []int
LeaderId int
} }
// Log entry interface // Log entry interface
...@@ -248,9 +250,11 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, In ...@@ -248,9 +250,11 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, In
rft.appendReplyCh = make(chan RaftEvent) rft.appendReplyCh = make(chan RaftEvent)
rft.voteReplyCh = make(chan RaftEvent) rft.voteReplyCh = make(chan RaftEvent)
getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file. getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file.
rft.isLeader = false rft.IsLeader = false
rft.nextIndex = make([]int, len(config.Servers)) rft.nextIndex = make([]int, len(config.Servers))
rft.matchIndex = make([]int, len(config.Servers)) rft.matchIndex = make([]int, len(config.Servers))
rft.commitIndex = -1
rft.lastApplied = -1
return rft, nil return rft, nil
} }
...@@ -291,14 +295,11 @@ func (entry *LogEntryData) SetCommitted(committed bool) { ...@@ -291,14 +295,11 @@ func (entry *LogEntryData) SetCommitted(committed bool) {
//make raft implement the append function //make raft implement the append function
func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) { func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
rft.Info.Println("Append Called") rft.Info.Println("Append Called")
if !rft.isLeader { if !rft.IsLeader {
return nil, ErrRedirect(1) return nil, ErrRedirect(rft.LeaderId)
} }
defer rft.Unlock()
rft.Lock()
temp := rft.NewLogEntry(data, false, conn) temp := rft.NewLogEntry(data, false, conn)
rft.AddToEventChannel(temp) rft.AddToEventChannel(&ClientAppend{temp})
return temp, nil return temp, nil
} }
...@@ -310,7 +311,7 @@ func (rft *Raft) AddToChannel(entry LogEntry) { ...@@ -310,7 +311,7 @@ func (rft *Raft) AddToChannel(entry LogEntry) {
//AddToEventChannel //AddToEventChannel
func (rft *Raft) AddToEventChannel(entry RaftEvent) { func (rft *Raft) AddToEventChannel(entry RaftEvent) {
rft.Info.Println("Adding to event channel", entry) rft.Info.Println("Adding to event channel", entry, reflect.TypeOf(entry))
rft.eventCh <- entry rft.eventCh <- entry
} }
...@@ -385,9 +386,9 @@ func (rft *Raft) handleMajority(reply *VoteRequestReply) { ...@@ -385,9 +386,9 @@ func (rft *Raft) handleMajority(reply *VoteRequestReply) {
if reply.Reply { if reply.Reply {
rft.voters++ rft.voters++
rft.Info.Println("[C]: count", rft.voters) rft.Info.Println("[C]: count", rft.voters)
if !rft.isLeader && rft.voters > majority { if !rft.IsLeader && rft.voters > majority {
rft.shiftStatusCh <- LEADER rft.shiftStatusCh <- LEADER
rft.isLeader = true rft.IsLeader = true
} }
} else { } else {
if rft.currentTerm < reply.CurrentTerm { if rft.currentTerm < reply.CurrentTerm {
...@@ -520,6 +521,7 @@ func (rft *Raft) follower() int { ...@@ -520,6 +521,7 @@ func (rft *Raft) follower() int {
temp := &AppendReply{-1, true, -1, -1} temp := &AppendReply{-1, true, -1, -1}
rft.Info.Println("[F]: sending dummy reply to " + strconv.Itoa(req.LeaderId)) rft.Info.Println("[F]: sending dummy reply to " + strconv.Itoa(req.LeaderId))
rft.appendReplyCh <- temp rft.appendReplyCh <- temp
rft.LeaderId = req.LeaderId
continue continue
} }
...@@ -653,18 +655,19 @@ func enforceLog(rft *Raft) { ...@@ -653,18 +655,19 @@ func enforceLog(rft *Raft) {
req.LeaderCommit = rft.commitIndex req.LeaderCommit = rft.commitIndex
req.Entries = rft.LogArray[rft.nextIndex[server.Id]:len(rft.LogArray)] req.Entries = rft.LogArray[rft.nextIndex[server.Id]:len(rft.LogArray)]
req.PrevLogIndex = rft.nextIndex[server.Id] - 1 req.PrevLogIndex = rft.nextIndex[server.Id] - 1
if req.PrevLogIndex <= 0 { if req.PrevLogIndex == -1 {
req.PrevLogTerm = LOG_INVALID_TERM req.PrevLogTerm = LOG_INVALID_TERM
} else { } else {
req.PrevLogTerm = rft.LogArray[rft.nextIndex[server.Id]-1].Term req.PrevLogTerm = rft.LogArray[rft.nextIndex[server.Id]-1].Term
} }
//appendRPC call //appendRPC call
rft.Info.Println("[L]: XX append rpc enforce", req)
doAppendRPCCall(server.Hostname, server.LogPort, req, rft) doAppendRPCCall(server.Hostname, server.LogPort, req, rft)
rft.Info.Println("[L]: Sent append entries", strconv.Itoa(server.Id)) rft.Info.Println("[L]: Sent append entries", strconv.Itoa(server.Id))
} }
time.Sleep(time.Millisecond * 2)
} }
time.Sleep(time.Millisecond * 20)
} }
} }
...@@ -699,6 +702,7 @@ func (rft *Raft) leader() int { ...@@ -699,6 +702,7 @@ func (rft *Raft) leader() int {
heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT) heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT)
case event := <-rft.eventCh: case event := <-rft.eventCh:
rft.Info.Println("[L]: got event", event, reflect.TypeOf(event))
switch event.(type) { switch event.(type) {
case *ClientAppend: case *ClientAppend:
//write data to log //write data to log
...@@ -718,11 +722,14 @@ func (rft *Raft) leader() int { ...@@ -718,11 +722,14 @@ func (rft *Raft) leader() int {
} }
func (rft *Raft) MonitorStateMachine() { func (rft *Raft) MonitorStateMachine() {
rft.Info.Println("MonitorStateMachine initialized")
for { for {
rft.Info.Println("[L]: C,L", rft.commitIndex, rft.lastApplied)
if rft.commitIndex > rft.lastApplied { if rft.commitIndex > rft.lastApplied {
rft.lastApplied++ rft.lastApplied++
rft.Info.Println("data put on commit channel")
rft.commitCh <- rft.LogArray[rft.lastApplied] rft.commitCh <- rft.LogArray[rft.lastApplied]
} }
time.Sleep(time.Second * 1) time.Sleep(time.Second)
} }
} }
...@@ -3,10 +3,13 @@ ...@@ -3,10 +3,13 @@
package main package main
import ( import (
//"fmt" "bytes"
"log"
"net"
"os" "os"
"os/exec" "os/exec"
"strconv" "strconv"
"strings"
"testing" "testing"
"time" "time"
) )
...@@ -21,24 +24,36 @@ type Testpair struct { ...@@ -21,24 +24,36 @@ type Testpair struct {
from_server []byte from_server []byte
} }
var LeaderId int
var Info *log.Logger
// //
func TestAll(t *testing.T) { func TestAll(t *testing.T) {
dummy := make(chan bool)
//start the servers //start the servers
initLogger()
for i := 0; i < NUM_SERVERS; i++ { for i := 0; i < NUM_SERVERS; i++ {
go startServers(i, t, dummy) go startServers(i, t)
} }
//wait for some time so that servers are ready //wait for some time so that servers are ready
time.Sleep(1 * time.Second) time.Sleep(8 * time.Second)
testClientAppend() testPerformClientConnect(t)
if <-dummy { testPerformSet(t, 1)
testPerformSet(t, 2)
testPerformSet(t, 3)
testPerformSet(t, 4)
//testKillLeader(t)
killServers()
}
} func killServers() {
Info.Println("killing servers")
cmd := exec.Command("sh", "-c", "for i in `netstat -ntlp|grep server|awk '{print $7}'`; do kill -9 ${i%%/*}; done")
cmd.Run()
} }
//run servers //run servers
func startServers(i int, t *testing.T, dummy chan bool) { func startServers(i int, t *testing.T) {
cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i), strconv.Itoa(NUM_SERVERS), "x") cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i), strconv.Itoa(NUM_SERVERS), "x")
f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil { if err != nil {
...@@ -51,6 +66,68 @@ func startServers(i int, t *testing.T, dummy chan bool) { ...@@ -51,6 +66,68 @@ func startServers(i int, t *testing.T, dummy chan bool) {
cmd.Run() cmd.Run()
} }
func testClientAppend() { func probeLeader(t *testing.T) (int, error) {
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000)); err != nil {
t.Errorf("Could not connect")
return -1, err
} else {
sending := []byte("set probe 100 3\r\nlul\r\n")
conn.Write(sending)
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
str := string(buffer[:n])
if strings.Contains(str, "ERR_REDIRECT") {
str = strings.TrimSpace(str)
id, _ := strconv.Atoi(strings.Fields(str)[1])
LeaderId = id
return id, nil
}
return 0, nil
}
}
func getLeaderConn(t *testing.T) net.Conn {
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000+LeaderId)); err != nil {
t.Errorf("Could not connect")
return nil
} else {
return conn
}
}
func initLogger() {
// Logger Initializaion
f, _ := os.OpenFile("test.log", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
Info = log.New(f, "INFO: ", log.Ldate|log.Lmicroseconds|log.Lshortfile)
}
func testPerformClientConnect(t *testing.T) {
id, _ := probeLeader(t)
if id == -1 {
t.Errorf("Could not connect")
} else if id < 0 || id > 4 {
t.Errorf("Invalid leader id")
}
Info.Println("Leader Id:", id)
}
func testPerformSet(t *testing.T, i int) {
if conn := getLeaderConn(t); conn != nil {
sending := []byte("set mykey" + strconv.Itoa(i) + " 100 3\r\nlul\r\n")
conn.Write(sending)
buffer := make([]byte, 1024)
time.Sleep(time.Millisecond * 50)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
str := string(buffer[:n])
if strings.TrimSpace(str) != "OK "+strconv.Itoa(1) {
t.Errorf("invalid reply received", str)
} else {
Info.Println(str)
}
conn.Close()
} else {
t.Errorf("could not get leader connection")
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment