Commit 57e15b96 authored by Nilanjan Daw's avatar Nilanjan Daw

Speculative JIT deployment support for explicit chains

Support is only for non-conditional chains.
parent 742d8b49
......@@ -6,6 +6,7 @@
"couchdb_host": "localhost:5984",
"function_db_name": "serverless",
"metrics_db_name": "metrics",
"implicit_chain_db_name": "implicit_chain",
"network": {
"network_bridge": "hybrid_kafka-serverless",
"internal": {
......
......@@ -195,23 +195,26 @@ router.post('/execute/:id', (req, res) => {
// else {
if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString());
let mapPlanner = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
console.log(payload);
// getTimeLines(aliases, map);
getTimeLines(aliases, mapPlanner);
orchestrator(res, payload, map, aliases, {})
})
} else {
readMap(`./repository/map${req.params.id}.json`)
.then(data => {
map = data
let mapPlanner = JSON.parse(JSON.stringify(map))
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
// getTimeLines(aliases, map);
getTimeLines(aliases, mapPlanner);
orchestrator(res, payload, map, aliases, {})
})
})
......@@ -299,20 +302,72 @@ function checkCondition(op1, op2, op, result) {
return (operator[op](data, op2))? "success": "fail"
}
function getTimeLines(aliases) {
async function getTimeLines(aliases, map) {
if (constants.speculative_deployment) {
console.log(aliases);
let getData = []
for (const [functionName, metadata] of Object.entries(aliases)) {
let url = metricsDB + `_design/designdoc/_view/testview?startkey=[${metadata.alias}]&endkey=[{}]`
for (const [mod, metadata] of Object.entries(map)) {
console.log(mod, metadata, aliases[mod].alias);
let url = metricsDB + aliases[mod].alias
console.log(url);
getData.push(libSupport.fetchData(url))
let data = libSupport.fetchData(url)
console.log(data);
getData.push(data)
}
Promise.all(getData).then((values) => {
console.log(values);
let dataMap = new Map()
for (const data of values) {
dataMap[data._id] = data
}
let done =new Map()
let toBeDone = new Set()
// var plannerMap = new Map(map)
do {
for (const [mod, metadata] of Object.entries(map)) {
if (metadata.wait_for.length == 0 && done[mod] === undefined) {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart // expecting the first ones to run
// to be hit by coldstarts
// delete plannerMap[mod];
} else if (done[mod] === undefined) {
let flag = true
let maxWait = 0
for (const dependency of metadata.wait_for) {
console.log(dependency);
if (done[dependency] === undefined) {
flag = false
break
} else if (maxWait < done[dependency])
maxWait = done[dependency]
}
if (flag) {
console.log("notifying", mod);
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0)?
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime: 0
console.log(mod, "max wait", maxWait, "notify time:", notifyTime);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
// setTimeout(notify, notifyTime, payload)
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart
if (toBeDone.has(mod))
delete toBeDone[mod]
// delete plannerMap[mod]
} else {
toBeDone.add(mod)
}
}
console.log(done, toBeDone);
}
} while (toBeDone.size != 0)
})
}
}
function readMap(filename, alias = false) {
......@@ -341,6 +396,10 @@ function readMap(filename, alias = false) {
})
}
function notify(payload) {
libSupport.producer.send(payload, function () { })
}
function createDirectory(path) {
return new Promise((resolve, reject) => {
if (!fs.existsSync(path)) {
......
......@@ -75,6 +75,11 @@ const WINDOW_SIZE = 10
const port = constants.master_port
const registry_url = constants.registry_url
app.get('/metrics', (req, res) => {
res.set('Content-Type', libSupport.metrics.register.contentType);
res.end(libSupport.metrics.register.metrics());
});
/**
* REST API to receive deployment requests
*/
......@@ -552,7 +557,7 @@ async function speculative_deployment(req, runtime) {
}
}
}
setInterval(libSupport.metrics.broadcastMetrics, 5000)
setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
......
......@@ -4,10 +4,12 @@ const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const metrics = require('./metrics')
const { createLogger, format, transports } = winston;
const heap = require('heap')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
......@@ -16,6 +18,9 @@ let kafka = require('kafka-node'),
}),
producer = new Producer(client)
let implicitChainDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
implicitChainDB = implicitChainDB + "/" + constants.implicit_chain_db_names + "/"
/**
* Generates unique IDs of arbitrary length
* @param {Length of the ID} length
......@@ -127,7 +132,6 @@ function getPort(usedPort) {
return port
}
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
......@@ -270,9 +274,17 @@ function viterbi(functionBranchTree) {
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
}
if (path.length > 0)
console.log("path", path);
// if (path.length > 0)
// console.log("path", path);
metadata.mle_path = path
if (path.length > 1) {
let payload = {
method: 'put',
body: JSON.stringify(path),
headers: { 'Content-Type': 'application/json' }
}
fetch(implicitChainDB + functionHash, payload)
}
}
});
}
......@@ -305,15 +317,18 @@ function logBroadcast(message, resource_id, resourceMap) {
}
async function fetchData(url, data) {
let res = await fetch(url, data)
async function fetchData(url, data = null) {
let res
if (data === undefined || data === null)
res = await fetch(url)
else
res = await fetch(url, data)
return await res.json()
}
setInterval(metrics.broadcastMetrics, 5000)
module.exports = {
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi, logBroadcast, fetchData, metrics
viterbi, logBroadcast, fetchData, metrics,
producer
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment