Commit 8aec2c81 authored by Nilanjan Daw's avatar Nilanjan Daw

JIT deployment support for conditional chains

parent 178f8a33
...@@ -32,6 +32,6 @@ ...@@ -32,6 +32,6 @@
"alpha": 0.7 "alpha": 0.7
}, },
"speculative_deployment": true, "speculative_deployment": true,
"JIT_deployment": true, "JIT_deployment": false,
"id_size": 20 "id_size": 20
} }
...@@ -20,7 +20,10 @@ const logger = libSupport.logger ...@@ -20,7 +20,10 @@ const logger = libSupport.logger
const registry_url = constants.registry_url const registry_url = constants.registry_url
let functionToResource = sharedStructures.functionToResource, let functionToResource = sharedStructures.functionToResource,
db = sharedStructures.db db = sharedStructures.db,
conditionProbabilityExpilict = sharedStructures.conditionProbabilityExpilict
router.post('/deploy', (req, res) => { router.post('/deploy', (req, res) => {
// let runtime = req.body.runtime // let runtime = req.body.runtime
...@@ -193,38 +196,36 @@ async function deployContainer(path, imageName) { ...@@ -193,38 +196,36 @@ async function deployContainer(path, imageName) {
router.post('/execute/:id', (req, res) => { router.post('/execute/:id', (req, res) => {
let map, aliases let map, aliases
// if (req.body.map) let chain_id = req.params.id
// map = req.body.map
// else {
if (req.files && req.files.map) { if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString()); map = JSON.parse(req.files.map.data.toString());
let mapPlanner = JSON.parse(req.files.map.data.toString()); let mapPlanner = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${req.params.id}.json`, true) readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
speculative_deployment(aliases, mapPlanner); speculative_deployment(chain_id, aliases, mapPlanner, 0);
orchestrator(res, payload, map, aliases, {}) orchestrator(chain_id, res, payload, map, aliases, {})
}) })
} else { } else {
readMap(`./repository/map${req.params.id}.json`) readMap(`./repository/map${chain_id}.json`)
.then(data => { .then(data => {
map = data map = data
let mapPlanner = JSON.parse(JSON.stringify(map)) let mapPlanner = JSON.parse(JSON.stringify(map))
readMap(`./repository/aliases${req.params.id}.json`, true) readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
speculative_deployment(aliases, mapPlanner); speculative_deployment(chain_id, aliases, mapPlanner, 0);
orchestrator(res, payload, map, aliases, {}) orchestrator(chain_id, res, payload, map, aliases, {})
}) })
}) })
} }
}) })
async function orchestrator(res, payload, map, aliases, result) { async function orchestrator(chain_id, res, payload, map, aliases, result) {
if (Object.keys(map).length == 0) { if (Object.keys(map).length == 0) {
console.log("time to resolve", result); console.log("time to resolve", result);
...@@ -234,8 +235,6 @@ async function orchestrator(res, payload, map, aliases, result) { ...@@ -234,8 +235,6 @@ async function orchestrator(res, payload, map, aliases, result) {
else { else {
for (const [functionName, metadata] of Object.entries(map)) { for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
// console.log(metadata);
if (metadata.type === "function" && metadata.wait_for.length == 0) { if (metadata.type === "function" && metadata.wait_for.length == 0) {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}` let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}`
...@@ -269,7 +268,21 @@ async function orchestrator(res, payload, map, aliases, result) { ...@@ -269,7 +268,21 @@ async function orchestrator(res, payload, map, aliases, result) {
if (metadata.type === "conditional" && metadata.wait_for.length == 0) { if (metadata.type === "conditional" && metadata.wait_for.length == 0) {
let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result) let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result)
console.log(conditionResult, "aliases", aliases); if (conditionProbabilityExpilict[chain_id] === undefined)
conditionProbabilityExpilict[chain_id] = {}
if (conditionProbabilityExpilict[chain_id][_key] === undefined)
conditionProbabilityExpilict[chain_id][_key] = {
request_count: 0,
probability: 0
}
let oldProbability = conditionProbabilityExpilict[chain_id][_key].probability
let updateProbability = (conditionResult === 'success') ? 1.0 : 0.0
conditionProbabilityExpilict[chain_id][_key].probability =
oldProbability * conditionProbabilityExpilict[chain_id][_key].request_count + updateProbability
conditionProbabilityExpilict[chain_id][_key].request_count++
conditionProbabilityExpilict[chain_id][_key].probability /=
conditionProbabilityExpilict[chain_id][_key].request_count
console.log(conditionResult, "probability table", conditionProbabilityExpilict);
let branchToTake = metadata[conditionResult] let branchToTake = metadata[conditionResult]
branchMap = map[branchToTake] branchMap = map[branchToTake]
delete map[_key] delete map[_key]
...@@ -277,7 +290,7 @@ async function orchestrator(res, payload, map, aliases, result) { ...@@ -277,7 +290,7 @@ async function orchestrator(res, payload, map, aliases, result) {
} }
} }
orchestrator(res, payload, (branchMap == null)? map: branchMap, aliases, result) orchestrator(chain_id, res, payload, (branchMap == null)? map: branchMap, aliases, result)
}) })
} }
} }
...@@ -304,11 +317,29 @@ function checkCondition(op1, op2, op, result) { ...@@ -304,11 +317,29 @@ function checkCondition(op1, op2, op, result) {
return (operator[op](data, op2))? "success": "fail" return (operator[op](data, op2))? "success": "fail"
} }
async function speculative_deployment(aliases, map) { async function speculative_deployment(chain_id, aliases, map, offset, done, toBeDone) {
if (constants.speculative_deployment) { if (constants.speculative_deployment) {
console.log(aliases);
let getData = [] let getData = []
for (const [mod, metadata] of Object.entries(map)) { for (const [mod, metadata] of Object.entries(map)) {
if (metadata.type !== 'function') {
if (metadata.type === 'conditional') {
let probability
try {
probability = conditionProbabilityExpilict[chain_id][mod].probability
} catch (error) {
console.log("branch probability not present, random branch taken");
probability = Math.random()
}
let branch = (probability >= 0.5) ? metadata['success'] : metadata['fail']
let branchMap = JSON.parse(JSON.stringify(map[branch]))
delete branchMap['type']
console.log("success probability", probability, "taking branch: ", branch);
speculative_deployment(chain_id, aliases, branchMap)
}
continue
}
if (constants.JIT_deployment) { if (constants.JIT_deployment) {
console.log(mod, metadata, aliases[mod].alias); console.log(mod, metadata, aliases[mod].alias);
...@@ -325,22 +356,26 @@ async function speculative_deployment(aliases, map) { ...@@ -325,22 +356,26 @@ async function speculative_deployment(aliases, map) {
for (const data of values) { for (const data of values) {
dataMap[data._id] = data dataMap[data._id] = data
} }
if (!done)
let done = new Map() done = new Map()
let toBeDone = new Set() if (!toBeDone)
toBeDone = new Set()
// var plannerMap = new Map(map) // var plannerMap = new Map(map)
do { do {
for (const [mod, metadata] of Object.entries(map)) { for (const [mod, metadata] of Object.entries(map)) {
if (metadata.type !== 'function' && metadata.type !== 'conditional') {
continue
}
if (metadata.wait_for.length == 0 && done[mod] === undefined) { if (metadata.wait_for.length == 0 && done[mod] === undefined) {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart // expecting the first ones to run /**
// to be hit by coldstarts * expecting the first ones to run to be hit by coldstarts
*/
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart
// delete plannerMap[mod]; // delete plannerMap[mod];
} else if (done[mod] === undefined) { } else if (done[mod] === undefined) {
let flag = true let flag = true
let maxWait = 0 let maxWait = 0
for (const dependency of metadata.wait_for) { for (const dependency of metadata.wait_for) {
console.log(dependency);
if (done[dependency] === undefined) { if (done[dependency] === undefined) {
flag = false flag = false
break break
...@@ -348,12 +383,32 @@ async function speculative_deployment(aliases, map) { ...@@ -348,12 +383,32 @@ async function speculative_deployment(aliases, map) {
maxWait = done[dependency] maxWait = done[dependency]
} }
if (flag) { if (flag) {
console.log("notification set", mod); if (metadata.type === 'conditional') {
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ? console.log("setting notification for conditional", mod);
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0 let probability
console.log(mod, "max wait", maxWait, "notify time:", notifyTime); try {
setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias) probability = conditionProbabilityExpilict[chain_id][mod].probability
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart } catch (error) {
console.log("branch probability not present, random branch taken");
probability = Math.random()
}
let branch = (probability >= 0.5)? metadata['success']: metadata['fail']
let branchMap = JSON.parse(JSON.stringify(map[branch]))
delete branchMap['type']
console.log("success probability", probability, "taking branch: ", branch);
speculative_deployment(chain_id, aliases, branchMap, maxWait, done, toBeDone)
done[mod] = maxWait
} else {
console.log("notification set", mod);
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ?
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0
notifyTime += offset
console.log(mod, "max wait", maxWait, "notify time:", notifyTime);
setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias)
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart
}
if (toBeDone.has(mod)) if (toBeDone.has(mod))
delete toBeDone[mod] delete toBeDone[mod]
// delete plannerMap[mod] // delete plannerMap[mod]
...@@ -361,7 +416,7 @@ async function speculative_deployment(aliases, map) { ...@@ -361,7 +416,7 @@ async function speculative_deployment(aliases, map) {
toBeDone.add(mod) toBeDone.add(mod)
} }
} }
console.log(done, toBeDone); console.log("done", done);
} }
} while (toBeDone.size != 0) } while (toBeDone.size != 0)
}) })
......
...@@ -526,7 +526,7 @@ function autoscalar() { ...@@ -526,7 +526,7 @@ function autoscalar() {
*/ */
async function speculative_deployment(req, runtime) { async function speculative_deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) { if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id); // console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) { if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id) let branchInfo = functionBranchTree.get(req.params.id)
......
...@@ -3,9 +3,11 @@ let db = new Map(), // queue holding request to be dispatched ...@@ -3,9 +3,11 @@ let db = new Map(), // queue holding request to be dispatched
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of functionToResource = new Map(), // a function to resource map. Each map contains a minheap of
// resources associated with the function // resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions functionBranchTree = new Map(), // a tree to store function branch predictions
conditionProbabilityExpilict = new Map() // tree holding conditional probabilities for explicit chains
module.exports = { module.exports = {
db, functionBranchTree, functionToResource, workerNodes, resourceMap db, functionBranchTree, functionToResource, workerNodes, resourceMap,
conditionProbabilityExpilict
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment