Commit ce3d339d authored by NILANJAN DAW's avatar NILANJAN DAW

restructuring resource manager

parent f738058f
let workerNodes = {}, timeline = {}
const constants = require('../constants.json')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: "request" } // receives deployment details from RM
],
[
{ autoCommit: true }
])
function getAddress() {
}
consumer.on('message', function (message) {
let topic = message.topic
message = message.value
if (topic !== "heartbeat")
console.log(message);
if (topic === "heartbeat") {
message = JSON.parse(message)
if (Date.now() - message.timestamp < 1000)
if (!workerNodes[message.address]) {
workerNodes[message.address] = message
console.log("New worker discovered. Worker List: ")
console.log(workerNodes);
}
} else if (topic === "request") {
message = JSON.parse(message)
console.log(message);
let payload = [{
topic: "RESPONSE_RM_2_DM_DUMMY",
messages: JSON.stringify({
"resource_id": message.resource_id,
"timestamp": Date.now(),
"nodes": [
{ node_id: getAddress(), port: null }]
}),
partition: 0
}]
producer.send(payload, () => {
console.log(`Replied`)
})
}
})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment