Commit 9574763b authored by Harshit Pande's avatar Harshit Pande

fixed conflicts in raft.go with cluster configuration

parents 1eb4542d aafa7ab8
......@@ -2,6 +2,7 @@ package raft
import (
"bytes"
"encoding/gob"
"log"
"net"
"strconv"
......@@ -193,10 +194,14 @@ func MonitorCommitChannel(ch chan LogEntry) {
temp := <-ch
conn := temp.(*LogEntryData).conn
cmd := new(utils.Command)
if err := cmd.GobDecode(temp.Data()); err != nil {
log.Fatal("Error decoding command!")
buffer := bytes.NewBuffer(temp.GetData())
dec := gob.NewDecoder(buffer)
if err := dec.Decode(cmd); err != nil {
log.Fatal("Error decoding command!", err)
}
ParseInput(conn, cmd)
//Debug()
}
}
......@@ -498,7 +503,7 @@ func performDelete(conn net.Conn, tokens []string) int {
*arguments: none
*return: none
*/
func debug() {
func Debug() {
logger.Println("----start debug----")
for key, val := range (*table).dictionary {
logger.Println(key, val)
......
......@@ -19,6 +19,8 @@ const (
// Logger
var Info *log.Logger
var lsn Lsn
// Flag for enabling/disabling logging functionality
var DEBUG = true
......@@ -43,30 +45,27 @@ type SharedLog interface {
}
type Raft struct {
log_array []*LogEntryData
commitCh chan LogEntry
cluster_config *ClusterConfig //cluster
id int //this server id
LogArray []*LogEntryData
commitCh chan LogEntry
clusterConfig *ClusterConfig //cluster
id int //this server id
sync.RWMutex
}
type LogEntry interface {
Lsn() Lsn
Data() []byte
Committed() bool
GetLsn() Lsn
GetData() []byte
GetCommitted() bool
SetCommitted(status bool)
}
type LogEntryData struct {
id Lsn
data []byte
committed bool
Id Lsn
Data []byte
Committed bool
conn net.Conn
}
type Args struct {
X int
}
type Reply struct {
X int
}
......@@ -79,18 +78,31 @@ func GetClusterConfig() *ClusterConfig {
return cluster_config
}
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*Raft, error) {
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) {
rft := new(Raft)
rft.commitCh = commitCh
rft.cluster_config = config
rft.clusterConfig = config
cluster_config = config
rft.id = thisServerId
Info = logger
lsn = 0
return rft, nil
}
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData)
entry.Id = lsn
entry.Data = data
entry.conn = conn
entry.Committed = committed
lsn++
return entry
}
//goroutine that monitors channel to check if the majority of servers have replied
func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) {
acks_received := 0
num_servers := len(rft.cluster_config.Servers)
num_servers := len(rft.clusterConfig.Servers)
required_acks := num_servers / 2
up := make(chan bool, 1)
err := false
......@@ -103,9 +115,12 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
for {
select {
case temp := <-ack_ch:
Info.Println("Ack Received:", temp)
acks_received += temp
if acks_received == required_acks {
rft.log_array[log_entry.(*LogEntryData).id].committed = true
Info.Println("Majority Achieved", log_entry.(*LogEntryData).Id)
rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true
//Info.Println(rft.LogArray)
rft.commitCh <- log_entry
majCh <- true
err = true
......@@ -113,6 +128,7 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
}
case <-up:
Info.Println("Error")
err = true
break
}
......@@ -122,47 +138,56 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
}
}
//make LogEntryData implement the
func (entry *LogEntryData) Lsn() Lsn {
return entry.id
//make LogEntryData implement the LogEntry Interface
func (entry *LogEntryData) GetLsn() Lsn {
return entry.Id
}
func (entry *LogEntryData) Data() []byte {
return entry.data
func (entry *LogEntryData) GetData() []byte {
return entry.Data
}
func (entry *LogEntryData) Committed() bool {
return entry.committed
func (entry *LogEntryData) GetCommitted() bool {
return entry.Committed
}
func (entry *LogEntryData) SetCommitted(committed bool) {
entry.Committed = committed
}
//make rpc call to followers
func doRPCCall(ackChan chan int, hostname string, logPort int, temp *LogEntryData) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("RPC Called", logPort)
appendCall := client.Go("AppendEntries.AppendEntriesRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
Info.Println("Reply", appendCall, reply.X)
ackChan <- reply.X
}
//make raft implement the append function
func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
Info.Println("Append Called")
if rft.id != 1 {
return nil, ErrRedirect(1)
}
temp := new(LogEntryData)
temp.id = 1
temp.committed = false
temp.data = data
temp.conn = conn
rft.log_array = append(rft.log_array, temp)
defer rft.Unlock()
rft.Lock()
temp := NewLogEntry(data, false, conn)
rft.LogArray = append(rft.LogArray, temp)
ackChan := make(chan int)
majChan := make(chan bool)
go monitorAckChannel(rft, ackChan, temp, majChan)
for _, server := range cluster_config.Servers[1:] {
go func(ackChan chan int) {
client, err := rpc.Dial("tcp", server.Hostname+":"+strconv.Itoa(server.LogPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
appendCall := client.Go("AppendEntries.AppendEntriesRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
ackChan <- reply.X
}(ackChan)
for _, server := range rft.clusterConfig.Servers[1:] {
doRPCCall(ackChan, server.Hostname, server.LogPort, temp)
}
if <-majChan {
......
......@@ -16,8 +16,7 @@ import (
// Logger
var Info *log.Logger
// Flag for enabling/disabling logging functionality
var DEBUG = true
var rft *raft.Raft
type AppendEntries struct{}
......@@ -59,8 +58,17 @@ type Reply struct {
X int
}
func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntry, reply *Reply) error {
Info.Println("RPC invoked")
func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntryData, reply *Reply) error {
Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
reply.X = 1
return nil
}
func (t *AppendEntries) CommitRPC(args *raft.LogEntry, reply *Reply) error {
Info.Println("Commit RPC invoked")
rft.LogArray[(*args).GetLsn()].SetCommitted(true)
reply.X = 1
return nil
}
......@@ -84,12 +92,11 @@ func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch
}
// Initialize Logger
func initLogger(serverId int) {
func initLogger(serverId int, toDebug bool) {
// Logger Initializaion
if !DEBUG {
if !toDebug {
Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
} else {
Info = log.New(os.Stdout, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
}
......@@ -120,7 +127,11 @@ func main() {
Info.Println("argument ", os.Args[1], "is not string")
}
initLogger(sid)
if len(os.Args) > 3 {
initLogger(sid, true)
} else {
initLogger(sid, false)
}
Info.Println("Starting")
serverCount, err2 := strconv.Atoi((os.Args[2]))
......@@ -132,7 +143,7 @@ func main() {
clusterConfig, _ := raft.NewClusterConfig(serverCount)
commitCh := make(chan raft.LogEntry)
rft, _ := raft.NewRaft(clusterConfig, sid, commitCh)
rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info)
raft.InitKVStore(Info)
go raft.MonitorCommitChannel(commitCh) //for kvstore
......
......@@ -4,7 +4,7 @@ package main
import (
"bytes"
"fmt"
//"fmt"
"net"
"net/rpc"
"os"
......@@ -39,6 +39,8 @@ func TestAll(t *testing.T) {
//test no reply
testNoReply(t)
testSet(t)
}
//run servers
......@@ -63,7 +65,7 @@ func testConnectFollower(t *testing.T) {
if err != nil {
t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port))
} else {
time.Sleep(time.Millisecond)
//time.Sleep(time.Millisecond)
sending := []byte("set mykey1 100 3\r\nlul\r\n")
port := strconv.Itoa(raft.CLIENT_PORT + 1)
expecting := []byte("ERR_REDIRECT 127.0.0.1 " + port + "\r\n")
......@@ -80,7 +82,7 @@ func testConnectFollower(t *testing.T) {
)
}
conn.Close()
time.Sleep(time.Millisecond)
//time.Sleep(time.Millisecond)
}
}
}
......@@ -99,13 +101,13 @@ func testNoReply(t *testing.T) {
if err != nil {
t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port))
} else {
time.Sleep(time.Millisecond)
//time.Sleep(time.Millisecond)
for _, pair := range noreply_cases {
conn.Write(pair.to_server)
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
fmt.Println(buffer)
//fmt.Println(buffer)
if !bytes.Equal(buffer[:n], pair.from_server) {
t.Error(
"For", pair.to_server, string(pair.to_server),
......@@ -115,7 +117,40 @@ func testNoReply(t *testing.T) {
}
}
conn.Close()
time.Sleep(time.Millisecond)
//time.Sleep(time.Millisecond)
}
}
//noreply option is not more valid with set and cas
//client should get command error from the server if it sends 'no reply' option
func testSet(t *testing.T) {
var noreply_cases = []Testpair{
{[]byte("set mykey1 100 3\r\nadd\r\n"), []byte("OK 1\r\n")},
}
server_port := raft.CLIENT_PORT + 1
conn, err := net.Dial("tcp", ":"+strconv.Itoa(server_port))
if err != nil {
t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port))
} else {
//time.Sleep(time.Millisecond)
for _, pair := range noreply_cases {
conn.Write(pair.to_server)
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
//fmt.Println(buffer)
if !bytes.Equal(buffer[:n], pair.from_server) {
t.Error(
"For", pair.to_server, string(pair.to_server),
"expected", pair.from_server, string(pair.from_server),
"got", buffer[:n], string(buffer[:n]),
)
}
}
conn.Close()
//time.Sleep(time.Millisecond)
}
}
......@@ -190,24 +225,24 @@ func monitorPresentChannel(presentChan chan bool, t *testing.T) {
// Add some dummy entries in raft.ClusterConfig such that majority is not achieved
// Expected: Time out should occur after 5 sec and log entry table should not be updated
func CommandNotCommittedWithoutMajority() {
func testCommandNotCommittedWithoutMajority() {
}
// Expected: Log entry table updated with the new entry
func CommandCommittedWithMajority() {
func testCommandCommittedWithMajority() {
}
// Multiple clients sending different requests
// Expected: Log entry table updated
func ConcurrentManyClientsToLeader() {
func testConcurrentManyClientsToLeader() {
}
// Single client sending 100 requests one after another
// Expected: Log entry table updated
func ConcurrentClientManyRequestsToLeader() {
func testConcurrentClientManyRequestsToLeader() {
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment