Commit 1eb4542d authored by Harshit Pande's avatar Harshit Pande

test for checking if the replication happenned across kvstores using test specific rpc calls

parent 88683f33
...@@ -48,6 +48,16 @@ type Data struct { ...@@ -48,6 +48,16 @@ type Data struct {
isPerpetual bool //specifies that the key does not expire isPerpetual bool //specifies that the key does not expire
} }
//get value
func (d *Data) GetVal() []byte {
return d.value
}
//get version
func (d *Data) GetVers() uint64 {
return d.version
}
//represents the main hashtable where the dance actually happens //represents the main hashtable where the dance actually happens
type KeyValueStore struct { type KeyValueStore struct {
dictionary map[string]*Data //the hashtable that stores the (key, value) pairs dictionary map[string]*Data //the hashtable that stores the (key, value) pairs
...@@ -60,6 +70,16 @@ var logger *log.Logger ...@@ -60,6 +70,16 @@ var logger *log.Logger
//cache //cache
var table *KeyValueStore var table *KeyValueStore
//function to get access to the keyvaluestore
func GetKeyValStr() *KeyValueStore {
return table
}
//access the dictionary
func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data {
return kvstr.dictionary
}
/*Simple write function to send information to the client /*Simple write function to send information to the client
*arguments: client connection, msg to send to the client *arguments: client connection, msg to send to the client
*return: none *return: none
......
...@@ -75,6 +75,10 @@ type AppendEntries struct{} ...@@ -75,6 +75,10 @@ type AppendEntries struct{}
var cluster_config *ClusterConfig var cluster_config *ClusterConfig
func GetClusterConfig() *ClusterConfig {
return cluster_config
}
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*Raft, error) { func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*Raft, error) {
rft := new(Raft) rft := new(Raft)
rft.commitCh = commitCh rft.commitCh = commitCh
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
package main package main
import ( import (
"bytes"
"connhandler" "connhandler"
"io/ioutil" "io/ioutil"
"log" "log"
...@@ -20,6 +21,40 @@ var DEBUG = true ...@@ -20,6 +21,40 @@ var DEBUG = true
type AppendEntries struct{} type AppendEntries struct{}
//only for testing purpose
type Tester struct{}
type TestArgs struct {
key string
value []byte
version uint64
}
type TestReply struct {
replica_updated bool
}
//only for testing purpose
//this function checks for the key value in its kvstore and sets replica_updated true if present and false if absent
func (t *Tester) testerRPC(args *TestArgs, reply *TestReply) error {
table := raft.GetKeyValStr()
table.RLock()
defer table.RUnlock()
dic := table.GetDicKVstr()
if v, ok := dic[args.key]; ok {
the_val := v.GetVal()
the_vers := v.GetVers()
if bytes.Equal(the_val, args.value) && the_vers == args.version {
reply.replica_updated = true
return nil
} else {
return nil
}
} else {
return nil
}
}
type Reply struct { type Reply struct {
X int X int
} }
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"net" "net"
"net/rpc"
"os" "os"
"os/exec" "os/exec"
"raft" "raft"
...@@ -118,6 +119,75 @@ func testNoReply(t *testing.T) { ...@@ -118,6 +119,75 @@ func testNoReply(t *testing.T) {
} }
} }
//test that replication indeed occurred across all followers
//update the kvstore via leader
//but through special testing purpose functions the client should be able to
//check in the followers whether the update got replicated
//this is for testing purpose only and is not part of standard client - server interface
func testReplication(t *testing.T) {
//set value in the leader
leader_port := raft.CLIENT_PORT + 1
conn, err := net.Dial("tcp", ":"+strconv.Itoa(leader_port))
if err != nil {
t.Error("Error in connecting the leader at port: " + strconv.Itoa(leader_port))
} else {
time.Sleep(time.Millisecond)
sending := []byte("set replica 1000 3\r\nlul\r\n")
expecting := []byte("OK 1\r\n")
conn.Write(sending)
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
if !bytes.Equal(buffer[:n], expecting) {
t.Error(
"For", sending, string(sending),
"expected", expecting, string(expecting),
"got", buffer[:n], string(buffer[:n]),
)
}
conn.Close()
time.Sleep(time.Millisecond)
}
//test if the value got updated in the followers
presentChan := make(chan bool)
go monitorPresentChannel(presentChan, t)
for _, server := range raft.GetClusterConfig().Servers[1:] {
go func(presentChan chan bool) {
client, err := rpc.Dial("tcp", server.Hostname+":"+strconv.Itoa(server.LogPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(TestReply)
reply.replica_updated = false
args := new(TestArgs)
args.key = "replica"
args.value = []byte("lul")
args.version = 1
testerCall := client.Go("Tester.TesterRPC", args, reply, nil) //check if it is replicated
testerCall = <-testerCall.Done
presentChan <- reply.replica_updated
}(presentChan)
}
}
func monitorPresentChannel(presentChan chan bool, t *testing.T) {
countPresent := 0
var isPresent bool
for i := 1; i <= 4; i++ {
isPresent = <-presentChan
if isPresent {
countPresent++
}
}
if countPresent != 4 {
t.Error("The update didn't occur in all the followers")
}
}
// Add some dummy entries in raft.ClusterConfig such that majority is not achieved // Add some dummy entries in raft.ClusterConfig such that majority is not achieved
// Expected: Time out should occur after 5 sec and log entry table should not be updated // Expected: Time out should occur after 5 sec and log entry table should not be updated
func CommandNotCommittedWithoutMajority() { func CommandNotCommittedWithoutMajority() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment