Commit 2103549e authored by Bharath Radhakrishnan's avatar Bharath Radhakrishnan

Conflict resolved

parents dd919529 9574763b
......@@ -49,6 +49,16 @@ type Data struct {
isPerpetual bool //specifies that the key does not expire
}
//get value
func (d *Data) GetVal() []byte {
return d.value
}
//get version
func (d *Data) GetVers() uint64 {
return d.version
}
//represents the main hashtable where the dance actually happens
type KeyValueStore struct {
dictionary map[string]*Data //the hashtable that stores the (key, value) pairs
......@@ -61,6 +71,16 @@ var logger *log.Logger
//cache
var table *KeyValueStore
//function to get access to the keyvaluestore
func GetKeyValStr() *KeyValueStore {
return table
}
//access the dictionary
func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data {
return kvstr.dictionary
}
/*Simple write function to send information to the client
*arguments: client connection, msg to send to the client
*return: none
......
......@@ -81,6 +81,12 @@ type Reply struct {
// Structure for registering RPC methods
type AppendEntries struct{}
var cluster_config *ClusterConfig
func GetClusterConfig() *ClusterConfig {
return cluster_config
}
// Creates a raft object. This implements the SharedLog interface.
// commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed
......@@ -89,6 +95,7 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, lo
rft := new(Raft)
rft.commitCh = commitCh
rft.clusterConfig = config
cluster_config = config
rft.id = thisServerId
Info = logger
lsn = 0
......@@ -100,7 +107,6 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, lo
// Returns the log entry
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData)
entry.Id = lsn
entry.Data = data
entry.conn = conn
......
......@@ -2,6 +2,7 @@
package main
import (
"bytes"
"connhandler"
"io/ioutil"
"log"
......@@ -19,6 +20,40 @@ var rft *raft.Raft
type AppendEntries struct{}
//only for testing purpose
type Tester struct{}
type TestArgs struct {
key string
value []byte
version uint64
}
type TestReply struct {
replica_updated bool
}
//only for testing purpose
//this function checks for the key value in its kvstore and sets replica_updated true if present and false if absent
func (t *Tester) testerRPC(args *TestArgs, reply *TestReply) error {
table := raft.GetKeyValStr()
table.RLock()
defer table.RUnlock()
dic := table.GetDicKVstr()
if v, ok := dic[args.key]; ok {
the_val := v.GetVal()
the_vers := v.GetVers()
if bytes.Equal(the_val, args.value) && the_vers == args.version {
reply.replica_updated = true
return nil
} else {
return nil
}
} else {
return nil
}
}
type Reply struct {
X int
}
......
......@@ -6,6 +6,7 @@ import (
"bytes"
//"fmt"
"net"
"net/rpc"
"os"
"os/exec"
"raft"
......@@ -153,6 +154,75 @@ func testSet(t *testing.T) {
}
}
//test that replication indeed occurred across all followers
//update the kvstore via leader
//but through special testing purpose functions the client should be able to
//check in the followers whether the update got replicated
//this is for testing purpose only and is not part of standard client - server interface
func testReplication(t *testing.T) {
//set value in the leader
leader_port := raft.CLIENT_PORT + 1
conn, err := net.Dial("tcp", ":"+strconv.Itoa(leader_port))
if err != nil {
t.Error("Error in connecting the leader at port: " + strconv.Itoa(leader_port))
} else {
time.Sleep(time.Millisecond)
sending := []byte("set replica 1000 3\r\nlul\r\n")
expecting := []byte("OK 1\r\n")
conn.Write(sending)
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
if !bytes.Equal(buffer[:n], expecting) {
t.Error(
"For", sending, string(sending),
"expected", expecting, string(expecting),
"got", buffer[:n], string(buffer[:n]),
)
}
conn.Close()
time.Sleep(time.Millisecond)
}
//test if the value got updated in the followers
presentChan := make(chan bool)
go monitorPresentChannel(presentChan, t)
for _, server := range raft.GetClusterConfig().Servers[1:] {
go func(presentChan chan bool) {
client, err := rpc.Dial("tcp", server.Hostname+":"+strconv.Itoa(server.LogPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(TestReply)
reply.replica_updated = false
args := new(TestArgs)
args.key = "replica"
args.value = []byte("lul")
args.version = 1
testerCall := client.Go("Tester.TesterRPC", args, reply, nil) //check if it is replicated
testerCall = <-testerCall.Done
presentChan <- reply.replica_updated
}(presentChan)
}
}
func monitorPresentChannel(presentChan chan bool, t *testing.T) {
countPresent := 0
var isPresent bool
for i := 1; i <= 4; i++ {
isPresent = <-presentChan
if isPresent {
countPresent++
}
}
if countPresent != 4 {
t.Error("The update didn't occur in all the followers")
}
}
// Add some dummy entries in raft.ClusterConfig such that majority is not achieved
// Expected: Time out should occur after 5 sec and log entry table should not be updated
func testCommandNotCommittedWithoutMajority() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment