Commit 39e6edb7 authored by Sushant Mahajan's avatar Sushant Mahajan

added documentation for code

parent aafa7ab8
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
/* /*
*Helper function to read value or cause timeout after READ_TIMEOUT seconds *Helper function to read value or cause timeout after READ_TIMEOUT seconds
*parameters: channel to read data from, threshold number of bytes to read *parameters: channel to read data from, threshold number of bytes to read, log pointer to write into
*returns: the value string and error state *returns: the value string and error state
*/ */
func readValue(ch chan []byte, n uint64, logger *log.Logger) ([]byte, bool) { func readValue(ch chan []byte, n uint64, logger *log.Logger) ([]byte, bool) {
...@@ -59,7 +59,8 @@ func readValue(ch chan []byte, n uint64, logger *log.Logger) ([]byte, bool) { ...@@ -59,7 +59,8 @@ func readValue(ch chan []byte, n uint64, logger *log.Logger) ([]byte, bool) {
return v[:n], err return v[:n], err
} }
/*Copied from the bufio.Scanner (originally ScanLines). By default it splits by '\n' but now we want it to split by '\r\n' /*Copied from the bufio.Scanner (originally ScanLines).
*By default it splits by '\n' but now we want it to split by '\r\n'
*arguments: data in bytes, is eof reached *arguments: data in bytes, is eof reached
*return: next sequence of bytes, chunk of data found, err state *return: next sequence of bytes, chunk of data found, err state
*/ */
...@@ -92,7 +93,8 @@ func CustomSplitter(data []byte, atEOF bool) (advance int, token []byte, err err ...@@ -92,7 +93,8 @@ func CustomSplitter(data []byte, atEOF bool) (advance int, token []byte, err err
} }
/*Function to read data from the connection and put it on the channel so it could be read in a systematic fashion. /*Function to read data from the connection and put it on the channel so it could be read in a systematic fashion.
*arguments: channel shared between this go routine and other functions performing actions based on the commands given, client connection *arguments: channel shared between this go routine and other functions performing actions based on the commands given,
*client connection
*return: none *return: none
*/ */
func MyRead(ch chan []byte, conn net.Conn) { func MyRead(ch chan []byte, conn net.Conn) {
...@@ -118,6 +120,11 @@ func Write(conn net.Conn, msg string) { ...@@ -118,6 +120,11 @@ func Write(conn net.Conn, msg string) {
conn.Write(buf) conn.Write(buf)
} }
/*Will be invoked as go routine by server to every client connection. Will take care of all communication with the
*client and the raft/kvstore
*arguments: connection to client, pointer to raft, pointer to logger
*return: none
*/
func HandleClient(conn net.Conn, rft *raft.Raft, logger *log.Logger) { func HandleClient(conn net.Conn, rft *raft.Raft, logger *log.Logger) {
defer conn.Close() defer conn.Close()
//channel for every connection for every client //channel for every connection for every client
......
...@@ -15,29 +15,45 @@ import ( ...@@ -15,29 +15,45 @@ import (
// Logger // Logger
var Info *log.Logger var Info *log.Logger
//global raft object for each server instance
var rft *raft.Raft var rft *raft.Raft
//Receiver fot RPC
type AppendEntries struct{} type AppendEntries struct{}
//encapsulate the return value of RPC
type Reply struct { type Reply struct {
X int X int
} }
//RPC for follower server. To let followers know that they can append their logs
//arguments: pointer to argument struct (has LogEntryData), pointer to reply struct
//returns: error
//receiver: pointer to AppendEntries
func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntryData, reply *Reply) error { func (t *AppendEntries) AppendEntriesRPC(args *raft.LogEntryData, reply *Reply) error {
Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted()) Info.Println("Append Entries RPC invoked", (*args).GetLsn(), (*args).GetData(), (*args).GetCommitted())
rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil)) rft.LogArray = append(rft.LogArray, raft.NewLogEntry((*args).GetData(), (*args).GetCommitted(), nil))
reply.X = 1 reply.X = 1
return nil return nil
} }
func (t *AppendEntries) CommitRPC(args *raft.LogEntry, reply *Reply) error { //RPC for follower server. To let followers know that and entry can be committed.
//arguments: pointer to argument struct (has LogEntry), pointer to reply struct
//returns: error
//receiver: pointer to AppendEntries
func (t *AppendEntries) CommitRPC(args *raft.LogEntryData, reply *Reply) error {
Info.Println("Commit RPC invoked") Info.Println("Commit RPC invoked")
rft.LogArray[(*args).GetLsn()].SetCommitted(true) rft.LogArray[(*args).GetLsn()].SetCommitted(true)
reply.X = 1 reply.X = 1
return nil return nil
} }
//Initialize all the things necessary for start the server for inter raft communication.
//The servers are running on ports 20000+serverId. {1..5}
//arguments: pointer to current server config, pointer to raft object, a bool channel to set to true to let
//the invoker know that the proc ended.
//returns: none
//receiver: none
func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) { func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
appendRpc := new(AppendEntries) appendRpc := new(AppendEntries)
rpc.Register(appendRpc) rpc.Register(appendRpc)
...@@ -56,7 +72,11 @@ func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch ...@@ -56,7 +72,11 @@ func initInterServerCommunication(server *raft.ServerConfig, rft *raft.Raft, ch
ch <- true ch <- true
} }
// Initialize Logger //Simple logger that is enabled or disabled according to the command line arguments. In test cases
//it is redirected to a file per server {1..5}.
//arguments: current server id, toggle enable/disable
//return: none
//receiver: none
func initLogger(serverId int, toDebug bool) { func initLogger(serverId int, toDebug bool) {
// Logger Initializaion // Logger Initializaion
if !toDebug { if !toDebug {
...@@ -68,6 +88,12 @@ func initLogger(serverId int, toDebug bool) { ...@@ -68,6 +88,12 @@ func initLogger(serverId int, toDebug bool) {
Info.Println("Initialized server.") Info.Println("Initialized server.")
} }
//Initialize all the things necessary for start the server for communication with client.
//The servers are running on ports 9000+serverId {1..5}.
//arguments: pointer to current server config, pointer to raft object, a bool channel to set to true to let
//the invoker know that the proc ended.
//returns: none
//receiver: none
func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) { func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan bool) {
listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort)) listener, e := net.Listen("tcp", ":"+strconv.Itoa(server.ClientPort))
if e != nil { if e != nil {
...@@ -84,6 +110,7 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan ...@@ -84,6 +110,7 @@ func initClientCommunication(server *raft.ServerConfig, rft *raft.Raft, ch chan
ch <- true ch <- true
} }
//Entry point for application. Starts all major server go routines and then waits for ever
func main() { func main() {
sid, err := strconv.Atoi(os.Args[1]) sid, err := strconv.Atoi(os.Args[1])
ch1 := make(chan bool) ch1 := make(chan bool)
......
...@@ -6,11 +6,16 @@ import ( ...@@ -6,11 +6,16 @@ import (
"encoding/gob" "encoding/gob"
) )
//Struct to help extraction of command and value from the Data field of raft.LogEntryData
type Command struct { type Command struct {
Cmd []byte Cmd []byte //the command like set .s..
Val []byte Val []byte //the value the user wants to send
} }
//Custom encoder to encode the Command struct into a byte array. gob encoder will call it
//arguments: none
//returns: the byte array for encoded data, error
//receiver: pointer to Command struct
func (d *Command) GobEncode() ([]byte, error) { func (d *Command) GobEncode() ([]byte, error) {
w := new(bytes.Buffer) w := new(bytes.Buffer)
encoder := gob.NewEncoder(w) encoder := gob.NewEncoder(w)
...@@ -25,6 +30,10 @@ func (d *Command) GobEncode() ([]byte, error) { ...@@ -25,6 +30,10 @@ func (d *Command) GobEncode() ([]byte, error) {
return w.Bytes(), nil return w.Bytes(), nil
} }
//Custom decoder to decode a byte array with appr data to Command struct. gob decoder will call it.
//arguments: byte array with data to be decoded
//returns: error if any
//receiver: pointer to Command struct
func (d *Command) GobDecode(buf []byte) error { func (d *Command) GobDecode(buf []byte) error {
r := bytes.NewBuffer(buf) r := bytes.NewBuffer(buf)
decoder := gob.NewDecoder(r) decoder := gob.NewDecoder(r)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment