Commit 5d3d4f56 authored by nilanjandaw's avatar nilanjandaw

streamlined the dispatch pipeline

parent 7f78ed74
...@@ -12,6 +12,7 @@ const heap = require('heap'); ...@@ -12,6 +12,7 @@ const heap = require('heap');
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const swStats = require('swagger-stats'); const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json'); const apiSpec = require('./swagger.json');
const util = require('util')
/** /**
* URL to the couchdb database server used to store function metadata * URL to the couchdb database server used to store function metadata
...@@ -19,6 +20,9 @@ const apiSpec = require('./swagger.json'); ...@@ -19,6 +20,9 @@ const apiSpec = require('./swagger.json');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.function_db_name + "/" metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
const logger = libSupport.logger const logger = libSupport.logger
...@@ -256,68 +260,14 @@ function dispatch() { ...@@ -256,68 +260,14 @@ function dispatch() {
if (!db.has(functionHash + runtime)) { if (!db.has(functionHash + runtime)) {
db.set(functionHash + runtime, []) db.set(functionHash + runtime, [])
db.get(functionHash + runtime).push({ req, res }) db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
/**
* Request RM for resource
*/
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
resourceMap.set(resource_id, { let payload = [{
runtime, functionHash, port: null, node_id: null, topic: constants.topics.hscale,
deployed: false, deploy_request_time: Date.now() messages: JSON.stringify({ runtime, functionHash })
})
let payloadToRM = [{
topic: constants.topics.request_dm_2_rm, // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
timestamp: Date.now()
}),
partition: 0
}] }]
producer.send(payloadToRM, () => { producer.send(payload, function () { })
// db.set(functionHash + runtime, { req, res })
console.log("sent rm");
})
/**
* Speculative deployment:
* If function MLE path is present then deploy those parts of the path which are
* not already running
*/
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
}
}
speculative_deployment(req, runtime)
} else { } else {
logger.info("deployment process already started waiting") logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res }) db.get(functionHash + runtime).push({ req, res })
...@@ -470,15 +420,17 @@ consumer.on('message', function (message) { ...@@ -470,15 +420,17 @@ consumer.on('message', function (message) {
let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID
runtime = message.runtime, runtime = message.runtime,
functionHash = message.functionHash functionHash = message.functionHash
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
console.log("Resource Status: ", functionToResource); console.log("Resource Status: ", functionToResource);
/**
* Request RM for resource
*/
logger.info("Requesting RM " + JSON.stringify({ logger.info("Requesting RM " + JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
})) }))
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, { resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null, runtime, functionHash, port: null, node_id: null,
deployed: false, deploy_request_time: Date.now() deployed: false, deploy_request_time: Date.now()
...@@ -553,6 +505,53 @@ function autoscalar() { ...@@ -553,6 +505,53 @@ function autoscalar() {
} }
/**
* Speculative deployment:
* If function MLE path is present then deploy those parts of the path which are
* not already running
*
* FIXME: Currently supports homogenous runtime chain i.e takes runtime as a param.
* Change it to also profile runtime
*/
async function speculative_deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path)
node.id = node.node
let metrics = await libSupport.fetchData(metricsDB + "_bulk_get", {
method: 'post',
body: JSON.stringify({
docs: branchInfo.mle_path
}),
headers: { 'Content-Type': 'application/json' },
})
console.log(util.inspect(metrics, false, null, true /* enable colors */))
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
}
}
}
setInterval(libSupport.viterbi, 1000, functionBranchTree) setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000); setInterval(autoscalar, 1000);
setInterval(dispatch, 1000); setInterval(dispatch, 1000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment