Commit fd164d82 authored by Sushant Mahajan's avatar Sushant Mahajan

partially completed integration

parent f2a18073
#!/bin/bash
# clean up per-server log files and persisted raft state from previous runs
rm -f {1..5} currentTerm* votedFor* log*
package connhandler
import (
"bufio"
"bytes"
"encoding/gob"
"log"
"net"
"raft"
"strconv"
"strings"
"time"
"utils"
)
/*
*Helper function to read value or cause timeout after READ_TIMEOUT seconds
*parameters: channel to read data from, threshold number of bytes to read, log pointer to write into
*returns: the value string and error state
*/
func readValue(ch chan []byte, n uint64, logger *log.Logger) ([]byte, bool) {
//now we need to read the value which should have been sent
valReadLength := uint64(0)
var v []byte
err := false
up := make(chan bool, 1)
//after READ_TIMEOUT seconds pass while reading the value, we'll just send err to the client
go func() {
time.Sleep(raft.READ_TIMEOUT * time.Second)
up <- true
}()
//use select for the data channel and the timeout channel
for valReadLength < n+2 {
select {
case temp := <-ch:
valReadLength += uint64(len(temp))
if valReadLength > n+2 {
err = true
break
}
v = append(v, temp...)
case <-up:
logger.Println("timeout reading value")
err = true
}
//err is true on timeout or if the client sent more bytes than it declared
if err {
break
}
}
if err {
return []byte{0}, err
}
return v[:n], err
}
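//worked example (illustrative): for "set foo 0 3" the client next sends "bar\r\n",
//so n = 3 and the splitter below delivers the 5-byte token "bar\r\n" (n+2 bytes in all);
//the trailing CRLF is then stripped by the final v[:n]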
/*Copied from the bufio.Scanner (originally ScanLines).
*By default it splits by '\n' but now we want it to split by '\r\n'
*arguments: data in bytes, is eof reached
*return: next sequence of bytes, chunk of data found, err state
*/
func CustomSplitter(data []byte, atEOF bool) (advance int, token []byte, err error) {
omega := 0
if atEOF && len(data) == 0 {
return 0, nil, nil
}
for {
if i := bytes.IndexByte(data[omega:], '\n'); i >= 0 {
//here we add omega as we are using the complete data array instead of the slice where we found '\n'
if i > 0 && data[omega+i-1] == '\r' {
//next byte begins at i+1 and data[0:i+1] returned
return omega + i + 1, data[:omega+i+1], nil
} else {
//move the omega index to the byte after \n
omega += i + 1
}
} else {
//need to break free the chains
break
}
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
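//worked example (illustrative): scanning "get foo\r\nset bar 0 3\r\n" with this splitter
//yields the tokens "get foo\r\n" and "set bar 0 3\r\n" (CRLF included); a bare '\n'
//without a preceding '\r' does not terminate a token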
/*Function to read data from the connection and put it on the channel so it could be read in a systematic fashion.
*arguments: channel shared between this go routine and other functions performing actions based on the commands given,
*client connection
*return: none
*/
func MyRead(ch chan []byte, conn net.Conn) {
scanner := bufio.NewScanner(conn)
scanner.Split(CustomSplitter)
for {
if ok := scanner.Scan(); !ok {
break
} else {
//scanner.Bytes() returns a slice into the scanner's internal buffer, which may be
//overwritten by the next call to Scan, so hand a copy to the channel
temp := make([]byte, len(scanner.Bytes()))
copy(temp, scanner.Bytes())
ch <- temp
}
}
}
/*Simple write function to send information to the client
*arguments: client connection, msg to send to the client
*return: none
*/
func Write(conn net.Conn, msg string) {
buf := []byte(msg)
buf = append(buf, []byte("\r\n")...)
conn.Write(buf)
}
/*Will be invoked as go routine by server to every client connection. Will take care of all communication with the
*client and the raft/kvstore
*arguments: connection to client, pointer to raft, pointer to logger
*return: none
*/
func HandleClient(conn net.Conn, rft *raft.Raft, logger *log.Logger) {
defer conn.Close()
//channel for every connection for every client
ch := make(chan []byte)
go MyRead(ch, conn)
for {
command := new(utils.Command)
msg := <-ch
logger.Println("got:", msg, string(msg))
if len(msg) == 0 {
continue
}
command.Cmd = msg
flag := false
nr := uint64(0)
tokens := strings.Fields(string(msg))
//guard the indexing below; malformed commands are rejected later by ParseInput
if len(tokens) == 0 {
continue
}
if tokens[0] == "cas" && len(tokens) == 5 {
n, _ := strconv.ParseUint(tokens[4], 10, 64)
nr = n
flag = true
} else if tokens[0] == "set" && len(tokens) == 4 {
n, _ := strconv.ParseUint(tokens[3], 10, 64)
nr = n
flag = true
}
if flag {
logger.Println("numbytes", nr)
if v, err := readValue(ch, nr, logger); err {
logger.Println("error reading value")
Write(conn, "ERR_CMD_ERR")
continue
} else {
command.Val = v
//command.isVal = true
}
}
buffer := new(bytes.Buffer)
//encode the command for the raft log
enc := gob.NewEncoder(buffer)
if err := enc.Encode(command); err != nil {
logger.Println("encode error:", err)
Write(conn, "ERR_INTERNAL")
continue
}
if _, err := rft.Append(buffer.Bytes(), conn); err != nil {
Write(conn, "ERR_REDIRECT 127.0.0.1 "+strconv.Itoa(raft.CLIENT_PORT+1))
conn.Close()
break
}
}
}
package main
import (
"fmt"
"raft"
)
const (
SERVERS = 5
)
func main() {
dummyCh := make(chan bool, 1)
fmt.Println("Started")
for i := 1; i <= SERVERS; i++ {
go raft.Start(i, true)
}
//nothing ever writes to dummyCh, so this blocks forever and keeps the servers alive
if <-dummyCh {
fmt.Println("finished")
}
}
package raft
import (
"bytes"
"encoding/gob"
"log"
"net"
"strconv"
"strings"
"sync"
"time"
"utils"
)
/*Constants used throughout the program to identify commands, request, response, and error messages*/
const (
//request
SET = "set"
GET = "get"
GETM = "getm"
CAS = "cas"
DELETE = "delete"
NOREPLY = "noreply"
//response
OK = "OK"
CRLF = "\r\n"
VALUE = "VALUE"
DELETED = "DELETED"
//errors
ERR_CMD_ERR = "ERR_CMD_ERR"
ERR_NOT_FOUND = "ERR_NOT_FOUND"
ERR_VERSION = "ERR_VERSION"
ERR_INTERNAL = "ERR_INTERNAL"
//constant
MAX_CMD_ARGS = 5
MIN_CMD_ARGS = 2
READ_TIMEOUT = 5
)
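//illustrative session (assumes a server replying on CLIENT_PORT+1): every request and
//value line is CRLF-terminated, and for set/cas the value bytes travel on the next line
//
//	client: set foo 0 3        server: OK 1
//	client: bar
//	client: get foo            server: VALUE 3
//	                           server: bar
//	client: getm foo           server: VALUE 1 0 3
//	                           server: bar
//	client: cas foo 0 1 3      server: OK 2
//	client: baz
//	client: delete foo         server: DELETED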
//represents the value in the main hashtable (key, value) pair
type Data struct {
numBytes uint64 //number of bytes of the value bytes
version uint64 //current version of the key
expTime uint64 //time offset in seconds after which the key should expire
value []byte //bytes representing the actual content of the value
isPerpetual bool //specifies that the key does not expire
}
//id of this server; only the leader (id 1 for now) writes replies back to clients
var sid int
//get value
func (d *Data) GetVal() []byte {
return d.value
}
//get version
func (d *Data) GetVers() uint64 {
return d.version
}
//represents the main hashtable where the dance actually happens
type KeyValueStore struct {
dictionary map[string]*Data //the hashtable that stores the (key, value) pairs
sync.RWMutex //mutex for synchronization when reading or writing to the hashtable
}
//pointer to custom logger
var logger *log.Logger
//cache
var table *KeyValueStore
//function to get access to the keyvaluestore
func GetKeyValStr() *KeyValueStore {
return table
}
//access the dictionary
func (kvstr *KeyValueStore) GetDicKVstr() map[string]*Data {
return kvstr.dictionary
}
/*Simple write function to send information to the client
*arguments: client connection, msg to send to the client
*return: none
*/
func write(conn net.Conn, msg string) {
buf := []byte(msg)
buf = append(buf, []byte(CRLF)...)
if sid == 1 {
conn.Write(buf)
}
}
/*Basic validations for various commands
*arguments: command to check against, other parameters sent with the command (excluding the value), client connection
*return: integer representing error state
*/
func isValid(cmd string, tokens []string, conn net.Conn) int {
switch cmd {
case SET:
if len(tokens) != 4 {
logger.Println(cmd, ":Invalid no. of tokens")
write(conn, ERR_CMD_ERR)
return 1
}
if len([]byte(tokens[1])) > 250 {
logger.Println(cmd, ":Invalid size of key")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[2], 10, 64); err != nil {
logger.Println(cmd, ":expiry time invalid")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[3], 10, 64); err != nil {
logger.Println(cmd, ":numBytes invalid")
write(conn, ERR_CMD_ERR)
return 1
}
case GET:
if len(tokens) != 2 {
logger.Println(cmd, ":Invalid number of arguments")
write(conn, ERR_CMD_ERR)
return 1
}
if len(tokens[1]) > 250 {
logger.Println(cmd, ":Invalid key size")
write(conn, ERR_CMD_ERR)
return 1
}
case GETM:
if len(tokens) != 2 {
logger.Println(cmd, ":Invalid number of tokens")
write(conn, ERR_CMD_ERR)
return 1
}
if len(tokens[1]) > 250 {
logger.Println(cmd, ":Invalid key size")
write(conn, ERR_CMD_ERR)
return 1
}
case CAS:
if len(tokens) != 5 {
logger.Println(cmd, ":Invalid number of tokens")
write(conn, ERR_CMD_ERR)
return 1
}
if len([]byte(tokens[1])) > 250 {
logger.Println(cmd, ":Invalid size of key")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[2], 10, 64); err != nil {
logger.Println(cmd, ":expiry time invalid")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[3], 10, 64); err != nil {
logger.Println(cmd, ":version invalid")
write(conn, ERR_CMD_ERR)
return 1
}
if _, err := strconv.ParseUint(tokens[4], 10, 64); err != nil {
logger.Println(cmd, ":numbytes invalid")
write(conn, ERR_CMD_ERR)
return 1
}
case DELETE:
if len(tokens) != 2 {
logger.Println(cmd, ":Invalid number of tokens")
write(conn, ERR_CMD_ERR)
return 1
}
if len([]byte(tokens[1])) > 250 {
logger.Println(cmd, ":Invalid size of key")
write(conn, ERR_CMD_ERR)
return 1
}
default:
return 0
}
//compiler is happy
return 0
}
/*Function monitors the channel for committed log entries approved by majority
*arguments: Commit channel
*return: none
*/
func MonitorCommitChannel(ch chan LogEntry) {
for {
temp := <-ch
var conn net.Conn
if sid == 1 {
conn = temp.(*LogEntryData).conn
} else {
conn = nil
}
cmd := new(utils.Command)
buffer := bytes.NewBuffer(temp.GetData())
dec := gob.NewDecoder(buffer)
if err := dec.Decode(cmd); err != nil {
log.Fatal("Error decoding command!", err)
}
ParseInput(conn, cmd)
Debug()
}
}
/*Function parses the command provided by the client and delegates further action to command specific functions.
*Based on the return values of those functions, send appropriate messages to the client.
*arguments: client connection, message from client, channel shared with myRead function
*return: none
*/
func ParseInput(conn net.Conn, cmd *utils.Command) {
msg := string(cmd.Cmd)
tokens := strings.Fields(msg)
//general error, don't check for commands, avoid the pain ;)
if len(tokens) > MAX_CMD_ARGS || len(tokens) < MIN_CMD_ARGS {
write(conn, ERR_CMD_ERR)
return
}
//fmt.Println(tokens)
//for efficient string concatenation
var buffer bytes.Buffer
switch tokens[0] {
case SET:
if isValid(SET, tokens, conn) != 0 {
return
}
if ver, ok, r := performSet(tokens[1:len(tokens)], cmd); ok {
//debug(table)
logger.Println(ver)
if r {
buffer.Reset()
buffer.WriteString(OK)
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatUint(ver, 10))
logger.Println(buffer.String())
write(conn, buffer.String())
}
}
case GET:
if isValid(GET, tokens, conn) != 0 {
return
}
if data, ok := performGet(tokens[1:len(tokens)]); ok {
logger.Println("sending", tokens[1], "data")
buffer.Reset()
buffer.WriteString(VALUE)
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatUint(data.numBytes, 10))
write(conn, buffer.String())
buffer.Reset()
buffer.Write(data.value)
write(conn, buffer.String())
} else {
buffer.Reset()
buffer.WriteString(ERR_NOT_FOUND)
write(conn, buffer.String())
}
//debug(table)
case GETM:
if isValid(GETM, tokens, conn) != 0 {
return
}
if data, ok := performGetm(tokens[1:len(tokens)]); ok {
logger.Println("sending", tokens[1], "metadata")
buffer.Reset()
buffer.WriteString(VALUE)
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatUint(data.version, 10))
buffer.WriteString(" ")
if data.isPerpetual {
buffer.WriteString("0")
} else {
buffer.WriteString(strconv.FormatUint(data.expTime-uint64(time.Now().Unix()), 10))
}
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatUint(data.numBytes, 10))
write(conn, buffer.String())
buffer.Reset()
buffer.Write(data.value)
write(conn, buffer.String())
} else {
buffer.Reset()
buffer.WriteString(ERR_NOT_FOUND)
write(conn, buffer.String())
}
//debug(table)
case CAS:
if isValid(CAS, tokens, conn) != 0 {
return
}
if ver, ok, r := performCas(tokens[1:len(tokens)], cmd); r {
switch ok {
case 0:
buffer.Reset()
buffer.WriteString(OK)
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatUint(ver, 10))
logger.Println(buffer.String())
write(conn, buffer.String())
case 1:
buffer.Reset()
buffer.WriteString(ERR_CMD_ERR)
write(conn, buffer.String())
case 2:
buffer.Reset()
buffer.WriteString(ERR_VERSION)
write(conn, buffer.String())
case 3:
buffer.Reset()
buffer.WriteString(ERR_NOT_FOUND)
write(conn, buffer.String())
}
}
//debug(table)
case DELETE:
if isValid(DELETE, tokens, conn) != 0 {
return
}
if ok := performDelete(tokens[1:len(tokens)]); ok == 0 {
write(conn, DELETED)
} else {
write(conn, ERR_NOT_FOUND)
}
//debug(table)
default:
buffer.Reset()
buffer.WriteString(ERR_CMD_ERR)
write(conn, buffer.String())
}
}
/*Delegate function responsible for all parsing and hashtable interactions for the SET command sent by client
*arguments: tokenized command sent by the client, command structure @utils.Command
*return: version of inserted key (if successful, 0 otherwise), success or failure, whether to send reply to client
*/
func performSet(tokens []string, cmd *utils.Command) (uint64, bool, bool) {
k := tokens[0]
//expiry time offset
e, _ := strconv.ParseUint(tokens[1], 10, 64)
//numbytes
n, _ := strconv.ParseUint(tokens[2], 10, 64)
//whether to reply to the client; noreply handling is not integrated yet
r := true
defer table.Unlock()
table.Lock()
//critical section start
var val *Data
if _, ok := table.dictionary[k]; ok {
val = table.dictionary[k]
} else {
val = new(Data)
table.dictionary[k] = val
}
val.numBytes = n
val.version++
if e == 0 {
val.isPerpetual = true
val.expTime = 0
} else {
val.isPerpetual = false
val.expTime = e + uint64(time.Now().Unix())
}
val.value = cmd.Val
return val.version, true, r
}
/*Delegate function responsible for activities related to the GET command sent by the client.
*arguments: tokenized command sent by the client
*return: pointer to value corresponding to the key given by client, success or failure
*/
func performGet(tokens []string) (*Data, bool) {
k := tokens[0]
defer table.Unlock()
//lock because if key is expired, we'll delete it
table.Lock()
//critical section begin
if v, ok := table.dictionary[k]; ok {
if !v.isPerpetual && v.expTime < uint64(time.Now().Unix()) {
//delete the key
delete(table.dictionary, k)
return nil, false
}
data := new(Data)
data.numBytes = v.numBytes
data.value = v.value[:]
return data, true
} else {
return nil, false
}
}
/*Delegate function responsible for activities related to the GETM command sent by the client.
*arguments: tokenized command sent by the client
*return: pointer to value corresponding to the key given by client, success or failure
*/
func performGetm(tokens []string) (*Data, bool) {
k := tokens[0]
defer table.Unlock()
table.Lock()
//critical section begin
if v, ok := table.dictionary[k]; ok {
if !v.isPerpetual && v.expTime < uint64(time.Now().Unix()) {
//delete the key
delete(table.dictionary, k)
return nil, false
}
data := new(Data)
data.version = v.version
data.expTime = v.expTime
data.numBytes = v.numBytes
data.value = v.value[:]
data.isPerpetual = v.isPerpetual
return data, true
} else {
return nil, false
}
}
/*Delegate function responsible for activities related to the CAS command sent by the client.
*arguments: tokenized command sent by the client, cmd pointer @utils.Command
*return: new version of updated key (if it is updated), error status {0: error while reading new value, 1: key found and changed,
*2: version mismatch with key, 3: key not found}, whether to reply to client
*/
func performCas(tokens []string, cmd *utils.Command) (uint64, int, bool) {
k := tokens[0]
e, _ := strconv.ParseUint(tokens[1], 10, 64)
ve, _ := strconv.ParseUint(tokens[2], 10, 64)
n, _ := strconv.ParseUint(tokens[3], 10, 64)
r := true
logger.Println(k, e, ve, n, r)
defer table.Unlock()
table.Lock()
if val, ok := table.dictionary[k]; ok {
if val.version == ve {
if val.isPerpetual || val.expTime >= uint64(time.Now().Unix()) {
//if expiry time is zero, key should not be deleted
if e == 0 {
val.isPerpetual = true
val.expTime = 0
} else {
val.isPerpetual = false
val.expTime = e + uint64(time.Now().Unix())
}
val.numBytes = n
val.version++
val.value = cmd.Val
//key found and changed
return val.version, 0, r
} else {
logger.Println("expired key found!")
//version found but key expired, can delete key safely and tell client that it does not exist
delete(table.dictionary, k)
return 0, 3, r
}
}
//version mismatch
return 0, 2, r
}
//key not found
return 0, 3, r
}
/*Delegate function responsible for activities related to the DELETE command sent by the client.
*arguments: tokenized command sent by the client
*return: integer specifying error state {0: found and deleted, 1: found but expired (deleted, but client told non-existent), 2: key not found}
*/
func performDelete(tokens []string) int {
k := tokens[0]
logger.Println(tokens)
flag := 1
defer table.Unlock()
table.Lock()
//begin critical section
if v, ok := table.dictionary[k]; ok {
if v.isPerpetual || v.expTime >= uint64(time.Now().Unix()) {
//found not expired
flag = 0
}
//delete anyway as expired or needs to be deleted
delete(table.dictionary, k)
return flag
}
//key not found
return 2
}
/*Simple function that dumps the contents of the hashtable
*arguments: none
*return: none
*/
func Debug() {
logger.Println("----start debug----")
for key, val := range (*table).dictionary {
logger.Println(key, val)
}
logger.Println("----end debug----")
}
/*Function that initializes the KV Store
*arguments: Logger
*return: none
*/
func InitKVStore(log *log.Logger, id int) {
logger = log
sid = id
//initialize key value store
table = &KeyValueStore{dictionary: make(map[string]*Data)}
logger.Println("KVStore initialized")
}
//server will not call this, we'll call it from test cases to clear the map
func ReInitServer() {
defer table.Unlock()
table.Lock()
for key := range table.dictionary {
delete(table.dictionary, key)
}
//fmt.Println(table.dictionary)
}
package raft
import (
"encoding/json"
"io/ioutil"
"log"
"math/rand"
"net"
"net/rpc"
"os"
"strconv"
"sync"
"time"
)
//constant values used
const (
CLIENT_PORT = 9000
LOG_PORT = 20000
ACK_TIMEOUT = 5
MIN_TIMEOUT_ELEC = 300
MAX_TIMEOUT_ELEC = 500
HEARTBEAT_TIMEOUT = 100
LEADER = 10
CANDIDATE = 20
FOLLOWER = 30
VOTED_FOR = "votedFor"
CURRENT_TERM = "currentTerm"
LOG_PERSIST = "log"
FILE_WRITTEN = 0
FILE_ERR = -1
NULL_VOTE = 0
LOG_INVALID_INDEX = -1
LOG_INVALID_TERM = -1
)
// Global variable for generating unique log sequence numbers
var lsn Lsn
// Flag for enabling/disabling logging functionality
var DEBUG = true
// See Log.Append. Implements Error interface.
type ErrRedirect int
//Log sequence number, unique for all time.
type Lsn uint64
// Stores the server information
type ServerConfig struct {
Id int // Id of server. Must be unique
Hostname string // name or ip of host
ClientPort int // port at which server listens to client messages.
LogPort int // tcp port for inter-replica protocol messages.
}
// Stores the replica information of the cluster
type ClusterConfig struct {
Path string // Directory for persistent log
Servers []ServerConfig // All servers in this cluster
}
type ClientAppend struct {
logEntry *LogEntryData
}
//note: these request/reply structs use unexported fields; net/rpc's gob encoding only
//transmits exported fields, so they would need exporting for the RPCs to carry data
type VoteRequest struct {
term int
candidateId int
lastLogIndex int
lastLogTerm int
}
type VoteRequestReply struct {
currentTerm int
reply bool
}
//just alias the babe
type AppendReply VoteRequestReply
type AppendRPC struct {
term int
leaderId int
prevLogIndex int
prevLogTerm int
leaderCommit int
entries []*LogEntryData
}
// Structure used for replying to the RPC calls
type Reply struct {
X int
}
type Timeout struct {
}
type RaftEvent interface {
}
type SharedLog interface {
Append(data []byte, conn net.Conn) (LogEntry, error)
AddToChannel(entry LogEntry)
}
// Raft information
type Raft struct {
LogArray []*LogEntryData // In memory store for log entries
commitCh chan LogEntry // Commit Channel
clusterConfig *ClusterConfig // Cluster
id int // Server id
sync.RWMutex
Info *log.Logger //log for raft instance
eventCh chan RaftEvent //receive events related to various states
votedFor int
currentTerm int
commitIndex int
voters int
monitorVotesCh chan VoteRequestReply
shiftStatusCh chan int
ackCh chan AppendReply
et *time.Timer
isLeader bool
lastApplied int
nextIndex []int
matchIndex []int
}
// Log entry interface
type LogEntry interface {
GetLsn() Lsn // Returns Lsn
GetData() []byte // Returns Data
GetCommitted() bool // Returns committed status
SetCommitted(status bool) // Sets committed status
}
type LogEntryData struct {
Id Lsn // Unique identifier for log entry
Data []byte // Data bytes
Committed bool // Commit status
Term int //term number
conn net.Conn // Connection for communicating with client
}
// Structure for calling commit RPC
type CommitData struct {
Id Lsn
}
func (rft *Raft) persistLog() {
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0666); err != nil {
rft.Info.Println("error opening log persist file", err.Error())
} else {
defer file.Close()
enc := json.NewEncoder(file)
for _, e := range rft.LogArray {
if err := enc.Encode(e); err != nil {
rft.Info.Println("error persisting log entry", err.Error())
}
}
if err := file.Sync(); err != nil {
rft.Info.Println("error synching log persist file", err.Error())
} else {
rft.Info.Println("log persist success!")
}
}
}
func (rft *Raft) readLogFromDisk() {
rft.LogArray = []*LogEntryData{}
if file, err := os.OpenFile(LOG_PERSIST+strconv.Itoa(rft.id), os.O_RDONLY, 0666); err != nil {
rft.Info.Println("error reading log persist file")
} else {
defer file.Close()
dec := json.NewDecoder(file)
for {
d := LogEntryData{}
if err := dec.Decode(&d); err != nil {
rft.Info.Println("done reading log from file")
break
} else {
rft.LogArray = append(rft.LogArray, &d)
}
}
}
}
func getSingleDataFromFile(name string, serverId int, info *log.Logger) int {
filename := name + strconv.Itoa(serverId)
if file, err := os.Open(filename); err != nil {
//file does not exist yet; initialize it with a zero value
ioutil.WriteFile(filename, []byte("0"), 0666)
info.Println("wrote in " + filename + " file")
return 0
} else {
defer file.Close()
if data, err := ioutil.ReadFile(file.Name()); err != nil {
info.Println("error reading file " + filename)
return FILE_ERR
} else {
info.Println("read from file " + filename)
if t, err2 := strconv.Atoi(string(data)); err2 != nil {
info.Println("error converting")
return FILE_ERR
} else {
info.Println("Converted success "+filename, t)
return t
}
}
}
}
func writeFile(name string, serverId int, data int, info *log.Logger) int {
filename := name + strconv.Itoa(serverId)
if file, err := os.Open(filename); err != nil {
//cannot write a file that was never initialized
return FILE_ERR
} else {
file.Close()
ioutil.WriteFile(filename, []byte(strconv.Itoa(data)), 0666)
info.Println("wrote in " + filename + " file")
return FILE_WRITTEN //file written
}
}
// Creates a raft object. This implements the SharedLog interface.
// commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed
// entries are recovered and replayed
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, eventCh chan RaftEvent, monitorVotesCh chan VoteRequestReply, toDebug bool) (*Raft, error) {
rft := new(Raft)
rft.commitCh = commitCh
rft.clusterConfig = config
rft.id = thisServerId
rft.eventCh = eventCh
rft.Info = getLogger(thisServerId, toDebug)
if v := getSingleDataFromFile(CURRENT_TERM, thisServerId, rft.Info); v != FILE_ERR {
rft.currentTerm = v
} else {
rft.currentTerm = 0
}
rft.monitorVotesCh = monitorVotesCh
rft.shiftStatusCh = make(chan int, 1)
rft.ackCh = make(chan AppendReply)
getSingleDataFromFile(VOTED_FOR, thisServerId, rft.Info) //initialize the votedFor file.
//replay whatever log was persisted before the restart
rft.readLogFromDisk()
rft.isLeader = false
rft.nextIndex = make([]int, len(config.Servers))
rft.matchIndex = make([]int, len(config.Servers))
return rft, nil
}
// Creates a log entry. This implements the LogEntry interface
// data: data bytes, committed: commit status, conn: connection to client
// Returns the log entry
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData)
entry.Id = lsn
entry.Data = data
entry.conn = conn
entry.Committed = committed
lsn++
return entry
}
// Gets the Lsn
func (entry *LogEntryData) GetLsn() Lsn {
return entry.Id
}
// Get data
func (entry *LogEntryData) GetData() []byte {
return entry.Data
}
// Get committed status
func (entry *LogEntryData) GetCommitted() bool {
return entry.Committed
}
// Sets the committed status
func (entry *LogEntryData) SetCommitted(committed bool) {
entry.Committed = committed
}
//make raft implement the append function
func (rft *Raft) Append(data []byte, conn net.Conn) (LogEntry, error) {
rft.Info.Println("Append Called")
if rft.id != 1 {
return nil, ErrRedirect(1)
}
defer rft.Unlock()
rft.Lock()
temp := NewLogEntry(data, false, conn)
rft.LogArray = append(rft.LogArray, temp)
return temp, nil
}
//AddToCommitChannel
func (rft *Raft) AddToChannel(entry LogEntry) {
rft.Info.Println("Adding to commit", entry)
rft.commitCh <- entry
}
//AddToEventChannel
func (rft *Raft) AddToEventChannel(entry RaftEvent) {
rft.Info.Println("Adding to event channel", entry)
rft.eventCh <- entry
}
//AddToMonitorVotesChannel
func (rft *Raft) AddToMonitorVotesChannel(entry VoteRequestReply) {
rft.Info.Println("Adding to monitor votes", entry)
rft.monitorVotesCh <- entry
}
func NewServerConfig(serverId int) (*ServerConfig, error) {
server := new(ServerConfig)
server.Id = serverId
server.Hostname = "127.0.0.1"
server.ClientPort = CLIENT_PORT + serverId
server.LogPort = LOG_PORT + serverId
return server, nil
}
func NewClusterConfig(num_servers int) (*ClusterConfig, error) {
config := new(ClusterConfig)
config.Path = ""
config.Servers = make([]ServerConfig, num_servers)
for i := 1; i <= num_servers; i++ {
curr_server, _ := NewServerConfig(i)
config.Servers[i-1] = *(curr_server)
}
return config, nil
}
func (e ErrRedirect) Error() string {
return "Redirect to server " + strconv.Itoa(int(e))
}
func monitorVotesChannelRoutine(rft *Raft, killCh chan bool) {
//a strict majority of the cluster (3 of 5), counting our own vote
majority := len(rft.clusterConfig.Servers)/2 + 1
flag := false
for {
select {
case temp := <-rft.monitorVotesCh:
if temp.reply {
rft.voters++
if !rft.isLeader && rft.voters >= majority {
rft.shiftStatusCh <- LEADER
rft.isLeader = true
}
} else {
if rft.currentTerm < temp.currentTerm {
rft.updateTermAndVote(temp.currentTerm)
rft.shiftStatusCh <- FOLLOWER
}
}
case <-killCh:
flag = true
break
}
if flag {
break
}
}
}
func monitorAckChannel(rft *Raft, killCh chan bool) {
/*func (rft *Raft) replyAppendRPC(reply bool, currentTerm int, fId int) {
if reply {
rft.nextIndex[fId-1] = len(rafts[fId].LogArray)
rft.matchIndex[fId-1] = len(rafts[fId].LogArray)
} else {
rft.nextIndex[fId-1]--
}
}
for {
select {
case temp := <-rft.:
Info.Println("Ack Received:", temp)
acks_received += temp
if acks_received == required_acks {
Info.Println("Majority Achieved", log_entry.(*LogEntryData).Id)
rft.LogArray[log_entry.(*LogEntryData).Id].Committed = true
//Info.Println(rft.LogArray)
rft.commitCh <- log_entry
temp := new(CommitData)
temp.Id = log_entry.(*LogEntryData).Id
for _, server := range rft.clusterConfig.Servers[1:] {
go doCommitRPCCall(server.Hostname, server.LogPort, temp)
}
majCh <- true
err = true
break
}
case <-up:
Info.Println("Error")
err = true
break
}
if err {
break
}
}*/
for {
select {
case temp := <-rft.ackCh:
//todo: tally acks per log entry and mark it committed on majority,
//as sketched in the comment above
rft.Info.Println("ack received", temp)
case <-killCh:
return
}
}
}
//entry loop to raft
func (rft *Raft) loop() {
state := FOLLOWER
for {
switch state {
case FOLLOWER:
state = rft.follower()
case CANDIDATE:
state = rft.candidate()
case LEADER:
state = rft.leader()
}
}
}
func getRandTime(log *log.Logger) time.Duration {
rand.Seed(time.Now().UnixNano())
t := time.Millisecond * time.Duration(rand.Intn(MAX_TIMEOUT_ELEC-MIN_TIMEOUT_ELEC)+MIN_TIMEOUT_ELEC)
//log.Println("New rand time", t)
return t
}
func doCastVoteRPC(hostname string, logPort int, temp *VoteRequestReply) {
Info.Println("Cast vote RPC")
//rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("Calling cast vote RPC", logPort)
castVoteCall := client.Go("RequestVoteReply.CastVoteRPC", args, reply, nil) //let go allocate done channel
castVoteCall = <-castVoteCall.Done
Info.Println("Reply", castVoteCall, reply.X)
}
func doVoteRequestRPC(hostname string, logPort int, temp *VoteRequest) {
Info.Println("Vote request RPC")
//rpc call to the caller
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("Calling vote requesr RPC", logPort)
voteReqCall := client.Go("VoteRequest.VoteRequestRPC", args, reply, nil) //let go allocate done channel
voteReqCall = <-voteReqCall.Done
Info.Println("Reply", voteReqCall, reply.X)
}
//make append entries rpc call to followers
func doAppendRPCCall(hostname string, logPort int, temp *AppendRPC) {
client, err := rpc.Dial("tcp", hostname+":"+strconv.Itoa(logPort))
if err != nil {
Info.Fatal("Dialing:", err)
}
reply := new(Reply)
args := temp
Info.Println("RPC Called", logPort)
appendCall := client.Go("AppendEntries.AppendRPC", args, reply, nil) //let go allocate done channel
appendCall = <-appendCall.Done
Info.Println("Reply", appendCall, reply.X)
}
//receiver is leader; invoked directly through the in-process rafts map (see server.go),
//a stopgap until the reply travels back over RPC
func (rft *Raft) replyAppendRPC(reply bool, currentTerm int, fId int) {
if reply {
rft.nextIndex[fId-1] = len(rafts[fId].LogArray)
rft.matchIndex[fId-1] = len(rafts[fId].LogArray)
} else {
rft.nextIndex[fId-1]--
}
}
func (rft *Raft) updateTermAndVote(term int) {
writeFile(CURRENT_TERM, rft.id, term, rft.Info)
rft.currentTerm = term
writeFile(VOTED_FOR, rft.id, NULL_VOTE, rft.Info)
rft.votedFor = NULL_VOTE
}
func (rft *Raft) LogF(msg string) {
rft.Info.Println("F:", rft.id, msg)
}
func (rft *Raft) LogC(msg string) {
rft.Info.Println("C:", rft.id, msg)
}
func (rft *Raft) LogL(msg string) {
rft.Info.Println("L:", rft.id, msg)
}
func (rft *Raft) follower() int {
//start candidate timeout
rft.et = time.NewTimer(getRandTime(rft.Info))
for {
//wrap in select
select {
case <-rft.et.C:
rft.LogF("election timeout")
return CANDIDATE
case event := <-rft.eventCh:
switch event.(type) {
case *ClientAppend:
rft.LogF("got client append")
//Do not handle clients in follower mode.
//Send it back up the pipeline.
event.(*ClientAppend).logEntry.SetCommitted(false)
rft.eventCh <- event.(*ClientAppend).logEntry
case *VoteRequest:
rft.LogF("got vote request")
req := event.(*VoteRequest)
reply := false
if req.term < rft.currentTerm {
reply = false
}
if req.term > rft.currentTerm ||
req.lastLogTerm > rft.currentTerm ||
(req.lastLogTerm == rft.currentTerm && req.lastLogIndex >= len(rft.LogArray)) {
rft.updateTermAndVote(req.term)
reply = true
}
if reply && rft.votedFor == NULL_VOTE {
rft.et.Reset(getRandTime(rft.Info))
rft.LogF("reset timer after voting")
writeFile(VOTED_FOR, rft.id, req.candidateId, rft.Info)
rft.LogF("voted for " + strconv.Itoa(req.candidateId))
rft.votedFor = req.candidateId
}
//let the asker know about the vote
voteReply := &VoteRequestReply{rft.currentTerm, reply}
//server ids are 1-based (see NewClusterConfig)
server := rft.clusterConfig.Servers[req.candidateId-1]
doCastVoteRPC(server.Hostname, server.LogPort, voteReply)
case *AppendRPC:
//rft.LogF("got append rpc")
rft.et.Reset(getRandTime(rft.Info))
//rft.LogF("reset timer on appendRPC")
req := event.(*AppendRPC)
if len(req.entries) == 0 { //heartbeat
//rft.LogF("got hearbeat from " + strconv.Itoa(req.leaderId))
continue
}
reply := true
if req.prevLogIndex == LOG_INVALID_INDEX || req.prevLogTerm == LOG_INVALID_TERM {
rft.updateTermAndVote(req.term)
reply = true
} else if req.term < rft.currentTerm {
reply = false
} else if req.term > rft.currentTerm {
rft.updateTermAndVote(req.term)
reply = true
}
//first condition to prevent out of bounds exception
if !(req.prevLogIndex == LOG_INVALID_INDEX) && rft.LogArray[req.prevLogIndex].Term != req.prevLogTerm {
rft.LogF("terms unequal")
reply = false
}
if reply {
i := req.prevLogIndex + 1
for ; i < len(rft.LogArray); i++ {
if req.prevLogIndex == LOG_INVALID_INDEX || req.entries[i-req.prevLogIndex-1].Term != rft.LogArray[i].Term {
break
}
}
if req.prevLogIndex == LOG_INVALID_INDEX {
rft.LogArray = append(rft.LogArray, req.entries...)
} else {
rft.LogArray = append(rft.LogArray[0:i], req.entries[i-req.prevLogIndex-1:]...)
}
//todo:also add to log
if req.leaderCommit > rft.commitIndex {
if req.leaderCommit > len(rft.LogArray)-1 {
rft.commitIndex = len(rft.LogArray) - 1
} else {
rft.commitIndex = req.leaderCommit
}
}
}
rafts[req.leaderId].replyAppendRPC(reply, rft.currentTerm, rft.id)
if reply {
rft.persistLog()
}
rft.Info.Println("F: log is size", len(rft.LogArray))
}
}
}
}
func (rft *Raft) candidate() int {
//increment current term
rft.LogC("became candidate")
writeFile(CURRENT_TERM, rft.id, rft.currentTerm+1, rft.Info)
rft.currentTerm++
//vote for self
rft.voters = 1
writeFile(VOTED_FOR, rft.id, rft.id, rft.Info)
rft.votedFor = rft.id
//reset timer
rft.et = time.NewTimer(getRandTime(rft.Info))
rft.Info.Println(rft.id, "candidate got new timer")
//create a vote request object
req := &VoteRequest{
term: rft.currentTerm,
candidateId: rft.id,
}
if len(rft.LogArray) == 0 {
req.lastLogIndex = LOG_INVALID_INDEX
req.lastLogTerm = LOG_INVALID_TERM
} else {
req.lastLogIndex = len(rft.LogArray) - 1
req.lastLogTerm = rft.LogArray[req.lastLogIndex].Term
}
//reinitialize rft.monitorVotesCh for this election round
rft.monitorVotesCh = make(chan VoteRequestReply)
killCh := make(chan bool)
go monitorVotesChannelRoutine(rft, killCh)
//send vote request to all servers
for _, server := range rft.clusterConfig.Servers {
if server.Id != rft.id {
rft.LogC("sent vote request to " + strconv.Itoa(server.Id))
doVoteRequestRPC(server.Hostname, server.LogPort, req)
}
}
for {
select {
case status := <-rft.shiftStatusCh:
if status == LEADER {
rft.LogC("C to L")
killCh <- true
return LEADER
} else {
rft.LogC("C to F")
killCh <- true
return FOLLOWER
}
case <-rft.et.C:
rft.LogC("C to C")
killCh <- true
return CANDIDATE
case event := <-rft.eventCh:
switch event.(type) {
case (*AppendRPC):
rft.LogC("C to F")
rft.et.Reset(getRandTime(rft.Info))
killCh <- true
return FOLLOWER
}
}
}
}
func enforceLog(rft *Raft) {
for {
for _, server := range rft.clusterConfig.Servers {
//nextIndex and matchIndex are 0-indexed while server ids run 1..n
if rft.id != server.Id && len(rft.LogArray)-1 >= rft.nextIndex[server.Id-1] {
req := &AppendRPC{}
req.term = rft.currentTerm
req.leaderId = rft.id
req.leaderCommit = rft.commitIndex
req.entries = rft.LogArray[rft.nextIndex[server.Id-1]:]
req.prevLogIndex = rft.nextIndex[server.Id-1] - 1
if req.prevLogIndex < 0 {
req.prevLogTerm = LOG_INVALID_TERM
} else {
req.prevLogTerm = rft.LogArray[req.prevLogIndex].Term
}
//appendRPC call
doAppendRPCCall(server.Hostname, server.LogPort, req)
rft.LogL("sent append entries to " + strconv.Itoa(server.Id))
}
/*if !rafts[i+1].isLeader && len(rft.LogArray)-1 >= rft.nextIndex[i] {
req := &AppendRPC{}
req.term = rft.currentTerm
req.leaderId = rft.id
req.leaderCommit = rft.commitIndex
req.entries = rft.LogArray[rft.nextIndex[i]:len(rft.LogArray)]
req.prevLogIndex = rft.nextIndex[i] - 1
if req.prevLogIndex <= 0 {
req.prevLogTerm = LOG_INVALID_TERM
} else {
req.prevLogTerm = rft.LogArray[rft.nextIndex[i]-1].Term
}
//send to other rafts
rafts[i+1].eventCh <- req
rft.LogL("sent append entries to " + strconv.Itoa(i+1))
}*/
time.Sleep(time.Millisecond * 2)
}
}
}
func (rft *Raft) leader() int {
rft.LogL("became leader")
heartbeat := time.NewTimer(time.Millisecond * HEARTBEAT_TIMEOUT)
heartbeatReq := new(AppendRPC)
heartbeatReq.entries = []*LogEntryData{}
heartbeatReq.leaderId = rft.id
rft.currentTerm++
rft.LogArray = append(
rft.LogArray,
&LogEntryData{
Id: 1,
Data: []byte("hello"),
Committed: false,
Term: rft.currentTerm,
},
&LogEntryData{
Id: 2,
Data: []byte("world"),
Committed: false,
Term: rft.currentTerm,
})
//build nextIndex and matchIndex
for i := 0; i < len(rft.nextIndex); i++ {
rft.nextIndex[i] = 0
rft.matchIndex[i] = 0
}
go enforceLog(rft)
for {
select {
case <-heartbeat.C:
for _, server := range rft.clusterConfig.Servers {
if server.Id != rft.id {
//doRPCCall for heartbeat
doAppendRPCCall(server.Hostname, server.LogPort, heartbeatReq)
}
}
heartbeat.Reset(time.Millisecond * HEARTBEAT_TIMEOUT)
case event := <-rft.eventCh:
switch event.(type) {
case *ClientAppend:
//write data to log
rft.LogL("got client data")
entry := event.(*ClientAppend).logEntry
rft.LogArray = append(rft.LogArray, entry)
//todo:apply to state machine
//todo:respond to client
case *AppendRPC:
case *VoteRequest:
}
}
}
}
// server.go
package raft
import (
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
)
var rafts map[int]*Raft
//package-level logger used by the RPC helper functions; set in Start
var Info *log.Logger
func getLogger(serverId int, toDebug bool) (l *log.Logger) {
if !toDebug {
l = log.New(ioutil.Discard, "INFO: ", log.Ltime|log.Lshortfile)
} else {
logf, _ := os.OpenFile(strconv.Itoa(serverId), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
l = log.New(logf, "INFO: ", log.Ltime|log.Lmicroseconds|log.Lshortfile)
}
l.Println("Initialized server.")
return l
}
func Start(serverId int, toDebug bool) {
eventCh := make(chan RaftEvent)
commitCh := make(chan LogEntry)
monitorVotesCh := make(chan VoteRequestReply)
clusterConfig, _ := NewClusterConfig(5)
rft, _ := NewRaft(clusterConfig, serverId, commitCh, eventCh, monitorVotesCh, toDebug)
//wire up the package-level logger used by the RPC helpers
Info = rft.Info
if rafts == nil {
rafts = make(map[int]*Raft)
}
rafts[serverId] = rft
fmt.Println(len(rafts))
rft.loop()
}
// server.go
package main
import (
"bytes"
"connhandler"
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
"raft"
"strconv"
)
// Logger
var Info *log.Logger
//global raft object for each server instance
var rft *raft.Raft
//Receiver for RPC
type AppendEntries struct{}
//Receiver for voting related RPC
type Voting struct{}
//receiver for testing RPC
//only for testing purpose
type Tester struct{}
//RPC argument for testing the replication of key, value, version across key value stores
//(note: unexported fields won't survive net/rpc's gob encoding; they would need exporting
//for a real cross-process call)
type TestArgs struct {
key string
value []byte
version uint64
}
// RPC argument with boolean value in the reply to confirm that indeed the replication went through across servers
type TestReply struct {
replica_updated bool
}
//only for testing purpose
//this function checks for the key value in its kvstore and sets reply.replica_updated true if present and false if absent
//arguments: args contains the key, value, version to be matched
//reply is the reply to be sent
func (t *Tester) TesterRPC(args *TestArgs, reply *TestReply) error { //net/rpc only exposes exported methods
table := raft.GetKeyValStr()
table.RLock()
defer table.RUnlock()
dic := table.GetDicKVstr()
if v, ok := dic[args.key]; ok {
the_val := v.GetVal()
the_vers := v.GetVers()
if bytes.Equal(the_val, args.value) && the_vers == args.version {
reply.replica_updated = true
return nil
} else {
return nil
}
} else {
return nil
}
}
type Reply struct {
X int
}
//RPC for follower server. To let followers know that they can append their logs
//arguments: pointer to argument struct (has LogEntryData), pointer to reply struct
//returns: error
//receiver: pointer to AppendEntries
func (t *AppendEntries) AppendRPC(args *raft.AppendRPC, reply *Reply) error {
Info.Println("append RPC invoked")
rft.AddToEventChannel(args)
reply.X = 1
return nil
/*Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
reply.X = 1
return nil*/
}
func (t *AppendEntries) AppendReplyRPC(args *raft.AppendReply, reply *Reply) error {
Info.Println("append reply to leader RPC invoked")
rft.AddToEventChannel(args)
reply.X = 1
return nil
/*Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
reply.X = 1
return nil*/
}
//RPC for follower server. To let followers know that an entry can be committed.
//arguments: pointer to argument struct (has LogEntry), pointer to reply struct
//returns: error
//receiver: pointer to AppendEntries
func (t *AppendEntries) CommitRPC(args *raft.CommitData, reply *Reply) error {
Info.Println("Commit RPC invoked")
rft.LogArray[(*args).Id].SetCommitted(true)
rft.AddToChannel(rft.LogArray[(*args).Id])
reply.X = 1
return nil
}
func (t *Voting) VoteRequestRPC(args *raft.VoteRequest, reply *Reply) error {
Info.Println("Request vote RPC received", args)
rft.AddToEventChannel(args)
reply.X = 1
return nil
}
func (t *Voting) CastVoteRPC(args *raft.VoteRequestReply, reply *Reply) error {
Info.Println("Cast vote RPC received", args)
rft.AddToMonitorVotesChannel(*args)
reply.X = 1
return nil
}
//Initialize all the things necessary to start the server for inter raft communication.
//The servers are running on ports 20000+serverId. {1..5}
//arguments: pointer to current server config, pointer to raft object, a bool channel to set to true to let
//the invoker know that the proc ended.
//returns: none
//receiver: none
func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
appendRpc := new(AppendEntries)
rpc.Register(appendRpc)
rpc.Register(new(Voting)) //expose the voting RPCs as well
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.LogPort))
if e != nil {
Info.Fatal("listen error:", e)
}
for {
if conn, err := listener.Accept(); err != nil {
Info.Fatal("accept error: " + err.Error())
} else {
Info.Printf("new connection established\n")
go rpc.ServeConn(conn)
}
}
ch <- true
}
//Simple logger that is enabled or disabled according to the command line arguments. In test cases
//it is redirected to a file per server {1..5}.
//arguments: current server id, toggle enable/disable
//return: none
//receiver: none
func initLogger(serverId int, toDebug bool) {
// Logger Initialization
if !toDebug {
Info = log.New(ioutil.Discard, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
} else {
Info = log.New(os.Stdout, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
}
Info.Println("Initialized server.")
}
//Initialize all the things necessary to start the server for communication with clients.
//The servers are running on ports 9000+serverId {1..5}.
//arguments: pointer to current server config, pointer to raft object, a bool channel to set to true to let
//the invoker know that the proc ended.
//returns: none
//receiver: none
func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort))
if e != nil {
Info.Fatal("client listen error:", e)
}
for {
if conn, err := listener.Accept(); err != nil {
Info.Fatal("client accept error: " + err.Error())
} else {
Info.Printf("client new connection established\n")
go connhandler.HandleClient(conn, rft, Info)
}
}
ch <- true
}
//Entry point for application. Starts all major server go routines and then waits forever
func main() {
sid, err := strconv.Atoi(os.Args[1])
ch1 := make(chan bool)
ch2 := make(chan bool)
if err != nil {
//Info is not initialized yet, so fall back to the standard logger
log.Fatal("argument ", os.Args[1], " is not a valid server id")
}
if len(os.Args) > 3 {
initLogger(sid, true)
} else {
initLogger(sid, false)
}
Info.Println("Starting")
serverCount, err2 := strconv.Atoi(os.Args[2])
if err2 != nil {
Info.Fatal("argument ", os.Args[2], " is not a valid server count")
}
server, _ := raft.NewServerConfig(sid)
clusterConfig, _ := raft.NewClusterConfig(serverCount)
commitCh := make(chan raft.LogEntry)
eventCh := make(chan raft.RaftEvent)
monitorVotesCh := make(chan raft.VoteRequestReply)
rft, _ = raft.NewRaft(clusterConfig, sid, commitCh, eventCh, monitorVotesCh, true)
raft.InitKVStore(Info, sid)
go raft.MonitorCommitChannel(commitCh) //for kvstore
go initClientCommunication(server, rft, ch1)
go initInterServerCommunication(server, rft, ch2)
for <-ch1 && <-ch2 {
}
}
//testing
package main
import (
"bytes"
//"fmt"
"net"
//"net/rpc" //not used yet
"os"
"os/exec"
"raft"
"strconv"
"testing"
"time"
)
//constant values used
const (
NUM_SERVERS int = 5
)
type Testpair struct {
to_server []byte
from_server []byte
}
//
func TestAll(t *testing.T) {
//start the servers
for i := 1; i <= NUM_SERVERS; i++ {
go startServers(i, t)
}
//wait for some time so that servers are ready
time.Sleep(4 * time.Second)
}
//run servers
func startServers(i int, t *testing.T) {
cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i), strconv.Itoa(NUM_SERVERS), "x")
f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
t.Errorf("error opening file: %v", err)
}
defer f.Close()
cmd.Stdout = f
cmd.Stderr = f
cmd.Run()
}
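//illustrative sketch: a client round-trip against server 1 (assumed to be the leader,
//listening on raft.CLIENT_PORT+1 per NewServerConfig); it presumes the commit path is
//wired end to end so that a set eventually earns an "OK <version>" reply
func TestSetGet(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:"+strconv.Itoa(raft.CLIENT_PORT+1))
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn.Close()
//set a 3-byte value; the value goes on its own CRLF-terminated line
if _, err := conn.Write([]byte("set foo 0 3\r\nbar\r\n")); err != nil {
t.Fatalf("write error: %v", err)
}
reply := make([]byte, 64)
n, err := conn.Read(reply)
if err != nil {
t.Fatalf("read error: %v", err)
}
if !bytes.HasPrefix(reply[:n], []byte("OK")) {
t.Errorf("expected OK <version>, got %q", reply[:n])
}
}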
// utils
package utils
import (
"bytes"
"encoding/gob"
)
//Struct to help extraction of command and value from the Data field of raft.LogEntryData
type Command struct {
Cmd []byte //the command line, e.g. "set foo 0 3"
Val []byte //the value the user wants to send
}
//Custom encoder to encode the Command struct into a byte array. gob encoder will call it
//arguments: none
//returns: the byte array for encoded data, error
//receiver: pointer to Command struct
func (d *Command) GobEncode() ([]byte, error) {
w := new(bytes.Buffer)
encoder := gob.NewEncoder(w)
err := encoder.Encode(d.Cmd)
if err != nil {
return nil, err
}
err = encoder.Encode(d.Val)
if err != nil {
return nil, err
}
return w.Bytes(), nil
}
//Custom decoder to decode a byte array with the appropriate data into a Command struct. gob decoder will call it.
//arguments: byte array with data to be decoded
//returns: error if any
//receiver: pointer to Command struct
func (d *Command) GobDecode(buf []byte) error {
r := bytes.NewBuffer(buf)
decoder := gob.NewDecoder(r)
err := decoder.Decode(&d.Cmd)
if err != nil {
return err
}
return decoder.Decode(&d.Val)
}
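// example (illustrative sketch): round-trips a utils.Command through gob exactly the way
// connhandler encodes it before rft.Append and MonitorCommitChannel decodes it again
package main
import (
"bytes"
"encoding/gob"
"fmt"
"utils"
)
func main() {
cmd := &utils.Command{Cmd: []byte("set foo 0 3"), Val: []byte("bar")}
buf := new(bytes.Buffer)
if err := gob.NewEncoder(buf).Encode(cmd); err != nil {
panic(err)
}
out := new(utils.Command)
if err := gob.NewDecoder(buf).Decode(out); err != nil {
panic(err)
}
fmt.Printf("%s | %s\n", out.Cmd, out.Val) //prints: set foo 0 3 | bar
}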