Commit 8b4f1d1a authored by Nilanjan Daw's avatar Nilanjan Daw

Viterbi algorithm

The probability model generated so far is generative in nature, in the sense that it tries to model the true probability distribution of the incoming requests. After a large number of requests have been received we can expect to have a reasonable MLE model of the probability distribution. We will then use the viterbi algorithm to detect the most likely path to be taken by a request. This can be used to preallocate the resources in the functions chain span to reduce cascading cold starts.
Path detection has been implemented.
TODO: resource pre-allocation
parent 4d37177c
{"id":"10.196.6.51","master_node":"10.129.6.5"} {"id":"192.168.31.51","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
...@@ -57,7 +57,7 @@ function runProcess(local_repository, metadata) { ...@@ -57,7 +57,7 @@ function runProcess(local_repository, metadata) {
}); });
process.on('close', (code) => { process.on('close', (code) => {
resolve(code); resolve(process.pid);
logger.info(`Process Environment with resource_id ${resource_id} blown`); logger.info(`Process Environment with resource_id ${resource_id} blown`);
}); });
}) })
...@@ -100,7 +100,7 @@ function runContainer(metadata) { ...@@ -100,7 +100,7 @@ function runContainer(metadata) {
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference); logger.info("container run time taken: ", timeDifference);
result += data; result += data;
resolve(result); resolve(resource_id);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
...@@ -126,7 +126,7 @@ function runContainer(metadata) { ...@@ -126,7 +126,7 @@ function runContainer(metadata) {
logger.info(`stdout: ${data}`); logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference); logger.info("container run time taken: ", timeDifference);
resolve(result); resolve(resource_id);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
......
...@@ -55,7 +55,7 @@ var download = function (url, dest, cb) { ...@@ -55,7 +55,7 @@ var download = function (url, dest, cb) {
console.log(url); console.log(url);
if (!fs.existsSync(dest)) { if (!fs.existsSync(dest)) {
var file = fs.createWriteStream(dest); var file = fs.createWriteStream(dest);
var request = http.get(url, function (response) { var request = https.get(url, function (response) {
response.pipe(file); response.pipe(file);
file.on('finish', function () { file.on('finish', function () {
file.close(cb); // close() is async, call cb after close completes. file.close(cb); // close() is async, call cb after close completes.
......
...@@ -264,6 +264,7 @@ function dispatch() { ...@@ -264,6 +264,7 @@ function dispatch() {
messages: JSON.stringify({ messages: JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
timestamp: Date.now()
}), }),
partition: 0 partition: 0
}] }]
...@@ -327,12 +328,14 @@ function postDeploy(message) { ...@@ -327,12 +328,14 @@ function postDeploy(message) {
topic: log_channel, topic: log_channel,
messages: JSON.stringify({ messages: JSON.stringify({
resource_id: message.resource_id, resource_id: message.resource_id,
// type: "deployment_launch",
node_id: resource.node_id, node_id: resource.node_id,
runtime: resource.runtime, runtime: resource.runtime,
function_id: resource.functionHash, function_id: resource.functionHash,
entity_id: message.entity_id,
"reason": "deployment", "reason": "deployment",
"status": true, "status": true,
"timestamp": date.toISOString() "timestamp": Date.now()
}), }),
partition: 0 partition: 0
}] }]
...@@ -362,6 +365,7 @@ consumer.on('message', function (message) { ...@@ -362,6 +365,7 @@ consumer.on('message', function (message) {
let topic = message.topic let topic = message.topic
message = message.value message = message.value
// console.log(topic, message)
if (topic === "response") { if (topic === "response") {
logger.info("response " + message); logger.info("response " + message);
...@@ -443,11 +447,17 @@ consumer.on('message', function (message) { ...@@ -443,11 +447,17 @@ consumer.on('message', function (message) {
logger.info("Response from RM: " + message); logger.info("Response from RM: " + message);
message = JSON.parse(message) message = JSON.parse(message)
let resourceChoice = message.grunts[0] let resourceChoice = message.nodes[0]
if (resourceMap.has(message.resource_id)) { if (resourceMap.has(message.resource_id)) {
let resource = resourceMap.get(message.resource_id) let resource = resourceMap.get(message.resource_id)
if (typeof resourceChoice === 'string') {
resource.port = libSupport.getPort(usedPort)
resource.node_id = resourceChoice
} else {
resource.port = (resourceChoice.port) ? resourceChoice.port : libSupport.getPort(usedPort) resource.port = (resourceChoice.port) ? resourceChoice.port : libSupport.getPort(usedPort)
resource.node_id = resourceChoice.node_id resource.node_id = resourceChoice.node_id
}
let payload = [{ let payload = [{
topic: resource.node_id, topic: resource.node_id,
messages: JSON.stringify({ messages: JSON.stringify({
......
...@@ -191,9 +191,76 @@ function branchChainPredictor(req, resourceMap) { ...@@ -191,9 +191,76 @@ function branchChainPredictor(req, resourceMap) {
} }
} }
console.log(functionBranchTree); console.log("branch tree", functionBranchTree);
} }
function viterbi() {
let path = []
functionBranchTree.forEach((metadata, node) => {
if (metadata.parent) {
let parents = [[node, {
prob: 1,
metadata
}]]
path.push({node, probability: 1})
let siblings = new Map()
while(parents.length > 0) {
// console.log("parent_group", parents);
for (const parent of parents) {
// console.log("=========begin==========\n",parent, "\n=============end============");
// console.log(parent[1].metadata);
if (parent[1].metadata === undefined)
continue
let forwardBranches = parent[1].metadata.branches
// console.log(forwardBranches);
let parentProbability = parent[1].prob
forwardBranches.forEach((branchProb, subNode) => {
let probability = 0
if (siblings.has(subNode))
probability = siblings.get(subNode)
probability += branchProb * parentProbability
// console.log("prob", probability);
siblings.set(subNode, probability)
})
// console.log("siblings", siblings);
}
parents = []
let maxSibling, maxProb = 0
siblings.forEach((prob, sibling) => {
if (prob > maxProb) {
maxSibling = sibling
maxProb = prob
}
})
parentIDs = Array.from( siblings.keys() );
for (const id of parentIDs) {
let metadata = functionBranchTree.get(id)
parents.push([
id, {
prob: siblings.get(id),
metadata
}
])
}
if (maxSibling !== undefined)
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
}
}
});
if (path.length > 0)
console.log("path", path);
}
setInterval(viterbi, 5000)
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger, compare makeid, generateExecutor, reverseProxy, getPort, logger, compare
} }
\ No newline at end of file
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
const express = require('express') const express = require('express')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
let request = require('request') let request = require('request')
const process = require('process')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime let port = 5000, resource_id, functionHash, runtime
...@@ -51,7 +51,7 @@ app.listen(port, () => { ...@@ -51,7 +51,7 @@ app.listen(port, () => {
producer.send( producer.send(
[{ [{
topic: "deployed", topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id }), messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id, entity_id: process.pid}),
"status": true "status": true
}], () => { }) }], () => { })
}) })
...@@ -63,7 +63,7 @@ function shouldDie() { ...@@ -63,7 +63,7 @@ function shouldDie() {
producer.send( producer.send(
[{ [{
topic: "removeWorker", topic: "removeWorker",
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id }) messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id, entity_id: process.pid})
}], () => { }], () => {
console.log("Ending worker for function", functionHash, "resource_id", resource_id); console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0) process.exit(0)
......
#!/usr/bin/env node
var mime = require('./mime.js');
var file = process.argv[2];
var type = mime.lookup(file);
process.stdout.write(type + '\n');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment