Commit f0280c29 authored by Nilanjan Daw's avatar Nilanjan Daw

Merging RM functionality

adding RM interfaces with DM
parent 551148c1
{"id":"192.168.31.51","master_node":"10.129.6.5"} {"id":"10.196.11.241","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
...@@ -17,7 +17,8 @@ const libSupport = require('./lib') ...@@ -17,7 +17,8 @@ const libSupport = require('./lib')
* TODO: change this to hold a list of mappings of horizontal scaling * TODO: change this to hold a list of mappings of horizontal scaling
*/ */
let functionToPort = new Map(), let functionToPort = new Map(),
usedPort = new Map() // TODO: remove after integration with RM usedPort = new Map(), // TODO: remove after integration with RM
rmQueue = new Map()
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -33,8 +34,8 @@ let kafka = require('kafka-node'), ...@@ -33,8 +34,8 @@ let kafka = require('kafka-node'),
{ topic: 'response', partition: 0, offset: 0}, { topic: 'response', partition: 0, offset: 0},
{ topic: 'heartbeat' }, { topic: 'heartbeat' },
{ topic: "deployed" }, { topic: "deployed" },
{ topic: "removeWorker" } { topic: "removeWorker" },
// { topic: "RESPONSE_ARBITER_2_DISPATCHER"} { topic: "RESPONSE_ARBITER_2_DISPATCHER"}
], ],
[ [
{ autoCommit: true } { autoCommit: true }
...@@ -197,6 +198,7 @@ function dispatch() { ...@@ -197,6 +198,7 @@ function dispatch() {
let function_id = libSupport.makeid(20) // each function resource request is associated with an unique ID let function_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
console.log("Dispatching function with Id", function_id, runtime); console.log("Dispatching function with Id", function_id, runtime);
let node_id = getAddress() // Requests the RM for address and other metadata for function placement let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let payload = [{ let payload = [{
topic: node_id, topic: node_id,
messages: JSON.stringify({ messages: JSON.stringify({
...@@ -207,8 +209,20 @@ function dispatch() { ...@@ -207,8 +209,20 @@ function dispatch() {
}), }),
partition: 0 partition: 0
}] }]
producer.send(payload, () => {}) rmQueue.set(function_id, payload)
db.set(functionHash + runtime, {req, res}) let payloadToRM = [{
topic: "REQUEST_DISPATCHER_2_ARBITER",
messages: JSON.stringify({
"id": function_id,
"memory": 332,
}),
partition: 0
}]
producer.send(payloadToRM, () => {
db.set(functionHash + runtime, { req, res })
})
} }
} }
...@@ -220,7 +234,7 @@ function getAddress() { ...@@ -220,7 +234,7 @@ function getAddress() {
app.listen(port, () => console.log(`Server listening on port ${port}!`)) app.listen(port, () => console.log(`Server listening on port ${port}!`))
consumer.on('message', function (message) { consumer.on('message', function (message) {
// console.log(message);
let topic = message.topic let topic = message.topic
message = message.value message = message.value
if (topic === "response") { if (topic === "response") {
...@@ -267,8 +281,14 @@ consumer.on('message', function (message) { ...@@ -267,8 +281,14 @@ consumer.on('message', function (message) {
usedPort.delete(message.port) usedPort.delete(message.port)
functionToPort.delete(message.functionHash + message.runtime) functionToPort.delete(message.functionHash + message.runtime)
} else if (topic == "RESPONSE_ARBITER_2_DISPATCHER") { } else if (topic == "RESPONSE_ARBITER_2_DISPATCHER") {
message = JSON.parse(message)
console.log(message); console.log(message);
let payload = rmQueue.get(message.id)
payload[0].topic = getAddress()
console.log(payload);
producer.send(payload, () => { })
} }
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment