Commit dd6e640a authored by Sushant Mahajan's avatar Sushant Mahajan

added client append handling code

parent 82832129
...@@ -50,7 +50,7 @@ type Data struct { ...@@ -50,7 +50,7 @@ type Data struct {
} }
//leader check //leader check
var sid int var isLeader bool
//get value //get value
func (d *Data) GetVal() []byte { func (d *Data) GetVal() []byte {
...@@ -91,7 +91,7 @@ func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data { ...@@ -91,7 +91,7 @@ func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data {
func write(conn net.Conn, msg string) { func write(conn net.Conn, msg string) {
buf := []byte(msg) buf := []byte(msg)
buf = append(buf, []byte(CRLF)...) buf = append(buf, []byte(CRLF)...)
if sid == 1 { if isLeader {
conn.Write(buf) conn.Write(buf)
} }
} }
...@@ -202,7 +202,7 @@ func MonitorCommitChannel(ch chan LogEntry) { ...@@ -202,7 +202,7 @@ func MonitorCommitChannel(ch chan LogEntry) {
for { for {
temp := <-ch temp := <-ch
var conn net.Conn var conn net.Conn
if sid == 1 { if isLeader {
conn = temp.(*LogEntryData).conn conn = temp.(*LogEntryData).conn
} else { } else {
conn = nil conn = nil
...@@ -530,9 +530,8 @@ func Debug() { ...@@ -530,9 +530,8 @@ func Debug() {
*arguments: Logger *arguments: Logger
*return: none *return: none
*/ */
func InitKVStore(log *log.Logger, id int) { func InitKVStore(log *log.Logger) {
logger = log logger = log
sid = id
//initialize key value store //initialize key value store
table = &KeyValueStore{dictionary: make(map[string]*Data)} table = &KeyValueStore{dictionary: make(map[string]*Data)}
...@@ -548,3 +547,7 @@ func ReInitServer() { ...@@ -548,3 +547,7 @@ func ReInitServer() {
} }
//fmt.Println(table.dictionary) //fmt.Println(table.dictionary)
} }
func SetIsLeader(l bool) {
isLeader = l
}
...@@ -257,12 +257,13 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, In ...@@ -257,12 +257,13 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, In
// Creates a log entry. This implements the LogEntry interface // Creates a log entry. This implements the LogEntry interface
// data: data bytes, committed: commit status, conn: connection to client // data: data bytes, committed: commit status, conn: connection to client
// Returns the log entry // Returns the log entry
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData { func (rft *Raft) NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData) entry := new(LogEntryData)
entry.Id = lsn entry.Id = lsn
entry.Data = data entry.Data = data
entry.conn = conn entry.conn = conn
entry.Committed = committed entry.Committed = committed
entry.Term = rft.currentTerm
lsn++ lsn++
return entry return entry
} }
...@@ -290,14 +291,13 @@ func (entry *LogEntryData) SetCommitted(committed bool) { ...@@ -290,14 +291,13 @@ func (entry *LogEntryData) SetCommitted(committed bool) {
//make raft implement the append function //make raft implement the append function
func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) { func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
rft.Info.Println("Append Called") rft.Info.Println("Append Called")
if rft.id != 1 { if !rft.isLeader {
return nil, ErrRedirect(1) return nil, ErrRedirect(1)
} }
defer rft.Unlock() defer rft.Unlock()
rft.Lock() rft.Lock()
temp := NewLogEntry(data, false, conn) temp := rft.NewLogEntry(data, false, conn)
rft.AddToEventChannel(temp)
rft.LogArray = append(rft.LogArray, temp)
return temp, nil return temp, nil
} }
...@@ -357,7 +357,8 @@ func (e ErrRedirect) Error() string { ...@@ -357,7 +357,8 @@ func (e ErrRedirect) Error() string {
} }
//entry loop to raft //entry loop to raft
func (rft *Raft) loop() { func (rft *Raft) Loop() {
go rft.MonitorStateMachine()
state := FOLLOWER state := FOLLOWER
for { for {
switch state { switch state {
...@@ -428,7 +429,8 @@ func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, rft *Raft ...@@ -428,7 +429,8 @@ func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest, rft *Raft
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
defer client.Close() defer client.Close()
if err != nil { if err != nil {
rft.Info.Fatal("Dialing:", err) rft.Info.Println("Dialing:", err, "returning")
return
} }
reply := new(VoteRequestReply) reply := new(VoteRequestReply)
args := temp args := temp
...@@ -443,7 +445,8 @@ func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, rft *Raft) { ...@@ -443,7 +445,8 @@ func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC, rft *Raft) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort)) client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
defer client.Close() defer client.Close()
if err != nil { if err != nil {
rft.Info.Fatal("[L]: Dialing:", err) rft.Info.Println("[L]: Dialing:", err, "returning")
return
} }
reply := new(AppendReply) reply := new(AppendReply)
args := temp args := temp
...@@ -463,6 +466,7 @@ func (rft *Raft) updateTermAndVote(term int) { ...@@ -463,6 +466,7 @@ func (rft *Raft) updateTermAndVote(term int) {
func (rft *Raft) follower() int { func (rft *Raft) follower() int {
//start candidate timeout //start candidate timeout
rft.et = time.NewTimer(getRandTime(rft.Info)) rft.et = time.NewTimer(getRandTime(rft.Info))
SetIsLeader(false)
for { for {
//wrap in select //wrap in select
select { select {
...@@ -576,6 +580,7 @@ func (rft *Raft) follower() int { ...@@ -576,6 +580,7 @@ func (rft *Raft) follower() int {
func (rft *Raft) candidate() int { func (rft *Raft) candidate() int {
//increment current term //increment current term
rft.Info.Println("[C]: became candidate") rft.Info.Println("[C]: became candidate")
SetIsLeader(false)
writeFile(CURRENT_TERM, rft.id, rft.currentTerm+1, rft.Info) writeFile(CURRENT_TERM, rft.id, rft.currentTerm+1, rft.Info)
rft.currentTerm++ rft.currentTerm++
//vote for self //vote for self
...@@ -665,34 +670,14 @@ func enforceLog(rft *Raft) { ...@@ -665,34 +670,14 @@ func enforceLog(rft *Raft) {
func (rft *Raft) leader() int { func (rft *Raft) leader() int {
rft.Info.Println("[L]: became leader") rft.Info.Println("[L]: became leader")
//update kvstore
SetIsLeader(true)
heartbeat := time.NewTimer(time.Millisecond * HEARTBEAT_TIMEOUT) heartbeat := time.NewTimer(time.Millisecond * HEARTBEAT_TIMEOUT)
heartbeatReq := new(AppendRPC) heartbeatReq := new(AppendRPC)
heartbeatReq.Entries = []*LogEntryData{} heartbeatReq.Entries = []*LogEntryData{}
heartbeatReq.LeaderId = rft.id heartbeatReq.LeaderId = rft.id
rft.currentTerm++ rft.currentTerm++
rft.LogArray = append(
rft.LogArray,
&LogEntryData{
Id: 1,
Data: []byte("hello"),
Committed: false,
Term: rft.currentTerm,
},
&LogEntryData{
Id: 2,
Data: []byte("world"),
Committed: false,
Term: rft.currentTerm,
})
newEntry := &LogEntryData{
Id: 3,
Data: []byte("goodbye"),
Committed: false,
Term: rft.currentTerm,
}
//build nextIndex and matchIndex //build nextIndex and matchIndex
for i := 0; i < len(rft.nextIndex); i++ { for i := 0; i < len(rft.nextIndex); i++ {
rft.nextIndex[i] = 0 rft.nextIndex[i] = 0
...@@ -700,10 +685,6 @@ func (rft *Raft) leader() int { ...@@ -700,10 +685,6 @@ func (rft *Raft) leader() int {
} }
go enforceLog(rft) go enforceLog(rft)
go func() {
time.Sleep(time.Second * 2)
rft.LogArray = append(rft.LogArray, newEntry)
}()
for { for {
select { select {
...@@ -724,8 +705,7 @@ func (rft *Raft) leader() int { ...@@ -724,8 +705,7 @@ func (rft *Raft) leader() int {
rft.Info.Println("[L]: got client data") rft.Info.Println("[L]: got client data")
entry := event.(*ClientAppend).logEntry entry := event.(*ClientAppend).logEntry
rft.LogArray = append(rft.LogArray, entry) rft.LogArray = append(rft.LogArray, entry)
//todo:apply to state machine //will now be send to kvstore which'll decode and reply
//todo:respond to client
rft.persistLog() rft.persistLog()
case *AppendRPC: case *AppendRPC:
...@@ -737,6 +717,12 @@ func (rft *Raft) leader() int { ...@@ -737,6 +717,12 @@ func (rft *Raft) leader() int {
} }
} }
func StartRaft(rft *Raft) { func (rft *Raft) MonitorStateMachine() {
rft.loop() for {
if rft.commitIndex > rft.lastApplied {
rft.lastApplied++
rft.commitCh <- rft.LogArray[rft.lastApplied]
}
time.Sleep(time.Second * 1)
}
} }
...@@ -209,14 +209,14 @@ func main() { ...@@ -209,14 +209,14 @@ func main() {
commitCh := make(chan raft.LogEntry) commitCh := make(chan raft.LogEntry)
rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info) rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, Info)
raft.InitKVStore(Info, sid) raft.InitKVStore(Info)
go raft.MonitorCommitChannel(commitCh) //for kvstore go raft.MonitorCommitChannel(commitCh) //for kvstore
go initClientCommunication(server, rft, ch1) go initClientCommunication(server, rft, ch1)
go initInterServerCommunication(server, rft, ch2) go initInterServerCommunication(server, rft, ch2)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
raft.StartRaft(rft) rft.Loop()
for <-ch1 && <-ch2 { for <-ch1 && <-ch2 {
......
...@@ -31,6 +31,7 @@ func TestAll(t *testing.T) { ...@@ -31,6 +31,7 @@ func TestAll(t *testing.T) {
//wait for some time so that servers are ready //wait for some time so that servers are ready
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
testClientAppend()
if <-dummy { if <-dummy {
} }
...@@ -49,3 +50,7 @@ func startServers(i int, t *testing.T, dummy chan bool) { ...@@ -49,3 +50,7 @@ func startServers(i int, t *testing.T, dummy chan bool) {
cmd.Stderr = f cmd.Stderr = f
cmd.Run() cmd.Run()
} }
func testClientAppend() {
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment