fully working gossip protocol

parent 0c4d374f
...@@ -36,7 +36,7 @@ class Network(): ...@@ -36,7 +36,7 @@ class Network():
with open('Nodes.dat', 'r') as f: with open('Nodes.dat', 'r') as f:
reader = csv.reader(f) reader = csv.reader(f)
for row in reader: for row in reader:
node = row[0] # NOde id node = row[0] # Node id
xnode = {node:OverlayNode(node,self,w[i])} xnode = {node:OverlayNode(node,self,w[i])}
self.pk_weight_map.update({xnode[node].pk:w[i]}) self.pk_weight_map.update({xnode[node].pk:w[i]})
self.nodes.update(xnode)#added node to network self.nodes.update(xnode)#added node to network
...@@ -46,7 +46,13 @@ class Network(): ...@@ -46,7 +46,13 @@ class Network():
with open('Links.dat', 'r') as f: with open('Links.dat', 'r') as f:
reader = csv.reader(f) reader = csv.reader(f)
for row in reader: for row in reader:
link = tuple(row[0].split()) # tuple of a link tup = row[0].split()
link = None # tuple of a link
if int(tup[0]) < int(tup[1]):
link = (tup[0],tup[1])
else:
link = (tup[1],tup[0])
linkDelay = 1 linkDelay = 1
self.links.update({link:Link(link,self,linkDelay)}) self.links.update({link:Link(link,self,linkDelay)})
...@@ -58,6 +64,11 @@ class Network(): ...@@ -58,6 +64,11 @@ class Network():
pass pass
def getLink(self,tup):
if int(tup[0]) < int(tup[1]) :
return self.links.get(tup)
else:
return self.links.get((tup[1],tup[0]))
def simulate(self,time): def simulate(self,time):
# logger.info("simulating .. ") # logger.info("simulating .. ")
......
from SystemEntity import SystemEntity from SystemEntity import SystemEntity
import logging import logging
import Utility import Utility
import hashlib
class Node(SystemEntity): class Node(SystemEntity):
def __init__(self,id,network,weight): def __init__(self,id,network,weight):
...@@ -18,6 +19,8 @@ class Node(SystemEntity): ...@@ -18,6 +19,8 @@ class Node(SystemEntity):
"nextOn":self.nextOn "nextOn":self.nextOn
} }
self.messagesTillNow = []
self.uniqueReceivedMessages = [] #this is a list of messages in buffer which were not processed
def enqueMessage(self,message): def enqueMessage(self,message):
...@@ -43,23 +46,39 @@ class Node(SystemEntity): ...@@ -43,23 +46,39 @@ class Node(SystemEntity):
:param params: ('1','2','hello2') :param params: ('1','2','hello2')
:return: :return:
''' '''
link = self.network.links.get((params[0],params[1])) link = self.network.getLink((params[0],params[1]))
link.enqueMessage((time,params)) link.enqueMessage((time,params))
# any message going out is recorded as sent and will be checked for duplicates
self.messagesTillNow.append(hashlib.sha256((str(params[2])).encode('utf-8')).hexdigest())
self.logger.info("Sending : " + str((time,params))) self.logger.info("Sending : " + str((time,params)))
pass pass
def broadcast(self,time,gossipPayload): def broadcast(self,time,gossipPayload):
for node in self.adjacentNodes: '''
link = self.network.links.get((self.id, node.id))
:param time:
:param gossipPayload:
:return:
broadcast and processMessage together makes it gossip protocol
'''
for nodeid in self.adjacentNodes:
node = self.network.nodes.get(nodeid)
link = self.network.getLink((self.id, node.id))
link.enqueMessage((time, (self.id, node.id,gossipPayload))) link.enqueMessage((time, (self.id, node.id,gossipPayload)))
self.logger.info("Sending : " + str((time, (self.id, node.id,gossipPayload)))) self.logger.info("Sending : " + str((time, (self.id, node.id,gossipPayload))))
def processMessage(self,time,payload): def processMessage(self,time,payload):
print("fianlly reached : " + str(payload))
'''remove any extra headers like source and destination here ''' '''remove any extra headers like source and destination here '''
payload.pop(0) # removed source header payload.pop(0) # removed source header
payload.pop(0) #removed destination header payload.pop(0) #removed destination header
# TODO check messages for duplicates and brodacast them : Done
if not (hashlib.sha256((str(payload[0])).encode('utf-8')).hexdigest() in self.messagesTillNow) :
self.messagesTillNow.append(hashlib.sha256((str(payload[0])).encode('utf-8')).hexdigest())
self.broadcast(time,payload[0])
self.uniqueReceivedMessages.append(payload[0])
def nextOn(self,time,generator): def nextOn(self,time,generator):
next(generator[0]) next(generator[0])
...@@ -98,6 +117,8 @@ class Node(SystemEntity): ...@@ -98,6 +117,8 @@ class Node(SystemEntity):
pass pass
except StopIteration as si: except StopIteration as si:
pass pass
except IndexError as Ie:
self.logger.info("No resume task from ba*")
...@@ -108,14 +129,9 @@ class Node(SystemEntity): ...@@ -108,14 +129,9 @@ class Node(SystemEntity):
for task in tasks: for task in tasks:
command = self.commands.get(task[0]) command = self.commands.get(task[0])
out = command(time,task[1:]) out = command(time,task[1:])
if out and out[0] == "resume":
self.logger.info("resume has been called")
self.todoList[time+1].append(("nextOn", self.startNodeLifeCycleGenerator,))
except KeyError as ke: except KeyError as ke:
# not task pending at this time # not task pending at this time
pass pass
except StopIteration as si:
pass
......
...@@ -9,14 +9,13 @@ class OverlayNode(Node): ...@@ -9,14 +9,13 @@ class OverlayNode(Node):
def __init__(self,id,network,weight): def __init__(self,id,network,weight):
Node.__init__(self,id,network,weight) Node.__init__(self,id,network,weight)
self.startNodeLifeCycleGenerator = self.startNodeLifeCycle() self.startNodeLifeCycleGenerator = self.startNodeLifeCycle()
# adding initial task of bootstraping node. # adding initial task of bootstraping node.and it only contains ba* resume tasks
self.ResumeTasks.append(("nextOn", self.startNodeLifeCycleGenerator,)) self.ResumeTasks.append(("nextOn", self.startNodeLifeCycleGenerator,))
# self.todoList['0'].append(("nextOn", self.startNodeLifeCycleGenerator,))
self.commands.update({}) self.commands.update({})
def processMessage(self,time,payload): # def processMessage(self,time,payload):
super(OverlayNode, self).processMessage(time,payload) # super(OverlayNode, self).processMessage(time,payload)
print("from overlay node : "+str(payload)) # print("from overlay node : "+str(payload))
def startNodeLifeCycle(self): def startNodeLifeCycle(self):
'''THis is monolithic algorand algorithm for a NOde''' '''THis is monolithic algorand algorithm for a NOde'''
...@@ -26,45 +25,45 @@ class OverlayNode(Node): ...@@ -26,45 +25,45 @@ class OverlayNode(Node):
#increment round number #increment round number
# Checking if I am a Block Propser # # Checking if I am a Block Propser
previousHash = hashlib.sha256(genesisBlock.encode('utf-8')).hexdigest() # previousHash = hashlib.sha256(genesisBlock.encode('utf-8')).hexdigest()
currentRound = 0 # currentRound = 0
step = 0 # step = 0
blockHeight = 0 # blockHeight = 0
seed = ( previousHash,currentRound,blockHeight ) # seed = ( previousHash,currentRound,blockHeight )
roleCount = 20 # roleCount = 20
role = "BLOCK_PROPOSER" # role = "BLOCK_PROPOSER"
w = self.weight # w = self.weight
badaW = 100 # badaW = 100
#
hash, proof, j = Utility.sortition(self.sk,seed,roleCount, role, w, self.network.badaW, self.pk) # hash, proof, j = Utility.sortition(self.sk,seed,roleCount, role, w, self.network.badaW, self.pk)
print("I am the mighty J =",j) # print("I am the mighty J =",j)
if j > 0 : # if j > 0 :
min_sha_priority = None # min_sha_priority = None
# min_sub_user_index = None # # min_sub_user_index = None
# # msg_to_broadcast = ( currentRound, hash , min_sub_user_index , min_sha_priority )
# print(w)
# print(range(w))
# for sub_user_index in range(w):
# input_to_SHA256 = (hash , sub_user_index,) # TODO : can concatenation means this
# sha256_hash = hashlib.sha256((str(input_to_SHA256)).encode('utf-8')).hexdigest()
# print(sha256_hash)
# if not min_sha_priority :
# min_sha_priority = sha256_hash
# min_sub_user_index = sub_user_index
# if sha256_hash < min_sha_priority:
# min_sha_priority = sha256_hash
# min_sub_user_index = sub_user_index
#
#
# msg_to_broadcast = ( currentRound, hash , min_sub_user_index , min_sha_priority ) # msg_to_broadcast = ( currentRound, hash , min_sub_user_index , min_sha_priority )
print(w) # print(msg_to_broadcast)
print(range(w)) # self.logger.info(msg_to_broadcast)
for sub_user_index in range(w): #
input_to_SHA256 = (hash , sub_user_index,) # TODO : can concatenation means this # print("i am before yield 1",self.id)
sha256_hash = hashlib.sha256((str(input_to_SHA256)).encode('utf-8')).hexdigest()
print(sha256_hash)
if not min_sha_priority :
min_sha_priority = sha256_hash
min_sub_user_index = sub_user_index
if sha256_hash < min_sha_priority:
min_sha_priority = sha256_hash
min_sub_user_index = sub_user_index
msg_to_broadcast = ( currentRound, hash , min_sub_user_index , min_sha_priority )
print(msg_to_broadcast)
self.logger.info(msg_to_broadcast)
print("i am before yield 1",self.id)
yield "resume" yield "resume"
print("i am before yield 2", self.id) # print("i am before yield 2", self.id)
yield # yield
''' '''
TODo check if node is selected as BLOCK_PROPOSER TODo check if node is selected as BLOCK_PROPOSER
......
...@@ -12,6 +12,6 @@ if __name__ == '__main__': ...@@ -12,6 +12,6 @@ if __name__ == '__main__':
metronome = TimeSimulator() metronome = TimeSimulator()
network = Network() network = Network()
network.setupNetwork() network.setupNetwork()
populateTodolist(network) # populateTodolist(network)
metronome.startTicking(network.simulate) metronome.startTicking(network.simulate)
pass pass
\ No newline at end of file
...@@ -45,6 +45,8 @@ if __name__ == '__main__': ...@@ -45,6 +45,8 @@ if __name__ == '__main__':
# simulateNetwork(None) # simulateNetwork(None)
ak = ['hello','ramesh'] ak = ['hello','ramesh']
ak.append('rao') ak.append('rao')
if 'hello' in ak:
print("yay")
print(ak.pop()) print(ak.pop())
pass pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment