Commit d8f890f1 authored by Sushant Mahajan's avatar Sushant Mahajan

exposed method to create new log entry

parent ce3f113b
...@@ -83,6 +83,16 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (* ...@@ -83,6 +83,16 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry) (*
return rft, nil return rft, nil
} }
func NewLogEntry(id Lsn, data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData)
entry.id = id
entry.data = data
entry.conn = conn
entry.committed = committed
return entry
}
//goroutine that monitors channel to check if the majority of servers have replied //goroutine that monitors channel to check if the majority of servers have replied
func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) { func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) {
acks_received := 0 acks_received := 0
...@@ -136,11 +146,8 @@ func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) { ...@@ -136,11 +146,8 @@ func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
if rft.id != 1 { if rft.id != 1 {
return nil, ErrRedirect(1) return nil, ErrRedirect(1)
} }
temp := new(LogEntryData) temp := NewLogEntry(1, data, false, conn)
temp.id = 1
temp.committed = false
temp.data = data
temp.conn = conn
rft.log_array = append(rft.log_array, temp) rft.log_array = append(rft.log_array, temp)
ackChan := make(chan int) ackChan := make(chan int)
......
...@@ -4,7 +4,7 @@ package main ...@@ -4,7 +4,7 @@ package main
import ( import (
"bytes" "bytes"
"fmt" //"fmt"
"net" "net"
"os" "os"
"os/exec" "os/exec"
...@@ -31,7 +31,7 @@ func TestAll(t *testing.T) { ...@@ -31,7 +31,7 @@ func TestAll(t *testing.T) {
go testServersCommunic(i, t) go testServersCommunic(i, t)
} }
//wait for some time so that servers are ready //wait for some time so that servers are ready
time.Sleep(4 * time.Second) time.Sleep(time.Second)
//run client that tries connecting to the followers //run client that tries connecting to the followers
testConnectFollower(t) testConnectFollower(t)
...@@ -62,7 +62,7 @@ func testConnectFollower(t *testing.T) { ...@@ -62,7 +62,7 @@ func testConnectFollower(t *testing.T) {
if err != nil { if err != nil {
t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port)) t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port))
} else { } else {
time.Sleep(time.Millisecond) //time.Sleep(time.Millisecond)
sending := []byte("set mykey1 100 3\r\nlul\r\n") sending := []byte("set mykey1 100 3\r\nlul\r\n")
port := strconv.Itoa(raft.CLIENT_PORT + 1) port := strconv.Itoa(raft.CLIENT_PORT + 1)
expecting := []byte("ERR_REDIRECT 127.0.0.1 " + port + "\r\n") expecting := []byte("ERR_REDIRECT 127.0.0.1 " + port + "\r\n")
...@@ -79,7 +79,7 @@ func testConnectFollower(t *testing.T) { ...@@ -79,7 +79,7 @@ func testConnectFollower(t *testing.T) {
) )
} }
conn.Close() conn.Close()
time.Sleep(time.Millisecond) //time.Sleep(time.Millisecond)
} }
} }
} }
...@@ -98,7 +98,7 @@ func testNoReply(t *testing.T) { ...@@ -98,7 +98,7 @@ func testNoReply(t *testing.T) {
if err != nil { if err != nil {
t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port)) t.Error("Error in connecting the server at port: " + strconv.Itoa(server_port))
} else { } else {
time.Sleep(time.Millisecond) //time.Sleep(time.Millisecond)
for _, pair := range noreply_cases { for _, pair := range noreply_cases {
conn.Write(pair.to_server) conn.Write(pair.to_server)
buffer := make([]byte, 1024) buffer := make([]byte, 1024)
...@@ -113,6 +113,6 @@ func testNoReply(t *testing.T) { ...@@ -113,6 +113,6 @@ func testNoReply(t *testing.T) {
} }
} }
conn.Close() conn.Close()
time.Sleep(time.Millisecond) //time.Sleep(time.Millisecond)
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment