Commit 4948a724 authored by Sushant Mahajan's avatar Sushant Mahajan

Merge branch 'master' of git.cse.iitb.ac.in:smahajan/cs733

parents 6bb021c1 2103549e
...@@ -189,6 +189,10 @@ func isValid(cmd string, tokens []string, conn net.Conn) int { ...@@ -189,6 +189,10 @@ func isValid(cmd string, tokens []string, conn net.Conn) int {
return 0 return 0
} }
/*Function monitors the channel for committed log entries approved by majority
*arguments: Commit channel
*return: none
*/
func MonitorCommitChannel(ch chan LogEntry) { func MonitorCommitChannel(ch chan LogEntry) {
for { for {
temp := <-ch temp := <-ch
...@@ -511,6 +515,10 @@ func Debug() { ...@@ -511,6 +515,10 @@ func Debug() {
logger.Println("----end debug----") logger.Println("----end debug----")
} }
/*Function that initializes the KV Store
*arguments: Logger
*return: none
*/
func InitKVStore(log *log.Logger) { func InitKVStore(log *log.Logger) {
logger = log logger = log
......
...@@ -19,15 +19,19 @@ const ( ...@@ -19,15 +19,19 @@ const (
// Logger // Logger
var Info *log.Logger var Info *log.Logger
// Global variable for generating unique log sequence numbers
var lsn Lsn var lsn Lsn
// Flag for enabling/disabling logging functionality // Flag for enabling/disabling logging functionality
var DEBUG = true var DEBUG = true
type ErrRedirect int // See Log.Append. Implements Error interface. // See Log.Append. Implements Error interface.
type ErrRedirect int
type Lsn uint64 //Log sequence number, unique for all time. //Log sequence number, unique for all time.
type Lsn uint64
// Stores the server information
type ServerConfig struct { type ServerConfig struct {
Id int // Id of server. Must be unique Id int // Id of server. Must be unique
Hostname string // name or ip of host Hostname string // name or ip of host
...@@ -35,6 +39,7 @@ type ServerConfig struct { ...@@ -35,6 +39,7 @@ type ServerConfig struct {
LogPort int // tcp port for inter-replica protocol messages. LogPort int // tcp port for inter-replica protocol messages.
} }
// Stores the replica information of the cluster
type ClusterConfig struct { type ClusterConfig struct {
Path string // Directory for persistent log Path string // Directory for persistent log
Servers []ServerConfig // All servers in this cluster Servers []ServerConfig // All servers in this cluster
...@@ -44,32 +49,36 @@ type SharedLog interface { ...@@ -44,32 +49,36 @@ type SharedLog interface {
Append(data []byte) (LogEntry, error) Append(data []byte) (LogEntry, error)
} }
// Raft information
type Raft struct { type Raft struct {
LogArray []*LogEntryData LogArray []*LogEntryData // In memory store for log entries
commitCh chan LogEntry commitCh chan LogEntry // Commit Channel
clusterConfig *ClusterConfig //cluster clusterConfig *ClusterConfig // Cluster
id int //this server id id int // Server id
sync.RWMutex sync.RWMutex
} }
// Log entry interface
type LogEntry interface { type LogEntry interface {
GetLsn() Lsn GetLsn() Lsn // Returns Lsn
GetData() []byte GetData() []byte // Returns Data
GetCommitted() bool GetCommitted() bool // Returns committed status
SetCommitted(status bool) SetCommitted(status bool) // Sets committed status
} }
type LogEntryData struct { type LogEntryData struct {
Id Lsn Id Lsn // Unique identifier for log entry
Data []byte Data []byte // Data bytes
Committed bool Committed bool // Commit status
conn net.Conn conn net.Conn // Connection for communicating with client
} }
// Structure used for replying to the RPC calls
type Reply struct { type Reply struct {
X int X int
} }
// Structure for registering RPC methods
type AppendEntries struct{} type AppendEntries struct{}
var cluster_config *ClusterConfig var cluster_config *ClusterConfig
...@@ -78,6 +87,10 @@ func GetClusterConfig() *ClusterConfig { ...@@ -78,6 +87,10 @@ func GetClusterConfig() *ClusterConfig {
return cluster_config return cluster_config
} }
// Creates a raft object. This implements the SharedLog interface.
// commitCh is the channel that the kvstore waits on for committed messages.
// When the process starts, the local disk log is read and all committed
// entries are recovered and replayed
func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) { func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, logger *log.Logger) (*Raft, error) {
rft := new(Raft) rft := new(Raft)
rft.commitCh = commitCh rft.commitCh = commitCh
...@@ -89,6 +102,9 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, lo ...@@ -89,6 +102,9 @@ func NewRaft(config *ClusterConfig, thisServerId int, commitCh chan LogEntry, lo
return rft, nil return rft, nil
} }
// Creates a log entry. This implements the LogEntry interface
// data: data bytes, committed: commit status, conn: connection to client
// Returns the log entry
func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData { func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
entry := new(LogEntryData) entry := new(LogEntryData)
entry.Id = lsn entry.Id = lsn
...@@ -99,7 +115,7 @@ func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData { ...@@ -99,7 +115,7 @@ func NewLogEntry(data []byte, committed bool, conn net.Conn) *LogEntryData {
return entry return entry
} }
//goroutine that monitors channel to check if the majority of servers have replied // Goroutine that monitors channel to check if the majority of servers have replied
func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) { func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh chan bool) {
acks_received := 0 acks_received := 0
num_servers := len(rft.clusterConfig.Servers) num_servers := len(rft.clusterConfig.Servers)
...@@ -138,19 +154,22 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c ...@@ -138,19 +154,22 @@ func monitorAckChannel(rft *Raft, ack_ch <-chan int, log_entry LogEntry, majCh c
} }
} }
//make LogEntryData implement the LogEntry Interface // Gets the Lsn
func (entry *LogEntryData) GetLsn() Lsn { func (entry *LogEntryData) GetLsn() Lsn {
return entry.Id return entry.Id
} }
// Get data
func (entry *LogEntryData) GetData() []byte { func (entry *LogEntryData) GetData() []byte {
return entry.Data return entry.Data
} }
// Get committed status
func (entry *LogEntryData) GetCommitted() bool { func (entry *LogEntryData) GetCommitted() bool {
return entry.Committed return entry.Committed
} }
// Sets the committed status
func (entry *LogEntryData) SetCommitted(committed bool) { func (entry *LogEntryData) SetCommitted(committed bool) {
entry.Committed = committed entry.Committed = committed
} }
......
...@@ -45,7 +45,7 @@ func TestAll(t *testing.T) { ...@@ -45,7 +45,7 @@ func TestAll(t *testing.T) {
//run servers //run servers
func testServersCommunic(i int, t *testing.T) { func testServersCommunic(i int, t *testing.T) {
cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i), strconv.Itoa(NUM_SERVERS)) cmd := exec.Command("go", "run", "server.go", strconv.Itoa(i), strconv.Itoa(NUM_SERVERS), "x")
f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) f, err := os.OpenFile(strconv.Itoa(i), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil { if err != nil {
t.Errorf("error opening file: %v", err) t.Errorf("error opening file: %v", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment