Commit c81d9dd5 authored by Nilanjan Daw's avatar Nilanjan Daw

Massively streamlined speculative deployer for explicit chains

Reduced complexity of the speculative deployer. Moved path discovery to orchestrator. Removed need to recalculate path on each run
parent 0c95d5ed
......@@ -8,6 +8,7 @@ const fetch = require('node-fetch')
const constants = require('../constants.json')
const operator = require('./operator')
const sharedMeta = require('./shared_meta')
const util = require('util')
const logger = libSupport.logger
......@@ -197,30 +198,36 @@ router.post('/execute/:id', (req, res) => {
libSupport.fetchData(explicitChainDB + chain_id)
.then(chainData => {
console.log(chainData);
let path = {
path: [],
onPath: true,
dependency: {},
level: 0
}
if (chainData.error !== "not_found")
conditionProbabilityExplicit[chain_id] = chainData
if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString());
let mapPlanner = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
speculative_deployment(chain_id, aliases, mapPlanner, 0);
orchestrator(chain_id, res, payload, map, aliases, {})
if (chainData.error != "not_found")
speculative_deployment(chain_id, aliases, chainData);
orchestrator(chain_id, res, payload, map, aliases, {}, path)
})
} else {
readMap(`./repository/map${chain_id}.json`)
.then(data => {
map = data
let mapPlanner = JSON.parse(JSON.stringify(map))
readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
speculative_deployment(chain_id, aliases, mapPlanner, 0);
orchestrator(chain_id, res, payload, map, aliases, {})
if (chainData.error != "not_found")
speculative_deployment(chain_id, aliases, chainData);
orchestrator(chain_id, res, payload, map, aliases, {}, path)
})
})
}
......@@ -228,17 +235,41 @@ router.post('/execute/:id', (req, res) => {
})
async function orchestrator(chain_id, res, payload, map, aliases, result) {
/**
* Orchestrator function to execute a chain and return the result to the invoker
* @param {string} chain_id ID of the chain to be executed
* @param {JSON} res response object
* @param {JSON} payload data to be passed to the chain
* @param {JSON} map holds the chain map
* @param {JSON} aliases internal alias to function chain mapping
* @param {JSON} result result obtained after chain executes
*/
async function orchestrator(chain_id, res, payload, map, aliases, result, path) {
/**
* Adding dependencies on MLE path to a map
* for fast lookup during speculation
*/
for (const [functionName, metadata] of Object.entries(map)) {
if (metadata.type === "function" || metadata.type === "conditional") {
if (path.dependency[functionName] === undefined)
path.dependency[functionName] = JSON.parse(JSON.stringify(metadata.wait_for))
}
}
if (Object.keys(map).length == 0) {
console.log("time to resolve", result);
res.json(result)
if (path.onPath)
conditionProbabilityExplicit[chain_id]["path"] = path
let payload = {
method: 'put',
body: JSON.stringify(conditionProbabilityExplicit[chain_id]),
headers: { 'Content-Type': 'application/json' }
}
libSupport.fetchData(explicitChainDB + chain_id, payload)
console.log("detected path", util.inspect(path, false, null, true /* enable colors */));
// return resolve(result)
}
......@@ -258,22 +289,27 @@ async function orchestrator(chain_id, res, payload, map, aliases, result) {
}
delete map[functionName]
aliases[functionName].status = "running"
if (typeof path.path[path.level] === 'undefined') {
path.path[path.level] = []
}
path.path[path.level].push({functionName, type: "function", runtime: metadata.runtime})
libSupport.fetchData(url, data)
.then(json => {
// console.log(json);
result[functionName] = json
aliases[functionName].status = "done"
let branchMap = null
let branchMap = null, flag = false
for (const [_key, metadata] of Object.entries(map)) {
if (metadata.type === "function" || metadata.type === "conditional") {
let index = metadata.wait_for.indexOf(functionName)
if (index >= 0)
metadata.wait_for.splice(index, 1);
if (metadata.wait_for.length == 0)
flag = true // something is runnable
}
if (metadata.type === "conditional" && metadata.wait_for.length == 0) {
let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result)
......@@ -296,16 +332,34 @@ async function orchestrator(chain_id, res, payload, map, aliases, result) {
branchMap = map[branchToTake]
delete map[_key]
makeBranchRunnable(branchMap, aliases)
if ((conditionResult === 'success') && conditionProbabilityExplicit[chain_id][_key].probability < 0.5 ||
(conditionResult !== 'success') && conditionProbabilityExplicit[chain_id][_key].probability > 0.5) {
path.onPath = false
console.log("out of path");
}
path.level++
if (typeof path.path[path.level] === 'undefined') {
path.path[path.level] = []
}
path.path[path.level].push({functionName: _key, type: "condition"})
}
}
orchestrator(chain_id, res, payload, (branchMap == null)? map: branchMap, aliases, result)
if (flag)
path.level++
orchestrator(chain_id, res, payload, (branchMap == null)? map: branchMap, aliases, result, path)
})
}
}
}
}
/**
* Make the branch runnable by removing redundant dependencies in the map
* which have already executed
* @param {JSON} branchMap sub map of the chain holding the branch to be executed
* @param {JSON} aliases internal alias to function chain mapping
*/
function makeBranchRunnable(branchMap, aliases) {
delete branchMap['type']
for (const [_key, metadata] of Object.entries(branchMap)) {
......@@ -326,142 +380,65 @@ function checkCondition(op1, op2, op, result) {
return (operator[op](data, op2))? "success": "fail"
}
async function speculative_deployment(chain_id, aliases, map, offset, done, toBeDone, ignoreSet) {
console.log("offset: ", offset, "ignoreSet", ignoreSet);
async function speculative_deployment(chain_id, aliases, chainData) {
// console.log("chainData", util.inspect(chainData, false, null, true /* enable colors */));
let plan = []
let path = chainData.path.path
if (constants.speculative_deployment) {
let getData = []
for (const [mod, metadata] of Object.entries(map)) {
if (metadata.type !== 'function') {
if (metadata.type === 'conditional' && !constants.JIT_deployment) {
let probability
try {
probability = conditionProbabilityExplicit[chain_id][mod].probability
} catch (error) {
console.log("branch probability not present, random branch taken");
probability = Math.random()
for (let i = 0; i < path.length; i++) {
for (const node of path[i]) {
if (node.type === "function") {
node.id = aliases[node.functionName].alias
node.invokeTime = 0
// console.log(node);
plan.push(node)
}
let branch = (probability >= 0.5) ? metadata['success'] : metadata['fail']
let branchMap = JSON.parse(JSON.stringify(map[branch]))
delete branchMap['type']
console.log("success probability", probability, "taking branch: ", branch);
speculative_deployment(chain_id, aliases, branchMap)
}
continue
}
if (constants.JIT_deployment) {
// console.log(mod, metadata, aliases[mod].alias);
let url = metricsDB + aliases[mod].alias
let data = libSupport.fetchData(url)
getData.push(data)
} else {
notify(metadata.runtime, aliases[mod].alias)
}
}
if (constants.JIT_deployment) {
Promise.all(getData).then((values) => {
let dataMap = new Map()
for (const data of values) {
if (values.error === "not_found")
dataMap[data._id] = 0
dataMap[data._id] = data
}
if (done === undefined) {
console.log("new map");
done = new Map()
}
if (toBeDone === undefined) {
toBeDone = new Set()
}
// var plannerMap = new Map(map)
do {
for (const [mod, metadata] of Object.entries(map)) {
if (metadata.type !== 'function' && metadata.type !== 'conditional') {
continue
}
if (metadata.wait_for.length == 0 && done[mod] === undefined) {
/**
* expecting the first ones to run to be hit by coldstarts
*/
try {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart
} catch (e) {
done[mod] = 0
}
// delete plannerMap[mod];
} else if (done[mod] === undefined) {
let flag = true, redundantFlag = false
let maxWait = 0
for (const dependency of metadata.wait_for) {
if (done[dependency] === undefined) {
flag = false
break
} else if (maxWait < done[dependency] && (ignoreSet === undefined || !ignoreSet.has(dependency)))
maxWait = done[dependency]
else if (ignoreSet !== undefined && ignoreSet.has(dependency)) {
redundantFlag = true
console.log("ignoring redundant dependency", dependency);
}
}
// if (redundantFlag)
// maxWait += offset;
maxWait += offset
if (flag) {
if (metadata.type === 'conditional') {
console.log("setting notification for conditional", mod);
let probability
try {
probability = conditionProbabilityExplicit[chain_id][mod].probability
} catch (error) {
console.log("branch probability not present, random branch taken");
probability = Math.random()
}
let branch = (probability >= 0.5)? metadata['success']: metadata['fail']
let branchMap = JSON.parse(JSON.stringify(map[branch]))
delete branchMap['type']
console.log("success probability", probability, "taking branch: ", branch);
if (ignoreSet === undefined)
ignoreSet = new Set(metadata.wait_for)
else
ignoreSet = new Set(ignoreSet, new Set(metadata.wait_for))
speculative_deployment(chain_id, aliases, branchMap, maxWait, done, toBeDone, ignoreSet)
done[mod] = maxWait - offset
let conditionalDelay = 0, metricsData = new Map(), delayMap = new Map()
let data = await libSupport.fetchData(metricsDB + "_bulk_get", {
method: 'post',
body: JSON.stringify({
docs: plan
}),
headers: { 'Content-Type': 'application/json' },
})
data = data.results
for (let i = 0; i < path.length; i++) {
let id = data[i].id
metricsData[id] = data[i].docs[0].ok
}
// console.log(metricsData);
for (let i = 0; i < path.length; i++) {
for (const node of path[i]) {
let maxDelay = conditionalDelay
for (const dependency of chainData.path.dependency[node.functionName]) {
if (delayMap.get(dependency) > maxDelay)
maxDelay = delayMap.get(dependency)
}
if (maxDelay == 0) {
maxDelay += (node.type === "function")? metricsData[node.id][node.runtime].coldstart: 0
delayMap.set(node.functionName, maxDelay)
} else {
console.log("notification set", mod);
let starttime
try {
starttime = dataMap[aliases[mod].alias][metadata.runtime].starttime
} catch (e) {
starttime = 0
}
let notifyTime = ((maxWait - starttime) > 0) ?
maxWait - starttime: 0
// notifyTime += offset
console.log(mod, "max wait", maxWait, "notify time:", notifyTime, "offset added", offset);
setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias)
try {
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart - offset
} catch (e) {
done[mod] = maxWait - offset
if (node.type === "function")
node.invokeTime = maxDelay - metricsData[node.id][node.runtime].starttime
maxDelay += (node.type === "function")? metricsData[node.id][node.runtime].warmstart: 0
delayMap.set(node.functionName, maxDelay)
}
}
if (toBeDone.has(mod))
delete toBeDone[mod]
// delete plannerMap[mod]
} else {
toBeDone.add(mod)
if (node.type === "condition")
conditionalDelay = maxDelay
}
}
console.log("done", done);
console.log("delay map", delayMap);
console.log("notifcation plan", plan);
}
} while (toBeDone.size != 0)
})
for (const node of plan) {
console.log("notification set for", node.functionName);
setTimeout(notify, node.invokeTime, node.runtime, node.id)
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment