Commit ec2a1e27 authored by Nilanjan Daw's avatar Nilanjan Daw

Bug fix: Fixed function deadline issues. Need more testing

parent 38c00ae9
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
"metrics": { "metrics": {
"alpha": 0.7 "alpha": 0.7
}, },
"speculative_deployment": true, "speculative_deployment": false,
"JIT_deployment": true, "JIT_deployment": true,
"id_size": 20 "id_size": 20
} }
...@@ -326,8 +326,8 @@ function checkCondition(op1, op2, op, result) { ...@@ -326,8 +326,8 @@ function checkCondition(op1, op2, op, result) {
return (operator[op](data, op2))? "success": "fail" return (operator[op](data, op2))? "success": "fail"
} }
async function speculative_deployment(chain_id, aliases, map, offset, done, toBeDone) { async function speculative_deployment(chain_id, aliases, map, offset, done, toBeDone, ignoreSet) {
console.log(done, toBeDone); console.log("offset: ", offset, "ignoreSet", ignoreSet);
if (constants.speculative_deployment) { if (constants.speculative_deployment) {
let getData = [] let getData = []
...@@ -352,7 +352,7 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe ...@@ -352,7 +352,7 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe
continue continue
} }
if (constants.JIT_deployment) { if (constants.JIT_deployment) {
console.log(mod, metadata, aliases[mod].alias); // console.log(mod, metadata, aliases[mod].alias);
let url = metricsDB + aliases[mod].alias let url = metricsDB + aliases[mod].alias
let data = libSupport.fetchData(url) let data = libSupport.fetchData(url)
...@@ -365,9 +365,10 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe ...@@ -365,9 +365,10 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe
Promise.all(getData).then((values) => { Promise.all(getData).then((values) => {
let dataMap = new Map() let dataMap = new Map()
for (const data of values) { for (const data of values) {
if (values.error === "not_found")
dataMap[data._id] = 0
dataMap[data._id] = data dataMap[data._id] = data
} }
console.log("line 361", done, toBeDone);
if (done === undefined) { if (done === undefined) {
console.log("new map"); console.log("new map");
...@@ -386,18 +387,30 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe ...@@ -386,18 +387,30 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe
/** /**
* expecting the first ones to run to be hit by coldstarts * expecting the first ones to run to be hit by coldstarts
*/ */
try {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart
} catch (e) {
done[mod] = 0
}
// delete plannerMap[mod]; // delete plannerMap[mod];
} else if (done[mod] === undefined) { } else if (done[mod] === undefined) {
let flag = true let flag = true, redundantFlag = false
let maxWait = 0 let maxWait = 0
for (const dependency of metadata.wait_for) { for (const dependency of metadata.wait_for) {
if (done[dependency] === undefined) { if (done[dependency] === undefined) {
flag = false flag = false
break break
} else if (maxWait < done[dependency]) } else if (maxWait < done[dependency] && (ignoreSet === undefined || !ignoreSet.has(dependency)))
maxWait = done[dependency] maxWait = done[dependency]
else if (ignoreSet !== undefined && ignoreSet.has(dependency)) {
redundantFlag = true
console.log("ignoring redundant dependency", dependency);
}
} }
// if (redundantFlag)
// maxWait += offset;
maxWait += offset
if (flag) { if (flag) {
if (metadata.type === 'conditional') { if (metadata.type === 'conditional') {
console.log("setting notification for conditional", mod); console.log("setting notification for conditional", mod);
...@@ -412,17 +425,30 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe ...@@ -412,17 +425,30 @@ async function speculative_deployment(chain_id, aliases, map, offset, done, toBe
let branchMap = JSON.parse(JSON.stringify(map[branch])) let branchMap = JSON.parse(JSON.stringify(map[branch]))
delete branchMap['type'] delete branchMap['type']
console.log("success probability", probability, "taking branch: ", branch); console.log("success probability", probability, "taking branch: ", branch);
if (ignoreSet === undefined)
speculative_deployment(chain_id, aliases, branchMap, maxWait, done, toBeDone) ignoreSet = new Set(metadata.wait_for)
done[mod] = maxWait else
ignoreSet = new Set(ignoreSet, new Set(metadata.wait_for))
speculative_deployment(chain_id, aliases, branchMap, maxWait, done, toBeDone, ignoreSet)
done[mod] = maxWait - offset
} else { } else {
console.log("notification set", mod); console.log("notification set", mod);
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ? let starttime
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0 try {
notifyTime += offset starttime = dataMap[aliases[mod].alias][metadata.runtime].starttime
console.log(mod, "max wait", maxWait, "notify time:", notifyTime); } catch (e) {
starttime = 0
}
let notifyTime = ((maxWait - starttime) > 0) ?
maxWait - starttime: 0
// notifyTime += offset
console.log(mod, "max wait", maxWait, "notify time:", notifyTime, "offset added", offset);
setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias) setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias)
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart try {
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart - offset
} catch (e) {
done[mod] = maxWait - offset
}
} }
if (toBeDone.has(mod)) if (toBeDone.has(mod))
...@@ -467,7 +493,7 @@ function readMap(filename, alias = false) { ...@@ -467,7 +493,7 @@ function readMap(filename, alias = false) {
} }
function notify(runtime, functionHash) { function notify(runtime, functionHash) {
console.log("check map: ", functionToResource.has(functionHash + runtime)); // console.log("check map: ", functionToResource.has(functionHash + runtime));
if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) { if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) {
let payload = [{ let payload = [{
topic: constants.topics.hscale, topic: constants.topics.hscale,
......
...@@ -5,6 +5,7 @@ const secrets = require('./secrets.json') ...@@ -5,6 +5,7 @@ const secrets = require('./secrets.json')
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const util = require('util') const util = require('util')
const prom = require('prom-client'); const prom = require('prom-client');
const sharedMeta = require('./shared_meta');
const Registry = prom.Registry; const Registry = prom.Registry;
const register = new Registry(); const register = new Registry();
...@@ -29,8 +30,7 @@ register.registerMetric(coldstartMetric); ...@@ -29,8 +30,7 @@ register.registerMetric(coldstartMetric);
register.registerMetric(starttimeMetric); register.registerMetric(starttimeMetric);
register.registerMetric(requestMetric); register.registerMetric(requestMetric);
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metricsDB = sharedMeta.metricsDB
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -129,6 +129,7 @@ async function broadcastMetrics() { ...@@ -129,6 +129,7 @@ async function broadcastMetrics() {
warmstart: metric.longterm.warmstart, warmstart: metric.longterm.warmstart,
starttime: metric.longterm.starttime starttime: metric.longterm.starttime
} }
let payload = { let payload = {
method: 'put', method: 'put',
body: JSON.stringify(dbData), body: JSON.stringify(dbData),
...@@ -136,6 +137,7 @@ async function broadcastMetrics() { ...@@ -136,6 +137,7 @@ async function broadcastMetrics() {
} }
await fetch(metricsDB + functionHash, payload) await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now() metric.timestamp = Date.now()
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment