Commit b1a0f94b authored by nilanjandaw's avatar nilanjandaw

moving internal pub/sub to kafka

initial move complete for internal pub/sub to Kafka - needs testing
parent e63226ee
...@@ -2,5 +2,5 @@ ...@@ -2,5 +2,5 @@
"mqtt_url": "10.129.6.5", "mqtt_url": "10.129.6.5",
"registry_url" :"10.129.6.5:5000/", "registry_url" :"10.129.6.5:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "10.129.6.5" "master_address": "localhost"
} }
\ No newline at end of file
{
"id": "tpt8hqn7ok"
}
\ No newline at end of file
const mqtt = require('mqtt') const mqtt = require('mqtt')
const constants = require(".././constants.json") const constants = require(".././constants.json")
console.log(constants.mqtt_url); const node_id = require("./config.json").id
const client = mqtt.connect('mqtt://' + constants.mqtt_url)
const libSupport = require('./lib') const libSupport = require('./lib')
const execute = require('./execute') const execute = require('./execute')
const fs = require('fs') const fs = require('fs')
const node_id = libSupport.makeid(10)
const local_repository = __dirname + "/local_repository/" const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port const host_url = "http://" + constants.master_address + ":" + constants.master_port
client.on('connect', function () {
client.subscribe(node_id, function (err) {
if (!err) {
console.log("node listening to id", node_id);
}
})
})
client.on('message', function (topic, message) { let kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({
kafkaHost: '10.129.6.5:9092',
autoConnect: true
}),
producer = new Producer(client),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: node_id, partition: 0, offset: 0}
],
[
{ autoCommit: true }
])
consumer.on('message', function (message) {
console.log(message);
let topic = message.topic
message = message.value
message = JSON.parse(message) message = JSON.parse(message)
if (topic !== 'heartbeat') { if (topic !== 'heartbeat') {
let runtime = message.runtime let runtime = message.runtime
...@@ -34,30 +42,43 @@ client.on('message', function (topic, message) { ...@@ -34,30 +42,43 @@ client.on('message', function (topic, message) {
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => { execute.runIsolate(local_repository + functionHash).then(result => {
client.publish("response", JSON.stringify({ producer.send([{
status: "success", topic: "response",
result, messages: JSON.stringify({
function_id status: "success",
})) result,
function_id})
}], () =>{})
}) })
else if (runtime === "process") else if (runtime === "process")
execute.runProcess(local_repository + functionHash).then(result => { execute.runProcess(local_repository + functionHash).then(result => {
client.publish("response", JSON.stringify({ producer.send(
status: "success", [{
result, topic: "response",
function_id messages: JSON.stringify({
})) status: "success",
result,
function_id})
}], () =>{})
}) })
else if (runtime === "container") else if (runtime === "container")
execute.runContainer(functionHash).then(result => { execute.runContainer(functionHash).then(result => {
client.publish("response", JSON.stringify({ producer.send(
status: "success", [{
result, topic: "response",
function_id messages: JSON.stringify({
})) status: "success",
result,
function_id})
}], () =>{})
}) })
else { else {
client.publish("response", JSON.stringify({ status: "unknown runtime" })) producer.send(
[{
topic: "response",
messages: JSON.stringify({ status: "unknown runtime" })
}], () =>{})
return return
} }
}) })
...@@ -65,30 +86,43 @@ client.on('message', function (topic, message) { ...@@ -65,30 +86,43 @@ client.on('message', function (topic, message) {
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => { execute.runIsolate(local_repository + functionHash).then(result => {
client.publish("response", JSON.stringify({ producer.send(
status: "success", [{
result, topic: "response",
function_id messages: JSON.stringify({
})) status: "success",
result,
function_id})
}], () =>{})
}) })
else if (runtime === "process") else if (runtime === "process")
execute.runProcess(local_repository + functionHash).then(result => { execute.runProcess(local_repository + functionHash).then(result => {
client.publish("response", JSON.stringify({ producer.send(
status: "success", [{
result, topic: "response",
function_id messages: JSON.stringify({
})) status: "success",
result,
function_id})
}], () =>{})
}) })
else if (runtime === "container") else if (runtime === "container")
execute.runContainer(functionHash).then(result => { execute.runContainer(functionHash).then(result => {
client.publish("response", JSON.stringify({ producer.send(
status: "success", [{
result, topic: "response",
function_id messages: JSON.stringify({
})) status: "success",
result,
function_id})
}], () =>{})
}) })
else { else {
client.publish("response", JSON.stringify({ status: "unknown runtime" })) producer.send(
[{
topic: "response",
messages: JSON.stringify({ status: "unknown runtime" })
}], () =>{})
return return
} }
} }
...@@ -99,7 +133,13 @@ client.on('message', function (topic, message) { ...@@ -99,7 +133,13 @@ client.on('message', function (topic, message) {
}) })
function heartbeat() { function heartbeat() {
client.publish("heartbeat", JSON.stringify({"address": node_id})) let payload = [{
topic: "heartbeat",
messages: JSON.stringify({"address": node_id})
}]
producer.send(payload, function() {
})
} }
setInterval(heartbeat, 1000); setInterval(heartbeat, 1000);
\ No newline at end of file
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
"express": "^4.17.1", "express": "^4.17.1",
"express-fileupload": "^1.1.6", "express-fileupload": "^1.1.6",
"isolated-vm": "^3.0.0", "isolated-vm": "^3.0.0",
"kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"redis": "^2.8.0" "redis": "^2.8.0"
......
...@@ -26,8 +26,7 @@ let kafka = require('kafka-node'), ...@@ -26,8 +26,7 @@ let kafka = require('kafka-node'),
{ topic: 'response', partition: 0, offset: 0}, { topic: 'heartbeat' } { topic: 'response', partition: 0, offset: 0}, { topic: 'heartbeat' }
], ],
[ [
{ autoCommit: false }, { autoCommit: true }
{ fromOffset: true}
]) ])
let db = new Map() let db = new Map()
...@@ -161,11 +160,16 @@ function dispatch() { ...@@ -161,11 +160,16 @@ function dispatch() {
let function_id = libSupport.makeid(20) let function_id = libSupport.makeid(20)
console.log("Dispatching function with Id", function_id, runtime); console.log("Dispatching function with Id", function_id, runtime);
let node_id = getAddress() let node_id = getAddress()
client.publish(node_id, JSON.stringify({ let payload = [{
"type": "execute", topic: node_id,
function_id, messages: JSON.stringify({
runtime, functionHash "type": "execute",
})) function_id,
runtime, functionHash
}),
partition: 0
}]
producer.send(payload, () => {})
db.set(function_id, res) db.set(function_id, res)
} }
...@@ -204,7 +208,25 @@ app.listen(port, () => console.log(`Server listening on port ${port}!`)) ...@@ -204,7 +208,25 @@ app.listen(port, () => console.log(`Server listening on port ${port}!`))
// }) // })
consumer.on('message', function (message) { consumer.on('message', function (message) {
console.log(message); // console.log(message);
let topic = message.topic
message = message.value
if (topic === "response") {
message = JSON.parse(message)
console.log(message);
let res = db.get(message.function_id)
res.json({
"status": "success",
"reply": message.result
})
db.delete(message.function_id)
} else if (topic === "heartbeat") {
message = JSON.parse(message)
if (workerNodes.indexOf(message.address) === -1) {
workerNodes.push(message.address)
console.log(workerNodes);
}
}
}); });
setInterval(dispatch, 1000); setInterval(dispatch, 2000);
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
"express": "^4.17.1", "express": "^4.17.1",
"express-fileupload": "^1.1.6", "express-fileupload": "^1.1.6",
"isolated-vm": "^3.0.0", "isolated-vm": "^3.0.0",
"kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"nano": "^8.1.0", "nano": "^8.1.0",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment