Commit 56f872c7 authored by Sushant Mahajan's avatar Sushant Mahajan

added code for persisting and unmarshalling the log, updated cleanup file

parent 0a0bba45
#! /bin/bash
rm {1..5} currentTerm* votedFor*
rm {1..5} currentTerm* votedFor* log*
package raft
import (
"encoding/json"
"io/ioutil"
"log"
"math/rand"
......@@ -24,6 +25,7 @@ const (
FOLLOWER = 30
VOTED_FOR = "votedFor"
CURRENT_TERM = "currentTerm"
LOG_PERSIST = "log"
FILE_WRITTEN = 0
FILE_ERR = -1
NULL_VOTE = 0
......@@ -126,11 +128,41 @@ type LogEntryData struct {
}
func (rft *Raft) persistLog() {
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666); err != nil {
rft.Info.Println("error opening log persist file", err.Error())
} else {
defer file.Close()
enc := json.NewEncoder(file)
for _, e := range rft.LogArray {
if err := enc.Encode(e); err != nil {
rft.Info.Println("error persisting log entry", err.Error())
}
}
if err := file.Sync(); err != nil {
rft.Info.Println("error synching log persist file", err.Error())
} else {
rft.Info.Println("log persist success!")
}
}
}
func (rft *Raft) readLogFromDisk() {
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_RDONLY, 0666); err != nil {
rft.Info.Println("error reading log persist file")
} else {
defer file.Close()
dec := json.NewDecoder(file)
for {
d := LogEntryData{}
if err := dec.Decode(d); err != nil {
rft.Info.Println("done reading log from file")
break
} else {
rft.LogArray = []*LogEntryData{}
rft.LogArray = append(rft.LogArray, &d)
}
}
}
}
func getSingleDataFromFile(name string, serverId int, info *log.Logger) int {
......@@ -432,6 +464,9 @@ func (rft *Raft) follower() int {
}
}
rafts[req.leaderId].replyAppendRPC(reply, rft.currentTerm, rft.id)
if reply {
rft.persistLog()
}
rft.Info.Println("F: log is size", len(rft.LogArray))
}
}
......@@ -520,6 +555,7 @@ func (rft *Raft) leader() int {
heartbeatReq := new(AppendRPC)
heartbeatReq.entries = []*LogEntryData{}
heartbeatReq.leaderId = rft.id
rft.currentTerm++
rft.LogArray = append(
rft.LogArray,
......@@ -527,11 +563,13 @@ func (rft *Raft) leader() int {
Id: 1,
Data: []byte("hello"),
Committed: false,
Term: rft.currentTerm,
},
&LogEntryData{
Id: 2,
Data: []byte("world"),
Committed: false,
Term: rft.currentTerm,
})
//build nextIndex and matchIndex
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment