Commit fb183429 authored by nilanjandaw's avatar nilanjandaw

multi node setup tryout

parent 451319e4
'use strict';
const mqtt = require('mqtt') const mqtt = require('mqtt')
const constants = require(".././constants.json") const constants = require(".././constants.json")
console.log(constants.mqtt_url); console.log(constants.mqtt_url);
...@@ -9,7 +7,7 @@ const libSupport = require('./lib') ...@@ -9,7 +7,7 @@ const libSupport = require('./lib')
const execute = require('./execute') const execute = require('./execute')
const fs = require('fs') const fs = require('fs')
const node_id = "20sez54hq8" const node_id = libSupport.makeid(10)
const local_repository = __dirname + "/local_repository/" const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port const host_url = "http://" + constants.master_address + ":" + constants.master_port
client.on('connect', function () { client.on('connect', function () {
...@@ -22,10 +20,9 @@ client.on('connect', function () { ...@@ -22,10 +20,9 @@ client.on('connect', function () {
}) })
client.on('message', function (topic, message) { client.on('message', function (topic, message) {
// message is Buffer
message = JSON.parse(message) message = JSON.parse(message)
if (message.type !== 'heartbeat') { if (topic !== 'heartbeat') {
let runtime = message.runtime let runtime = message.runtime
let functionHash = message.functionHash let functionHash = message.functionHash
let function_id = message.function_id let function_id = message.function_id
...@@ -102,7 +99,7 @@ client.on('message', function (topic, message) { ...@@ -102,7 +99,7 @@ client.on('message', function (topic, message) {
}) })
function heartbeat() { function heartbeat() {
client.publish(node_id, JSON.stringify({"type": "heartbeat"})) client.publish("heartbeat", JSON.stringify({"address": node_id}))
} }
setInterval(heartbeat, 10000); setInterval(heartbeat, 1000);
\ No newline at end of file \ No newline at end of file
...@@ -19,4 +19,16 @@ var download = function (url, dest, cb) { ...@@ -19,4 +19,16 @@ var download = function (url, dest, cb) {
}; };
function makeid(length) {
var result = '';
var characters = 'abcdefghijklmnopqrstuvwxyz0123456789';
var charactersLength = characters.length;
for (var i = 0; i < length; i++) {
result += characters.charAt(Math.floor(Math.random() * charactersLength));
}
return result;
}
module.exports.download = download module.exports.download = download
module.exports.makeid = makeid;
\ No newline at end of file
...@@ -22,8 +22,8 @@ app.use('/repository', express.static(file_path)); ...@@ -22,8 +22,8 @@ app.use('/repository', express.static(file_path));
app.use(fileUpload()) app.use(fileUpload())
let requestQueue = [] let requestQueue = []
let workerNodes = []
const node_id = "20sez54hq8"
const WINDOW_SIZE = 10 const WINDOW_SIZE = 10
const port = 8080 const port = 8080
const registry_url = constants.registry_url const registry_url = constants.registry_url
...@@ -141,7 +141,7 @@ function dispatch() { ...@@ -141,7 +141,7 @@ function dispatch() {
let functionHash = req.params.id let functionHash = req.params.id
let function_id = libSupport.makeid(20) let function_id = libSupport.makeid(20)
console.log("Dispatching function with Id", function_id); console.log("Dispatching function with Id", function_id);
let node_id = getAddress()
client.publish(node_id, JSON.stringify({ client.publish(node_id, JSON.stringify({
"type": "execute", "type": "execute",
function_id, function_id,
...@@ -152,6 +152,10 @@ function dispatch() { ...@@ -152,6 +152,10 @@ function dispatch() {
} }
} }
function getAddress() {
return items[Math.floor(Math.random() * items.length)];
}
client.on('message', function (topic, message) { client.on('message', function (topic, message) {
...@@ -164,6 +168,12 @@ client.on('message', function (topic, message) { ...@@ -164,6 +168,12 @@ client.on('message', function (topic, message) {
"reply": message.result "reply": message.result
}) })
db.delete(message.function_id) db.delete(message.function_id)
} else if (topic === "heartbeat") {
message = JSON.parse(message)
if (workerNodes.indexOf(newItem) === -1)
workerNodes.push(newItem)
console.log(workerNodes);
} }
}) })
...@@ -171,6 +181,7 @@ app.listen(port, () => console.log(`Server listening on port ${port}!`)) ...@@ -171,6 +181,7 @@ app.listen(port, () => console.log(`Server listening on port ${port}!`))
client.on('connect', function () { client.on('connect', function () {
client.subscribe("response") client.subscribe("response")
client.subscribe("heartbeat")
}) })
setInterval(dispatch, 1000); setInterval(dispatch, 1000);
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment