Commit 7fc8376d authored by nilanjandaw's avatar nilanjandaw

kafka node integration started

started listener for kafka node in dispatcher, but not working correctly
parent f7a0de90
Design decisions to be considered:
1. Container: copy executable file inside prebuilt images.
2. Container: run warm request based container server instead of current arch
possibly will require request redirection.
3. Move away from mosquitto to Apache Kafka / Zookeeper
4. Implement heartbeat based daemon monitor.
...@@ -8,10 +8,28 @@ const fs = require('fs') ...@@ -8,10 +8,28 @@ const fs = require('fs')
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const morgan = require('morgan') const morgan = require('morgan')
const mqtt = require('mqtt') const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://' + constants.mqtt_url) // const client = mqtt.connect('mqtt://' + constants.mqtt_url)
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({
kafkaHost: '10.129.6.5:9092',
autoConnect: true
}),
producer = new Producer(client),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: 'response', partition: 0, offset: 0}, { topic: 'heartbeat' }
],
[
{ autoCommit: false },
{ fromOffset: true}
])
let db = new Map() let db = new Map()
app.use(morgan('combined')) app.use(morgan('combined'))
app.use(bodyParser.urlencoded({ extended: true })) app.use(bodyParser.urlencoded({ extended: true }))
...@@ -158,31 +176,35 @@ function getAddress() { ...@@ -158,31 +176,35 @@ function getAddress() {
} }
client.on('message', function (topic, message) { // client.on('message', function (topic, message) {
if (topic === "response") { // if (topic === "response") {
message = JSON.parse(message) // message = JSON.parse(message)
console.log(message); // console.log(message);
let res = db.get(message.function_id) // let res = db.get(message.function_id)
res.json({ // res.json({
"status": "success", // "status": "success",
"reply": message.result // "reply": message.result
}) // })
db.delete(message.function_id) // db.delete(message.function_id)
} else if (topic === "heartbeat") { // } else if (topic === "heartbeat") {
message = JSON.parse(message) // message = JSON.parse(message)
if (workerNodes.indexOf(message.address) === -1) { // if (workerNodes.indexOf(message.address) === -1) {
workerNodes.push(message.address) // workerNodes.push(message.address)
console.log(workerNodes); // console.log(workerNodes);
} // }
} // }
}) // })
app.listen(port, () => console.log(`Server listening on port ${port}!`)) app.listen(port, () => console.log(`Server listening on port ${port}!`))
client.on('connect', function () { // client.on('connect', function () {
client.subscribe("response") // client.subscribe("response")
client.subscribe("heartbeat") // client.subscribe("heartbeat")
}) // })
consumer.on('message', function (message) {
console.log(message);
});
setInterval(dispatch, 1000); setInterval(dispatch, 1000);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment