Commit 6c6e37e5 authored by Nilanjan Daw's avatar Nilanjan Daw

Speculative function chain Deployment. Issue #23

Added support for speculative deployment of function chains. If a function's MLE path is present, deploy those parts of the path that are not already running, to avoid cascading cold starts.
parent 57ce14d2
......@@ -17,5 +17,6 @@
},
"autoscalar_metrics": {
"open_request_threshold": 100
}
},
"speculative_deployment": true
}
\ No newline at end of file
......@@ -23,12 +23,12 @@ let date = new Date();
let log_channel = constants.log_channel
let usedPort = new Map(), // TODO: remove after integration with RM
rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation
db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes = new Map()
workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -207,8 +207,9 @@ app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
if (functionToResource.has(id)) {
libSupport.reverseProxy(req, res, functionToResource, resourceMap)
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
} else {
/**
* Requests are queued up before being dispatched. To prevent requests coming in for the
......@@ -254,7 +255,8 @@ function dispatch() {
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null
runtime, functionHash, port: null, node_id: null,
deployed: false
})
......@@ -270,8 +272,39 @@ function dispatch() {
producer.send(payloadToRM, () => {
// db.set(functionHash + runtime, { req, res })
console.log("sent rm");
})
/**
* Speculative deployment:
* If function MLE path is present then deploy those parts of the path which are
* not already running
*/
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
}
}
} else {
logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res })
......@@ -321,8 +354,10 @@ function postDeploy(message) {
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
}
let resource = resourceMap.get(message.resource_id)
try {
let resource = resourceMap.get(message.resource_id)
resource.deployed = true
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
......@@ -347,7 +382,7 @@ function postDeploy(message) {
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap)
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
.then(() => {
})
......@@ -425,7 +460,8 @@ consumer.on('message', function (message) {
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null
runtime, functionHash, port: null, node_id: null,
deployed: false
})
......@@ -486,7 +522,7 @@ function autoscalar() {
let resource = resourceMap.get(resourceList[resourceList.length - 1].resource_id)
logger.warn(`resource ${resourceList[resourceList.length - 1]} exceeded autoscalar threshold. Scaling up!`)
let payload = [{
topic: "hscale",
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": resource.runtime, "functionHash": resource.functionHash })
}]
producer.send(payload, function () { })
......@@ -495,7 +531,7 @@ function autoscalar() {
}
setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
......@@ -7,7 +7,6 @@ const constants = require('.././constants.json')
const { createLogger, format, transports } = winston;
const heap = require('heap')
functionBranchTree = new Map() // a tree to store function branch predictions
/**
* Generates unique IDs of arbitrary length
......@@ -46,8 +45,8 @@ function generateExecutor(functionPath, functionHash) {
return hash
}
function reverseProxy(req, res, functionToResource, resourceMap) {
branchChainPredictor(req, resourceMap)
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
return new Promise((resolve, reject) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
......@@ -140,7 +139,7 @@ function compare(a, b) {
return a.open_request_count - b.open_request_count
}
function branchChainPredictor(req, resourceMap) {
function branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) {
// console.log(req.headers['x-resource-id']);
if (req.headers['x-resource-id'] === undefined) {
......@@ -191,13 +190,14 @@ function branchChainPredictor(req, resourceMap) {
}
}
console.log("branch tree", functionBranchTree);
// console.log("branch tree", functionBranchTree);
}
function viterbi() {
let path = []
function viterbi(functionBranchTree) {
functionBranchTree.forEach((metadata, node) => {
if (metadata.parent) {
if (metadata.parent && metadata.req_count % 5 == 0) {
let path = []
let parents = [[node, {
prob: 1,
metadata
......@@ -252,15 +252,17 @@ function viterbi() {
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
}
if (path.length > 0)
console.log("path", path);
metadata.mle_path = path
}
});
if (path.length > 0)
console.log("path", path);
}
setInterval(viterbi, 5000)
module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger, compare
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment