Commit ebffa4a2 authored by nilanjandaw's avatar nilanjandaw

worker UID changed to IP address

UID of worker nodes has been changed to IP, streamlined DM code base
parent 5846c061
{ {
"mqtt_url": "10.129.6.5",
"registry_url" :"10.129.6.5:5000/", "registry_url" :"10.129.6.5:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "localhost" "master_address": "localhost",
"kafka_host": "10.129.6.5:9092"
} }
\ No newline at end of file
{ {"id":"192.168.31.51","master_node":"10.129.6.5"}
"id": "tpt8hqn7ok" \ No newline at end of file
}
\ No newline at end of file
const mqtt = require('mqtt') 'use strict';
const constants = require(".././constants.json") const constants = require(".././constants.json")
const node_id = require("./config.json").id const config = require('./config.json')
const libSupport = require('./lib') const libSupport = require('./lib')
libSupport.updateConfig()
const node_id = config.id
const execute = require('./execute') const execute = require('./execute')
const fs = require('fs') const fs = require('fs')
const kafka = require('kafka-node')
const local_repository = __dirname + "/local_repository/" const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port const host_url = "http://" + constants.master_address + ":" + constants.master_port
let kafka = require('kafka-node'), let Producer = kafka.Producer,
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage, KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
kafkaHost: '10.129.6.5:9092', kafkaHost: constants.kafka_host,
autoConnect: true autoConnect: true
}), }),
producer = new Producer(client), producer = new Producer(client),
Consumer = kafka.Consumer, Consumer = kafka.Consumer
consumer = new Consumer(client,
libSupport.makeTopic(node_id).then(() => {
console.log("node topic created")
let consumer = new Consumer(client,
[ [
{ topic: node_id, partition: 0, offset: 0} { topic: node_id, partition: 0, offset: 0 }
], ],
[ [
{ autoCommit: true } { autoCommit: true }
]) ])
consumer.on('message', function (message) {
consumer.on('message', function (message) {
console.log(message); console.log(message);
let topic = message.topic let topic = message.topic
message = message.value message = message.value
...@@ -37,63 +40,28 @@ consumer.on('message', function (message) { ...@@ -37,63 +40,28 @@ consumer.on('message', function (message) {
if (message.type === "execute") { if (message.type === "execute") {
console.log("function_id", function_id); console.log("function_id", function_id);
if (!fs.existsSync(local_repository + functionHash)) {
libSupport.download(host_url + "/repository/" + functionHash, local_repository + functionHash).then(() => { libSupport.download(host_url + "/repository/" + functionHash, local_repository + functionHash).then(() => {
startWorker(local_repository, functionHash, producer, runtime)
if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => {
producer.send([{
topic: "response",
messages: JSON.stringify({
status: "success",
result,
function_id})
}], () =>{})
}) })
else if (runtime === "process")
execute.runProcess(local_repository + functionHash).then(result => {
producer.send(
[{
topic: "response",
messages: JSON.stringify({
status: "success",
result,
function_id})
}], () =>{})
})
else if (runtime === "container")
execute.runContainer(functionHash).then(result => {
producer.send(
[{
topic: "response",
messages: JSON.stringify({
status: "success",
result,
function_id})
}], () =>{})
})
else {
producer.send(
[{
topic: "response",
messages: JSON.stringify({ status: "unknown runtime" })
}], () =>{})
return
} }
}
}) })
} else { })
function startWorker(local_repository, functionHash, producer, runtime) {
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => { execute.runIsolate(local_repository + functionHash).then(result => {
producer.send( producer.send([{
[{
topic: "response", topic: "response",
messages: JSON.stringify({ messages: JSON.stringify({
status: "success", status: "success",
result, result,
function_id}) function_id
}], () =>{}) })
}], () => { })
}) })
else if (runtime === "process") else if (runtime === "process")
execute.runProcess(local_repository + functionHash).then(result => { execute.runProcess(local_repository + functionHash).then(result => {
...@@ -103,8 +71,9 @@ consumer.on('message', function (message) { ...@@ -103,8 +71,9 @@ consumer.on('message', function (message) {
messages: JSON.stringify({ messages: JSON.stringify({
status: "success", status: "success",
result, result,
function_id}) function_id
}], () =>{}) })
}], () => { })
}) })
else if (runtime === "container") else if (runtime === "container")
execute.runContainer(functionHash).then(result => { execute.runContainer(functionHash).then(result => {
...@@ -114,23 +83,20 @@ consumer.on('message', function (message) { ...@@ -114,23 +83,20 @@ consumer.on('message', function (message) {
messages: JSON.stringify({ messages: JSON.stringify({
status: "success", status: "success",
result, result,
function_id}) function_id
}], () =>{}) })
}], () => { })
}) })
else { else {
producer.send( producer.send(
[{ [{
topic: "response", topic: "response",
messages: JSON.stringify({ status: "unknown runtime" }) messages: JSON.stringify({ status: "unknown runtime" })
}], () =>{}) }], () => { })
return
}
}
return
} }
} }
})
function heartbeat() { function heartbeat() {
let payload = [{ let payload = [{
......
var http = require('http'); const http = require('http');
var fs = require('fs'); const fs = require('fs');
const process = require('process')
const { spawnSync } = require('child_process');
const constants = require(".././constants.json")
const kafka = require('kafka-node')
function updateConfig() {
console.log("Retrieving primary IP");
let file = JSON.parse(fs.readFileSync('./config.json', { encoding: 'utf-8' }))
const getIP = spawnSync("ip", ["route", "get", file.master_node]);
let err = getIP.stderr.toString().trim()
if (err !== '') {
console.log(err);
process.exit(1);
}
let data = getIP.stdout.toString().trim().split(' ')
file.id = data[6]
fs.writeFileSync('./config.json', JSON.stringify(file));
console.log("Updated Config file");
}
function makeTopic(id) {
console.log("Using Primary IP", id, "as topic");
let client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host,
autoConnect: true
}),
Producer = kafka.Producer,
producer = new Producer(client)
return new Promise((resolve, reject) => {
producer.send([{
topic: id,
messages: JSON.stringify({
status: "success",
})
}], (err, data) => {
if (err)
reject();
else
resolve();
})
})
}
var download = function (url, dest, cb) { var download = function (url, dest, cb) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
console.log(url); console.log(url);
if (!fs.existsSync(local_repository + functionHash)) {
var file = fs.createWriteStream(dest); var file = fs.createWriteStream(dest);
var request = http.get(url, function (response) { var request = http.get(url, function (response) {
response.pipe(file); response.pipe(file);
...@@ -17,6 +59,9 @@ var download = function (url, dest, cb) { ...@@ -17,6 +59,9 @@ var download = function (url, dest, cb) {
if (cb) cb(err.message); if (cb) cb(err.message);
reject(err); reject(err);
}); });
} else {
resolve();
}
}) })
}; };
...@@ -31,6 +76,6 @@ function makeid(length) { ...@@ -31,6 +76,6 @@ function makeid(length) {
return result; return result;
} }
module.exports = {
module.exports.download = download download, makeid, updateConfig, makeTopic
module.exports.makeid = makeid; }
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment