Commit 0a6c5d11 authored by Sushant Mahajan's avatar Sushant Mahajan

added concurrent test cases and support for [noreply] optional command attribute

parent 490c6b63
......@@ -35,7 +35,7 @@ const (
ERR_INTERNAL = "ERR_INTERNAL"
//constant
MAX_CMD_ARGS = 5
MAX_CMD_ARGS = 6
MIN_CMD_ARGS = 2
READ_TIMEOUT = 5
)
......@@ -58,7 +58,7 @@ func (d *Data) GetVal() []byte {
}
//get version
func (d *Data) GetVers() uint64 {
func (d *Data) GetVer() uint64 {
return d.version
}
......@@ -80,7 +80,7 @@ func GetKeyValStr() *KeyValueStore {
}
//access the dictionary
func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data {
func (kvstr *KeyValueStore) GetDictionary() map[string]*Data {
return kvstr.dictionary
}
......@@ -103,7 +103,7 @@ func write(conn net.Conn, msg string) {
func isValid(cmd string, tokens []string, conn net.Conn) int {
switch cmd {
case SET:
if len(tokens) != 4 {
if len(tokens) > 5 || len(tokens) < 4 {
logger.Println(cmd, ":Invalid no. of tokens")
write(conn, ERR_CMD_ERR)
return 1
......@@ -113,6 +113,11 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
write(conn, ERR_CMD_ERR)
return 1
}
if len(tokens) == 5 && tokens[4] != NOREPLY {
logger.Println(cmd, ":optional arg incorrect")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[2], 10, 64); err != nil {
logger.Println(cmd, ":expiry time invalid")
write(conn, ERR_CMD_ERR)
......@@ -149,7 +154,7 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
}
case CAS:
if len(tokens) != 5 {
if len(tokens) > 6 || len(tokens) < 5 {
logger.Println(cmd, ":Invalid number of tokens")
write(conn, ERR_CMD_ERR)
return 1
......@@ -159,6 +164,11 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
write(conn, ERR_CMD_ERR)
return 1
}
if len(tokens) == 6 && tokens[5] != NOREPLY {
logger.Println(cmd, ":optional arg incorrect")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[2], 10, 64); err != nil {
logger.Println(cmd, ":expiry time invalid")
write(conn, ERR_CMD_ERR)
......@@ -368,6 +378,10 @@ func performSet(tokens []string, cmd *utils.Command) (uint64, bool, bool) {
n, _ := strconv.ParseUint(tokens[2], 10, 64)
r := true
if len(tokens) == 4 && tokens[3] == NOREPLY {
r = false
}
logger.Println(r)
defer table.Unlock()
......@@ -458,6 +472,9 @@ func performCas(tokens []string, cmd *utils.Command) (uint64, int, bool) {
n, _ := strconv.ParseUint(tokens[3], 10, 64)
r := true
if len(tokens) == 5 && tokens[4] == NOREPLY {
r = false
}
logger.Println(k, e, ve, n, r)
defer table.Unlock()
......
......@@ -157,13 +157,13 @@ type CommitData struct {
}
//persists log to the disk for later retrieval
func (rft *Raft) persistLog() {
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666); err != nil {
func (rft *Raft) persistLog(entries []*LogEntryData) {
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666); err != nil {
rft.Info.Println("error opening log persist file", err.Error())
} else {
defer file.Close()
enc := json.NewEncoder(file)
for _, e := range rft.LogArray {
for _, e := range entries {
if err := enc.Encode(e); err != nil {
rft.Info.Println("error persisting log entry", err.Error())
}
......@@ -572,7 +572,7 @@ func (rft *Raft) follower() int {
temp := &AppendReply{rft.currentTerm, reply, rft.id, len(rft.LogArray)}
rft.appendReplyCh <- temp
if reply {
rft.persistLog()
rft.persistLog(req.Entries)
}
rft.Info.Println("[F]: log is size", len(rft.LogArray))
}
......@@ -712,7 +712,7 @@ func (rft *Raft) leader() int {
entry := event.(*ClientAppend).logEntry
rft.LogArray = append(rft.LogArray, entry)
//will now be send to kvstore which'll decode and reply
rft.persistLog()
rft.persistLog([]*LogEntryData{entry})
case *AppendRPC:
......
......@@ -20,16 +20,9 @@ var Info *log.Logger
//global raft object for each server instance
var rft *raft.Raft
//Receiver for RPC
//Receiver for all raft related RPCs
type RaftRPCService struct{}
//Receiver for voting related RPC
//type Voting struct{}
//receiver for testing RPC
//only for testing purpose
type Tester struct{}
//RPC argument for testing the replication of keys value version across key value stores
type TestArgs struct {
key string
......@@ -39,23 +32,23 @@ type TestArgs struct {
// RPC argument with boolean value in the reply to confirm that indeed the replication went through across servers
type TestReply struct {
replica_updated bool
isUpdated bool
}
//only for testing purpose
//this function checks for the key value in its kvstore and sets reply.replica_updated true if present and false if absent
//this function checks for the key value in its kvstore and sets reply.isUpdated true if present and false if absent
//arguments: args contains the key, value, version to be matched
//reply is the reply to be sent
func (t *Tester) testerRPC(args *TestArgs, reply *TestReply) error {
func (t *RaftRPCService) testerRPC(args *TestArgs, reply *TestReply) error {
table := raft.GetKeyValStr()
table.RLock()
defer table.RUnlock()
dic := table.GetDicKVstr()
dic := table.GetDictionary()
if v, ok := dic[args.key]; ok {
the_val := v.GetVal()
the_vers := v.GetVers()
if bytes.Equal(the_val, args.value) && the_vers == args.version {
reply.replica_updated = true
val := v.GetVal()
ver := v.GetVer()
if bytes.Equal(val, args.value) && ver == args.version {
reply.isUpdated = true
return nil
} else {
return nil
......@@ -69,10 +62,10 @@ type Reply struct {
X int
}
//RPC for follower server. To let followers know that they can append their logs
//arguments: pointer to argument struct (has LogEntryData), pointer to reply struct
//RPC called by leader server. To let followers know that they can append their logs
//arguments: pointer to argument struct (has AppendRPC), pointer to reply struct AppendReply
//returns: error
//receiver: pointer to AppendEntries
//receiver: pointer to RaftRPCService
func (t *RaftRPCService) AppendRPC(args *raft.AppendRPC, reply *raft.AppendReply) error {
Info.Println("append RPC invoked")
rft.AddToEventChannel(args)
......@@ -84,21 +77,10 @@ func (t *RaftRPCService) AppendRPC(args *raft.AppendRPC, reply *raft.AppendReply
return nil
}
//func (t *RaftRPCService) AppendReplyRPC(args *raft.AppendReply, reply *Reply) error {
// Info.Println("append reply to leader RPC invoked")
// rft.AddToEventChannel(args)
// reply.X = 1
// return nil
// /*Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
// rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
// reply.X = 1
// return nil*/
//}
//RPC for follower server. To let followers know that and entry can be committed.
//arguments: pointer to argument struct (has LogEntry), pointer to reply struct
//returns: error
//receiver: pointer to AppendEntries
//receiver: pointer to RaftRPCService
func (t *RaftRPCService) CommitRPC(args *raft.CommitData, reply *Reply) error {
Info.Println("Commit RPC invoked")
rft.LogArray[(*args).Id].SetCommitted(true)
......@@ -107,6 +89,10 @@ func (t *RaftRPCService) CommitRPC(args *raft.CommitData, reply *Reply) error {
return nil
}
//RPC called by candidate server. To ask the follower for votes.
//arguments: pointer to argument struct (has VoteRequest), pointer to reply struct VoteRequestReply
//returns: error
//receiver: pointer to RaftRPCService
func (t *RaftRPCService) VoteRequestRPC(args *raft.VoteRequest, reply *raft.VoteRequestReply) error {
Info.Println("Request Vote RPC received")
rft.AddToEventChannel(args)
......@@ -116,13 +102,6 @@ func (t *RaftRPCService) VoteRequestRPC(args *raft.VoteRequest, reply *raft.Vote
return nil
}
//func (t *RaftRPCService) CastVoteRPC(args *raft.VoteRequestReply, reply *Reply) error {
// Info.Println("Cast Vote RPC received")
// rft.AddToMonitorVotesChannel(args)
// reply.X = 1
// return nil
//}
//Initialize all the things necessary for start the server for inter raft communication.
//The servers are running on ports 20000+serverId. {0..4}
//arguments: pointer to current server config, pointer to raft object, a bool channel to set to true to let
......
......@@ -16,7 +16,10 @@ import (
//constant values used
const (
NUM_SERVERS int = 5
NUM_SERVERS int = 5
NORM_DELAY = 15
CONC_DELAY = 30
POST_TEST_DELAY = 5
)
type Testpair struct {
......@@ -45,6 +48,7 @@ func TestAll(t *testing.T) {
testPerformClientConnect(t)
testCommands(t)
testConcurrent(t, 10, 10)
killServers()
}
......@@ -71,6 +75,7 @@ func startServers(i int, t *testing.T) {
//check which server is the leader
func probeLeader(t *testing.T) (int, error) {
logger.Println("Probing leader")
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000)); err != nil {
t.Errorf("Could not connect")
return -1, err
......@@ -93,6 +98,7 @@ func probeLeader(t *testing.T) (int, error) {
//returns a connection to the leader server
func getLeaderConn(t *testing.T) net.Conn {
logger.Println("Getting connection to leader")
if conn, err := net.Dial("tcp", ":"+strconv.Itoa(9000+LeaderId)); err != nil {
t.Errorf("Could not connect")
return nil
......@@ -108,6 +114,7 @@ func initTestLogger() {
}
func testPerformClientConnect(t *testing.T) {
logger.Println("testPerformClientConnect")
id, _ := probeLeader(t)
if id == -1 {
t.Errorf("Could not connect")
......@@ -118,6 +125,7 @@ func testPerformClientConnect(t *testing.T) {
}
func testCommands(t *testing.T) {
logger.Println("testCommands")
testPerformMultipleSet(t, getPrefix(), 1) //check single set
testPerformMultipleSet(t, getPrefix(), 200)
testPerformCas(t)
......@@ -127,11 +135,12 @@ func testCommands(t *testing.T) {
testPerformMultipleDelete(t, 100)
}
func doTest(conn net.Conn, t *testing.T, test *Testpair) {
func doTest(conn net.Conn, t *testing.T, test *Testpair, delay int) {
conn.Write(test.test)
buf := make([]byte, 1024)
time.Sleep(time.Millisecond * 20)
buf := make([]byte, 256)
time.Sleep(time.Millisecond * time.Duration(delay))
n, _ := conn.Read(buf)
//logger.Println("read", buffer.Bytes())
if !bytes.Equal(test.expected, buf[:n]) {
logger.Println("test:", string(test.test), "got:", string(buf[:n]), "expected:", string(test.expected))
......@@ -140,11 +149,12 @@ func doTest(conn net.Conn, t *testing.T, test *Testpair) {
}
func testPerformMultipleSet(t *testing.T, start int, times int) {
logger.Println("testPerformMultipleSet")
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := start; i < start+times; i++ {
test := &Testpair{[]byte("set mykey" + strconv.Itoa(i) + " 0 3\r\nlul\r\n"), []byte("OK 1\r\n")}
doTest(conn, t, test)
doTest(conn, t, test, NORM_DELAY)
}
} else {
t.Errorf("could not get leader connection")
......@@ -152,21 +162,23 @@ func testPerformMultipleSet(t *testing.T, start int, times int) {
}
func testPerformCas(t *testing.T) {
logger.Println("testPerformCas")
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
test := &Testpair{[]byte("cas mykey1 1000 1 3\r\nlul\r\n"), []byte("OK 2\r\n")}
doTest(conn, t, test)
doTest(conn, t, test, NORM_DELAY)
} else {
t.Errorf("could not get leader connection")
}
}
func testPerformMultipleCas(t *testing.T, end int) {
logger.Println("testPerformMultipleCas")
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("cas mykey2 1000 " + strconv.Itoa(i+1) + " 3\r\nlul\r\n"), []byte("OK " + strconv.Itoa(i+2) + "\r\n")}
doTest(conn, t, test)
doTest(conn, t, test, NORM_DELAY)
}
} else {
t.Errorf("could not get leader connection")
......@@ -174,11 +186,12 @@ func testPerformMultipleCas(t *testing.T, end int) {
}
func testPerformMultipleGet(t *testing.T, end int) {
logger.Println("testPerformMultipleGet")
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("get mykey3\r\n"), []byte("VALUE 3\r\nlul\r\n")}
doTest(conn, t, test)
doTest(conn, t, test, NORM_DELAY)
}
} else {
t.Errorf("could not get leader connection")
......@@ -186,11 +199,12 @@ func testPerformMultipleGet(t *testing.T, end int) {
}
func testPerformMultipleGetm(t *testing.T, end int) {
logger.Println("testPerformMultipleGetm")
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("getm mykey4\r\n"), []byte("VALUE 1 0 3\r\nlul\r\n")}
doTest(conn, t, test)
doTest(conn, t, test, NORM_DELAY)
}
} else {
t.Errorf("could not get leader connection")
......@@ -198,13 +212,68 @@ func testPerformMultipleGetm(t *testing.T, end int) {
}
func testPerformMultipleDelete(t *testing.T, end int) {
logger.Println("testPerformMultipleDelete")
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
for i := 0; i < end; i++ {
test := &Testpair{[]byte("delete mykey" + strconv.Itoa(i+1) + "\r\n"), []byte("DELETED\r\n")}
doTest(conn, t, test)
doTest(conn, t, test, NORM_DELAY)
}
} else {
t.Errorf("could not get leader connection")
}
}
func testConcurrent(t *testing.T, clients int, commands int) {
logger.Println("testConcurrent")
ch := make(chan int)
for c := 0; c < clients; c++ {
if conn := getLeaderConn(t); conn != nil {
defer conn.Close()
logger.Println("starting routine")
go testCommandsRoutine(conn, t, commands, ch, c+1500)
} else {
t.Errorf("could not get leader connection")
}
}
num := 0
for num < clients {
//logger.Println("got", num)
num += <-ch
}
}
func testCommandsRoutine(conn net.Conn, t *testing.T, commands int, ch chan int, off int) {
logger.Println("testing", commands)
for i := 0; i < commands; i++ {
test := &Testpair{[]byte("set mykey" + strconv.Itoa(off) + " 0 9\r\nsome data\r\n"), []byte("OK " + strconv.Itoa(i+1) + "\r\n")}
doTest(conn, t, test, CONC_DELAY)
time.Sleep(time.Millisecond * POST_TEST_DELAY)
}
for i := 0; i < commands; i++ {
test := &Testpair{[]byte("get mykey" + strconv.Itoa(off) + "\r\n"), []byte("VALUE 9\r\nsome data\r\n")}
doTest(conn, t, test, CONC_DELAY)
time.Sleep(time.Millisecond * POST_TEST_DELAY)
}
for i := 0; i < commands; i++ {
test := &Testpair{[]byte("getm mykey" + strconv.Itoa(off) + "\r\n"), []byte("VALUE " + strconv.Itoa(commands) + " 0 9\r\nsome data\r\n")}
doTest(conn, t, test, CONC_DELAY)
time.Sleep(time.Millisecond * POST_TEST_DELAY)
}
for i := 0; i < commands; i++ {
test := &Testpair{
[]byte("cas mykey" + strconv.Itoa(off) + " 1000 " + strconv.Itoa(commands+i) + " 9\r\nsome data\r\n"),
[]byte("OK " + strconv.Itoa(commands+i+1) + "\r\n")}
doTest(conn, t, test, CONC_DELAY)
time.Sleep(time.Millisecond * POST_TEST_DELAY)
}
test := &Testpair{[]byte("delete mykey" + strconv.Itoa(off) + "\r\n"), []byte("DELETED\r\n")}
doTest(conn, t, test, CONC_DELAY)
time.Sleep(time.Millisecond * POST_TEST_DELAY)
ch <- 1
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment