Commit 933059b0 authored by Bharath Radhakrishnan's avatar Bharath Radhakrishnan

Command workflow working

parent 39d7ba59
......@@ -2,6 +2,7 @@ package raft
import (
"bytes"
"encoding/gob"
"log"
"net"
"strconv"
......@@ -173,10 +174,14 @@ func MonitorCommitChannel(ch chan LogEntry) {
temp := <-ch
conn := temp.(*LogEntryData).conn
cmd := new(utils.Command)
if err := cmd.GobDecode(temp.Data()); err != nil {
log.Fatal("Error decoding command!")
buffer := bytes.NewBuffer(temp.GetData())
dec := gob.NewDecoder(buffer)
if err := dec.Decode(cmd); err != nil {
log.Fatal("Error decoding command!", err)
}
ParseInput(conn, cmd)
//Debug()
}
}
......@@ -478,7 +483,7 @@ func performDelete(conn net.Conn, tokens []string) int {
*arguments: none
*return: none
*/
func debug() {
func Debug() {
logger.Println("----start debug----")
for key, val := range (*table).dictionary {
logger.Println(key, val)
......
......@@ -19,6 +19,8 @@ const (
// Logger
var Info *log.Logger
var lsn Lsn
// Flag for enabling/disabling logging functionality
var DEBUG = true
......@@ -43,64 +45,58 @@ type SharedLog interface {
}
type Raft struct {
log_array []*LogEntryData
LogArray []*LogEntryData
commitCh chan LogEntry
cluster_config *ClusterConfig //cluster
clusterConfig *ClusterConfig //cluster
id int //this server id
sync.RWMutex
}
type LogEntry interface {
Lsn() Lsn
Data() []byte
Committed() bool
GetLsn() Lsn
GetData() []byte
GetCommitted() bool
SetCommitted(status bool)
}
type LogEntryData struct {
id Lsn
data []byte
committed bool
Id Lsn
Data []byte
Committed bool
conn net.Conn
}
type Args struct {
X int
}
type Reply struct {
X int
}
type AppendEntries struct{}
var cluster_config *ClusterConfig
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*Raft, error) {
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) {
rft := new(Raft)
rft.commitCh = commitCh
rft.cluster_config = config
rft.clusterConfig = config
rft.id = thisServerId
Info = logger
lsn = 0
return rft, nil
}
func NewLogEntry(id Lsn, data []byte, committed bool, conn net.Conn) *LogEntryData {
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData)
entry.id = id
entry.data = data
entry.Id = lsn
entry.Data = data
entry.conn = conn
entry.committed = committed
entry.Committed = committed
lsn++
return entry
}
func SetCommitted(logEntry *LogEntryData, committed bool) {
logEntry.committed = committed
}
//goroutine that monitors channel to check if the majority of servers have replied
func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) {
acks_received := 0
num_servers := len(rft.cluster_config.Servers)
num_servers := len(rft.clusterConfig.Servers)
required_acks := num_servers / 2
up := make(chan bool, 1)
err := false
......@@ -113,9 +109,12 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
for {
select {
case temp := <-ack_ch:
Info.Println("Ack Received:", temp)
acks_received += temp
if acks_received == required_acks {
rft.log_array[log_entry.(*LogEntryData).id].committed = true
Info.Println("Majority Achieved", log_entry.(*LogEntryData).Id)
rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true
//Info.Println(rft.LogArray)
rft.commitCh <- log_entry
majCh <- true
err = true
......@@ -123,6 +122,7 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
}
case <-up:
Info.Println("Error")
err = true
break
}
......@@ -132,44 +132,56 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
}
}
//make LogEntryData implement the
func (entry *LogEntryData) Lsn() Lsn {
return entry.id
//make LogEntryData implement the LogEntry Interface
func (entry *LogEntryData) GetLsn() Lsn {
return entry.Id
}
func (entry *LogEntryData) Data() []byte {
return entry.data
func (entry *LogEntryData) GetData() []byte {
return entry.Data
}
func (entry *LogEntryData) Committed() bool {
return entry.committed
func (entry *LogEntryData) GetCommitted() bool {
return entry.Committed
}
func (entry *LogEntryData) SetCommitted(committed bool) {
entry.Committed = committed
}
//make rpc call to followers
func doRPCCall(ackChan chan int, hostname string, logPort int, temp *LogEntryData) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("RPC Called", logPort)
appendCall := client.Go("AppendEntries.AppendEntriesRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
Info.Println("Reply", appendCall, reply.X)
ackChan <- reply.X
}
//make raft implement the append function
func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
Info.Println("Append Called")
if rft.id != 1 {
return nil, ErrRedirect(1)
}
temp := NewLogEntry(1, data, false, conn)
defer rft.Unlock()
rft.Lock()
temp := NewLogEntry(data, false, conn)
rft.log_array = append(rft.log_array, temp)
rft.LogArray = append(rft.LogArray, temp)
ackChan := make(chan int)
majChan := make(chan bool)
go monitorAckChannel(rft, ackChan, temp, majChan)
for _, server := range cluster_config.Servers[1:] {
go func(ackChan chan int) {
client, err := rpc.Dial("tcp", server.Hostname+":"+strconv.Itoa(server.LogPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
appendCall := client.Go("AppendEntries.AppendEntriesRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
ackChan <- reply.X
}(ackChan)
for _, server := range rft.clusterConfig.Servers[1:] {
doRPCCall(ackChan, server.Hostname, server.LogPort, temp)
}
if <-majChan {
......
......@@ -18,14 +18,25 @@ var Info *log.Logger
// Flag for enabling/disabling logging functionality
var DEBUG = true
var rft *raft.Raft
type AppendEntries struct{}
type Reply struct {
X int
}
func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntry, reply *Reply) error {
Info.Println("RPC invoked")
func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntryData, reply *Reply) error {
Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
reply.X = 1
return nil
}
func (t *AppendEntries) CommitRPC(args *raft.LogEntry, reply *Reply) error {
Info.Println("Commit RPC invoked")
rft.LogArray[(*args).GetLsn()].SetCommitted(true)
reply.X = 1
return nil
}
......@@ -97,7 +108,7 @@ func main() {
clusterConfig, _ := raft.NewClusterConfig(serverCount)
commitCh := make(chan raft.LogEntry)
rft, _ := raft.NewRaft(clusterConfig, sid, commitCh)
rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info)
raft.InitKVStore(Info)
go raft.MonitorCommitChannel(commitCh) //for kvstore
......
......@@ -31,13 +31,15 @@ func TestAll(t *testing.T) {
go testServersCommunic(i, t)
}
//wait for some time so that servers are ready
time.Sleep(time.Second)
time.Sleep(4 * time.Second)
//run client that tries connecting to the followers
testConnectFollower(t)
//test no reply
testNoReply(t)
testSet(t)
}
//run servers
......@@ -104,7 +106,40 @@ func testNoReply(t *testing.T) {
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
fmt.Println(buffer)
//fmt.Println(buffer)
if !bytes.Equal(buffer[:n], pair.from_server) {
t.Error(
"For", pair.to_server, string(pair.to_server),
"expected", pair.from_server, string(pair.from_server),
"got", buffer[:n], string(buffer[:n]),
)
}
}
conn.Close()
//time.Sleep(time.Millisecond)
}
}
//noreply option is not more valid with set and cas
//client should get command error from the server if it sends 'no reply' option
func testSet(t *testing.T) {
var noreply_cases = []Testpair{
{[]byte("set mykey1 100 3\r\nadd\r\n"), []byte("OK 1\r\n")},
}
server_port := raft.CLIENT_PORT + 1
conn, err := net.Dial("tcp", ":"+strconv.Itoa(server_port))
if err != nil {
t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port))
} else {
//time.Sleep(time.Millisecond)
for _, pair := range noreply_cases {
conn.Write(pair.to_server)
buffer := make([]byte, 1024)
conn.Read(buffer)
n := bytes.Index(buffer, []byte{0})
//fmt.Println(buffer)
if !bytes.Equal(buffer[:n], pair.from_server) {
t.Error(
"For", pair.to_server, string(pair.to_server),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment