Commit 89a62c9d authored by Sushant Mahajan's avatar Sushant Mahajan

modified logentry struct and sharedlog interface

parent e36c6dbe
...@@ -116,20 +116,11 @@ func Write(conn net.Conn, msg string) { ...@@ -116,20 +116,11 @@ func Write(conn net.Conn, msg string) {
conn.Write(buf) conn.Write(buf)
} }
func ReplyToClient(conn net.Conn, writeCh chan []byte) {
for {
w := <-writeCh
Write(conn, string(w))
}
}
func HandleClient(conn net.Conn, rft *raft.Raft) { func HandleClient(conn net.Conn, rft *raft.Raft) {
defer conn.Close() defer conn.Close()
//channel for every connection for every client //channel for every connection for every client
ch := make(chan []byte) ch := make(chan []byte)
writeCh := make(chan []byte)
go MyRead(ch, conn) go MyRead(ch, conn)
go ReplyToClient(conn, writeCh)
for { for {
command := new(utils.Command) command := new(utils.Command)
...@@ -167,6 +158,6 @@ func HandleClient(conn net.Conn, rft *raft.Raft) { ...@@ -167,6 +158,6 @@ func HandleClient(conn net.Conn, rft *raft.Raft) {
//log.Fatal("encode error:", err) //log.Fatal("encode error:", err)
} }
rft.Append(buffer.Bytes(), writeCh) rft.Append(buffer.Bytes())
} }
} }
...@@ -523,9 +523,9 @@ func InitKVStore() { ...@@ -523,9 +523,9 @@ func InitKVStore() {
table = &KeyValueStore{dictionary: make(map[string]*Data)} table = &KeyValueStore{dictionary: make(map[string]*Data)}
} }
func MonitorCommitChannel(ch chan LogEntry, conn) { func MonitorCommitChannel(ch chan LogEntry) {
for { for {
//do some shit temp := <-ch
} }
} }
......
...@@ -57,6 +57,7 @@ type LogEntryData struct { ...@@ -57,6 +57,7 @@ type LogEntryData struct {
id Lsn id Lsn
data []byte data []byte
committed bool committed bool
conn net.Conn
} }
type Args struct { type Args struct {
...@@ -128,7 +129,7 @@ func (entry *LogEntryData) Committed() bool { ...@@ -128,7 +129,7 @@ func (entry *LogEntryData) Committed() bool {
} }
//make raft implement the append function //make raft implement the append function
func (rft *Raft) Append(data []byte, writeCh chan []byte) (LogEntry, error) { func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
if rft.id != 1 { if rft.id != 1 {
return nil, ErrRedirect(1) return nil, ErrRedirect(1)
} }
...@@ -136,6 +137,7 @@ func (rft *Raft) Append(data []byte, writeCh chan []byte) (LogEntry, error) { ...@@ -136,6 +137,7 @@ func (rft *Raft) Append(data []byte, writeCh chan []byte) (LogEntry, error) {
temp.id = 1 temp.id = 1
temp.committed = false temp.committed = false
temp.data = data temp.data = data
temp.conn = conn
rft.log_array = append(rft.log_array, temp) rft.log_array = append(rft.log_array, temp)
ackChan := make(chan int) ackChan := make(chan int)
......
...@@ -101,7 +101,9 @@ func main() { ...@@ -101,7 +101,9 @@ func main() {
commitCh := make(chan raft.LogEntry) commitCh := make(chan raft.LogEntry)
rft, _ := raft.NewRaft(clusterConfig, sid, commitCh) rft, _ := raft.NewRaft(clusterConfig, sid, commitCh)
raft.InitKVStore()
go raft.MonitorCommitChannel(commitCh)
go initClientCommunication(server, rft, ch1) go initClientCommunication(server, rft, ch1)
go initInterServerCommunication(server, rft, ch2) go initInterServerCommunication(server, rft, ch2)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment