Commit 6bb021c1 authored by Sushant Mahajan's avatar Sushant Mahajan

merged code

parents 39e6edb7 9574763b
...@@ -49,6 +49,16 @@ type Data struct { ...@@ -49,6 +49,16 @@ type Data struct {
isPerpetual bool //specifies that the key does not expire isPerpetual bool //specifies that the key does not expire
} }
//get value
func (d *Data) GetVal() []byte {
return d.value
}
//get version
func (d *Data) GetVers() uint64 {
return d.version
}
//represents the main hashtable where the dance actually happens //represents the main hashtable where the dance actually happens
type KeyValueStore struct { type KeyValueStore struct {
dictionary map[string]*Data //the hashtable that stores the (key, value) pairs dictionary map[string]*Data //the hashtable that stores the (key, value) pairs
...@@ -61,6 +71,16 @@ var logger *log.Logger ...@@ -61,6 +71,16 @@ var logger *log.Logger
//cache //cache
var table *KeyValueStore var table *KeyValueStore
//function to get access to the keyvaluestore
func GetKeyValStr() *KeyValueStore {
return table
}
//access the dictionary
func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data {
return kvstr.dictionary
}
/*Simple write function to send information to the client /*Simple write function to send information to the client
*arguments: client connection, msg to send to the client *arguments: client connection, msg to send to the client
*return: none *return: none
......
...@@ -72,10 +72,17 @@ type Reply struct { ...@@ -72,10 +72,17 @@ type Reply struct {
type AppendEntries struct{} type AppendEntries struct{}
var cluster_config *ClusterConfig
func GetClusterConfig() *ClusterConfig {
return cluster_config
}
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) { func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) {
rft := new(Raft) rft := new(Raft)
rft.commitCh = commitCh rft.commitCh = commitCh
rft.clusterConfig = config rft.clusterConfig = config
cluster_config = config
rft.id = thisServerId rft.id = thisServerId
Info = logger Info = logger
lsn = 0 lsn = 0
...@@ -84,7 +91,6 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, lo ...@@ -84,7 +91,6 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, lo
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData { func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData) entry := new(LogEntryData)
entry.Id = lsn entry.Id = lsn
entry.Data = data entry.Data = data
entry.conn = conn entry.conn = conn
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
package main package main
import ( import (
"bytes"
"connhandler" "connhandler"
"io/ioutil" "io/ioutil"
"log" "log"
...@@ -22,6 +23,40 @@ var rft *raft.Raft ...@@ -22,6 +23,40 @@ var rft *raft.Raft
type AppendEntries struct{} type AppendEntries struct{}
//encapsulate the return value of RPC //encapsulate the return value of RPC
//only for testing purpose
type Tester struct{}
type TestArgs struct {
key string
value []byte
version uint64
}
type TestReply struct {
replica_updated bool
}
//only for testing purpose
//this function checks for the key value in its kvstore and sets replica_updated true if present and false if absent
func (t *Tester) testerRPC(args *TestArgs, reply *TestReply) error {
table := raft.GetKeyValStr()
table.RLock()
defer table.RUnlock()
dic := table.GetDicKVstr()
if v, ok := dic[args.key]; ok {
the_val := v.GetVal()
the_vers := v.GetVers()
if bytes.Equal(the_val, args.value) && the_vers == args.version {
reply.replica_updated = true
return nil
} else {
return nil
}
} else {
return nil
}
}
type Reply struct { type Reply struct {
X int X int
} }
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"bytes" "bytes"
//"fmt" //"fmt"
"net" "net"
"net/rpc"
"os" "os"
"os/exec" "os/exec"
"raft" "raft"
...@@ -153,6 +154,75 @@ func testSet(t *testing.T) { ...@@ -153,6 +154,75 @@ func testSet(t *testing.T) {
} }
} }
//test that replication indeed occurred across all followers
//update the kvstore via leader
//but through special testing purpose functions the client should be able to
//check in the followers whether the update got replicated
//this is for testing purpose only and is not part of standard client - server interface
func testReplication(t *testing.T) {
//set value in the leader
leader_port := raft.CLIENT_PORT + 1
conn, err := net.Dial("tcp", ":"+strconv.Itoa(leader_port))
if err != nil {
t.Error("Error in connecting the leader at port: " + strconv.Itoa(leader_port))
} else {
time.Sleep(time.Millisecond)
sending := []byte("set replica 1000 3\r\nlul\r\n")
expecting := []byte("OK 1\r\n")
conn.Write(sending)
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
if !bytes.Equal(buffer[:n], expecting) {
t.Error(
"For", sending, string(sending),
"expected", expecting, string(expecting),
"got", buffer[:n], string(buffer[:n]),
)
}
conn.Close()
time.Sleep(time.Millisecond)
}
//test if the value got updated in the followers
presentChan := make(chan bool)
go monitorPresentChannel(presentChan, t)
for _, server := range raft.GetClusterConfig().Servers[1:] {
go func(presentChan chan bool) {
client, err := rpc.Dial("tcp", server.Hostname+":"+strconv.Itoa(server.LogPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(TestReply)
reply.replica_updated = false
args := new(TestArgs)
args.key = "replica"
args.value = []byte("lul")
args.version = 1
testerCall := client.Go("Tester.TesterRPC", args, reply, nil) //check if it is replicated
testerCall = <-testerCall.Done
presentChan <- reply.replica_updated
}(presentChan)
}
}
func monitorPresentChannel(presentChan chan bool, t *testing.T) {
countPresent := 0
var isPresent bool
for i := 1; i <= 4; i++ {
isPresent = <-presentChan
if isPresent {
countPresent++
}
}
if countPresent != 4 {
t.Error("The update didn't occur in all the followers")
}
}
// Add some dummy entries in raft.ClusterConfig such that majority is not achieved // Add some dummy entries in raft.ClusterConfig such that majority is not achieved
// Expected: Time out should occur after 5 sec and log entry table should not be updated // Expected: Time out should occur after 5 sec and log entry table should not be updated
func testCommandNotCommittedWithoutMajority() { func testCommandNotCommittedWithoutMajority() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment