Commit 83e0dc1b authored by Nilanjan Daw's avatar Nilanjan Daw

Moved primary identification to resource ID. Added better logging

parent 407c256c
...@@ -22,7 +22,6 @@ let functionToResource = new Map(), // TODO: make the resource a list for horizo ...@@ -22,7 +22,6 @@ let functionToResource = new Map(), // TODO: make the resource a list for horizo
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host, kafkaHost: constants.kafka_host,
autoConnect: true autoConnect: true
...@@ -170,7 +169,8 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -170,7 +169,8 @@ app.post('/serverless/execute/:id', (req, res) => {
*/ */
let forwardTo = functionToResource.get(req.params.id + runtime) let forwardTo = functionToResource.get(req.params.id + runtime)
let resource = resourceMap.get(forwardTo.resource_id) let resource = resourceMap.get(forwardTo.resource_id)
logger.info("resource found", forwardTo, resource); logger.info("resource found " + JSON.stringify(forwardTo) +
" forwarding via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`) libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`)
} else { } else {
...@@ -195,19 +195,16 @@ function dispatch() { ...@@ -195,19 +195,16 @@ function dispatch() {
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length) let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) { for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift() let {req, res} = requestQueue.shift()
logger.info(req.body) // logger.info(req.body)
let runtime = req.body.runtime let runtime = req.body.runtime
let functionHash = req.params.id let functionHash = req.params.id
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
logger.info("Dispatching function with Id", resource_id, runtime); logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
let node_id = getAddress() // Requests the RM for address and other metadata for function placement let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
})
logger.info(resourceMap);
let payload = [{ let payload = [{
topic: node_id, topic: node_id,
messages: JSON.stringify({ messages: JSON.stringify({
...@@ -218,17 +215,25 @@ function dispatch() { ...@@ -218,17 +215,25 @@ function dispatch() {
}), }),
partition: 0 partition: 0
}] }]
/** uncomment when RM is unavailable */
producer.send(payload, () => { })
db.set(functionHash + runtime, { req, res })
logger.info("Requesting RM " + JSON.stringify({ logger.info("Requesting RM " + JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
})) }))
/** uncomment when RM is available
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
})
logger.info(resourceMap);
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
db.set(resource_id, { req, res })
/** uncomment when RM is available, TODO: also update resourceMap
rmQueue.set(resource_id, payload) rmQueue.set(resource_id, payload)
let payloadToRM = [{ let payloadToRM = [{
topic: "REQUEST_DM_2_RM", topic: "REQUEST_DM_2_RM",
...@@ -255,15 +260,15 @@ consumer.on('message', function (message) { ...@@ -255,15 +260,15 @@ consumer.on('message', function (message) {
let topic = message.topic let topic = message.topic
message = message.value message = message.value
if (topic === "response") { if (topic === "response") {
logger.info("response", message); logger.info("response " + message);
} else if (topic === "heartbeat") { } else if (topic === "heartbeat") {
message = JSON.parse(message) message = JSON.parse(message)
if (Date.now() - message.timestamp < 300) if (Date.now() - message.timestamp < 1000)
if (workerNodes.indexOf(message.address) === -1) { if (workerNodes.indexOf(message.address) === -1) {
workerNodes.push(message.address) workerNodes.push(message.address)
logger.warn(workerNodes); logger.warn("New worker discovered. Worker List: " + workerNodes)
} }
} else if (topic == "deployed") { } else if (topic == "deployed") {
try { try {
...@@ -272,10 +277,10 @@ consumer.on('message', function (message) { ...@@ -272,10 +277,10 @@ consumer.on('message', function (message) {
// process.exit(0) // process.exit(0)
} }
logger.info("deployed", message); logger.info("Deployed Resource: " + JSON.stringify(message));
if (db.has(message.functionHash + message.runtime)) { if (db.has(message.resource_id)) {
let { req, res } = db.get(message.functionHash + message.runtime) let { req, res } = db.get(message.resource_id)
functionToResource.set(message.functionHash + message.runtime, { functionToResource.set(message.functionHash + message.runtime, {
resource_id: message.resource_id resource_id: message.resource_id
}) })
...@@ -285,23 +290,26 @@ consumer.on('message', function (message) { ...@@ -285,23 +290,26 @@ consumer.on('message', function (message) {
messages: JSON.stringify({ messages: JSON.stringify({
resource_id: message.resource_id, resource_id: message.resource_id,
node_id: resource.node_id, node_id: resource.node_id,
runtime, runtime: resource.runtime,
function_id: functionHash, function_id: resource.functionHash,
"reason": "deployment", "reason": "deployment",
"status": true, "status": true,
"timestamp": date.toISOString() "timestamp": date.toISOString()
}), }),
partition: 0 partition: 0
}] }]
producer.send(confirmRM, () => {}) producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res, libSupport.reverseProxy(req, res,
`http://${resource.node_id}:${resource.port}/serverless/function/execute`) `http://${resource.node_id}:${resource.port}/serverless/function/execute`)
.then(() => { .then(() => {
db.delete(message.functionHash + message.runtime) db.delete(message.resource_id)
}) })
} }
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
logger.info("removing metadata", message); logger.info("Worker blown: Removing Metadata " + message);
try { try {
message = JSON.parse(message) message = JSON.parse(message)
} catch(e) { } catch(e) {
...@@ -321,7 +329,12 @@ consumer.on('message', function (message) { ...@@ -321,7 +329,12 @@ consumer.on('message', function (message) {
if (payload != null) { if (payload != null) {
payload[0].topic = message.nodes[0] payload[0].topic = message.nodes[0]
logger.info(payload); logger.info(payload);
/** get port and other resources
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
})
logger.info(resourceMap);
*/
producer.send(payload, () => { }) producer.send(payload, () => { })
} else { } else {
logger.error("something went wrong"); logger.error("something went wrong");
......
...@@ -44,7 +44,7 @@ function makeid(length) { ...@@ -44,7 +44,7 @@ function makeid(length) {
function reverseProxy(req, res, url, tryout) { function reverseProxy(req, res, url, tryout) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
console.log("requesting reverseproxy"); logger.info("Request received at reverseproxy. Forwarding to: " + url);
var options = { var options = {
method: 'POST', method: 'POST',
...@@ -55,7 +55,7 @@ function makeid(length) { ...@@ -55,7 +55,7 @@ function makeid(length) {
rp(options) rp(options)
.then(function (parsedBody) { .then(function (parsedBody) {
console.log("parsed body:", parsedBody); // console.log("parsed body:", parsedBody);
res.json(parsedBody) res.json(parsedBody)
resolve() resolve()
}) })
...@@ -63,7 +63,7 @@ function makeid(length) { ...@@ -63,7 +63,7 @@ function makeid(length) {
if (err.error.errno === "ECONNREFUSED") { if (err.error.errno === "ECONNREFUSED") {
reverseProxy(req, res, url, (tryout != null) ? tryout + 1 : 1) reverseProxy(req, res, url, (tryout != null) ? tryout + 1 : 1)
} else { } else {
console.log("error", err.error.errno); logger.error("error", err.error.errno);
res.json(err.message).status(err.statusCode) res.json(err.message).status(err.statusCode)
resolve() resolve()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment