'use strict'; const express = require('express') const libSupport = require('./lib') const router = express.Router() const fs = require('fs') const { spawn } = require('child_process') const fetch = require('node-fetch') const constants = require('../constants.json') const operator = require('./operator') const sharedMeta = require('./shared_meta') const logger = libSupport.logger const registry_url = constants.registry_url let functionToResource = sharedMeta.functionToResource, db = sharedMeta.db, conditionProbabilityExplicit = sharedMeta.conditionProbabilityExplicit, metricsDB = sharedMeta.metricsDB, metadataDB = sharedMeta.metadataDB, explicitChainDB = sharedMeta.explicitChainDB router.post('/deploy', (req, res) => { // let runtime = req.body.runtime let files = req.files const chain_id = libSupport.makeid(constants.id_size) const file_path = __dirname + "/repository/" let aliases = {} let deployHandles = [] createDirectory(file_path).then(() => { for (const [file_alias, file] of Object.entries(files)) { let functionHash = file.md5 if (file_alias === 'map') { file.mv(file_path + 'map' + chain_id + ".json") continue } // aliases[file_alias] = functionHash deployHandles.push(deploy(file_path, functionHash, file, aliases, file_alias)) } console.log("aliases", aliases); Promise.all(deployHandles).then(() => { console.log("done"); fs.writeFile(file_path + `aliases${chain_id}.json`, JSON.stringify(aliases, null, 2), function(err) { res.json({ status: "success", function_id: chain_id }) }) }).catch(err => { res.json({ status: "error", reason: err }).status(400) }) }) }) async function deploy(file_path, functionHash, file, aliases, file_alias) { let runtime = "container", memory = 330 try { await moveFile(file, file_path, functionHash) functionHash = libSupport.generateExecutor(file_path, functionHash) aliases[file_alias] = functionHash /** * Adding meta caching via couchdb * This will create / update function related metadata like resource limits etc * on a database named "serverless". */ let res = await fetch(metadataDB + functionHash) let json = await res.json() console.log(json); if (json.error === "not_found") { logger.warn("New function, creating metadata") await fetch(metadataDB + functionHash, { method: 'put', body: JSON.stringify({ memory: memory }), headers: { 'Content-Type': 'application/json' }, }) // let json = await res.json() // console.log(json) } else { logger.warn('Repeat deployment, updating metadata') try { await fetch(metadataDB + functionHash, { method: 'put', body: JSON.stringify({ memory: memory, _rev: json._rev }), headers: { 'Content-Type': 'application/json' }, }) // let json = await res.json() // console.log(json) } catch (err) { console.log(err); } } if (runtime === "container") { try { await deployContainer(file_path, functionHash) console.log("called"); return Promise.resolve(functionHash) } catch(err) { return Promise.reject(err) } } else { return Promise.resolve(functionHash) } } catch (err) { logger.error(err) return Promise.reject(err) } } function moveFile(file, file_path, functionHash) { return new Promise((resolve, reject) =>{ file.mv(file_path + functionHash, function (err) { if (err) reject(err) resolve() }) }) } async function deployContainer(path, imageName) { return new Promise((resolve, reject) => { let buildStart = Date.now() fs.writeFile('./repository/Dockerfile' + imageName, `FROM node:latest WORKDIR /app COPY ./worker_env/package.json /app ADD ./worker_env/node_modules /app/node_modules COPY ${imageName}.js /app ENTRYPOINT ["node", "${imageName}.js"]` , function (err) { if (err) { logger.error("failed", err); reject(err); } else { logger.info('Dockerfile created'); const process = spawn('docker', ["build", "-t", registry_url + imageName, path, "-f", path + `Dockerfile${imageName}`]); process.stdout.on('data', (data) => { logger.info(`stdout: ${data}`); }); process.stderr.on('data', (data) => { logger.error(`stderr: ${data}`); }); process.on('close', (code) => { logger.warn(`child process exited with code ${code}`); let timeDifference = Math.ceil((Date.now() - buildStart)) logger.info("image build time taken: ", timeDifference); const process_push = spawn('docker', ["push", registry_url + imageName]); process_push.stdout.on('data', (data) => { console.log(`stdout: ${data}`); }); process_push.stderr.on('data', (data) => { logger.error(`stderr: ${data}`); }); process_push.on('close', (code) => { logger.info("image pushed to repository"); resolve(); }) }); } }); }) } router.post('/execute/:id', (req, res) => { let map, aliases let chain_id = req.params.id libSupport.fetchData(explicitChainDB + chain_id) .then(chainData => { console.log(chainData); if (chainData.error !== "not_found") conditionProbabilityExplicit[chain_id] = chainData if (req.files && req.files.map) { map = JSON.parse(req.files.map.data.toString()); let mapPlanner = JSON.parse(req.files.map.data.toString()); readMap(`./repository/aliases${chain_id}.json`, true) .then(data => { aliases = data let payload = JSON.parse(req.body.data) speculative_deployment(chain_id, aliases, mapPlanner, 0); orchestrator(chain_id, res, payload, map, aliases, {}) }) } else { readMap(`./repository/map${chain_id}.json`) .then(data => { map = data let mapPlanner = JSON.parse(JSON.stringify(map)) readMap(`./repository/aliases${chain_id}.json`, true) .then(data => { aliases = data let payload = JSON.parse(req.body.data) speculative_deployment(chain_id, aliases, mapPlanner, 0); orchestrator(chain_id, res, payload, map, aliases, {}) }) }) } }) }) async function orchestrator(chain_id, res, payload, map, aliases, result) { if (Object.keys(map).length == 0) { console.log("time to resolve", result); res.json(result) let payload = { method: 'put', body: JSON.stringify(conditionProbabilityExplicit[chain_id]), headers: { 'Content-Type': 'application/json' } } libSupport.fetchData(explicitChainDB + chain_id, payload) // return resolve(result) } else { for (const [functionName, metadata] of Object.entries(map)) { if (metadata.type === "function" && metadata.wait_for.length == 0) { let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}` console.log(url); let data = { method: 'post', body: JSON.stringify({ runtime: metadata.runtime, payload }), headers: { 'Content-Type': 'application/json' } } delete map[functionName] aliases[functionName].status = "running" libSupport.fetchData(url, data) .then(json => { // console.log(json); result[functionName] = json aliases[functionName].status = "done" let branchMap = null for (const [_key, metadata] of Object.entries(map)) { if (metadata.type === "function" || metadata.type === "conditional") { let index = metadata.wait_for.indexOf(functionName) if (index >= 0) metadata.wait_for.splice(index, 1); } if (metadata.type === "conditional" && metadata.wait_for.length == 0) { let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result) if (conditionProbabilityExplicit[chain_id] === undefined) conditionProbabilityExplicit[chain_id] = {} if (conditionProbabilityExplicit[chain_id][_key] === undefined) conditionProbabilityExplicit[chain_id][_key] = { request_count: 0, probability: 0 } let oldProbability = conditionProbabilityExplicit[chain_id][_key].probability let updateProbability = (conditionResult === 'success') ? 1.0 : 0.0 conditionProbabilityExplicit[chain_id][_key].probability = oldProbability * conditionProbabilityExplicit[chain_id][_key].request_count + updateProbability conditionProbabilityExplicit[chain_id][_key].request_count++ conditionProbabilityExplicit[chain_id][_key].probability /= conditionProbabilityExplicit[chain_id][_key].request_count console.log(conditionResult, "probability table", conditionProbabilityExplicit); let branchToTake = metadata[conditionResult] branchMap = map[branchToTake] delete map[_key] makeBranchRunnable(branchMap, aliases) } } orchestrator(chain_id, res, payload, (branchMap == null)? map: branchMap, aliases, result) }) } } } } function makeBranchRunnable(branchMap, aliases) { delete branchMap['type'] for (const [_key, metadata] of Object.entries(branchMap)) { if (metadata.type === "function" || metadata.type === "conditional") { let wait_for = [] for (const dependent of metadata.wait_for) { if (aliases[dependent].status !== "done") wait_for.push(dependent) metadata.wait_for = wait_for } } } } function checkCondition(op1, op2, op, result) { op1 = op1.split(".") let data = result[op1[0]][op1[1]] return (operator[op](data, op2))? "success": "fail" } async function speculative_deployment(chain_id, aliases, map, offset, done, toBeDone) { console.log(done, toBeDone); if (constants.speculative_deployment) { let getData = [] for (const [mod, metadata] of Object.entries(map)) { if (metadata.type !== 'function') { if (metadata.type === 'conditional' && !constants.JIT_deployment) { let probability try { probability = conditionProbabilityExplicit[chain_id][mod].probability } catch (error) { console.log("branch probability not present, random branch taken"); probability = Math.random() } let branch = (probability >= 0.5) ? metadata['success'] : metadata['fail'] let branchMap = JSON.parse(JSON.stringify(map[branch])) delete branchMap['type'] console.log("success probability", probability, "taking branch: ", branch); speculative_deployment(chain_id, aliases, branchMap) } continue } if (constants.JIT_deployment) { console.log(mod, metadata, aliases[mod].alias); let url = metricsDB + aliases[mod].alias let data = libSupport.fetchData(url) getData.push(data) } else { notify(metadata.runtime, aliases[mod].alias) } } if (constants.JIT_deployment) { Promise.all(getData).then((values) => { let dataMap = new Map() for (const data of values) { dataMap[data._id] = data } console.log("line 361", done, toBeDone); if (done === undefined) { console.log("new map"); done = new Map() } if (toBeDone === undefined) { toBeDone = new Set() } // var plannerMap = new Map(map) do { for (const [mod, metadata] of Object.entries(map)) { if (metadata.type !== 'function' && metadata.type !== 'conditional') { continue } if (metadata.wait_for.length == 0 && done[mod] === undefined) { /** * expecting the first ones to run to be hit by coldstarts */ done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart // delete plannerMap[mod]; } else if (done[mod] === undefined) { let flag = true let maxWait = 0 for (const dependency of metadata.wait_for) { if (done[dependency] === undefined) { flag = false break } else if (maxWait < done[dependency]) maxWait = done[dependency] } if (flag) { if (metadata.type === 'conditional') { console.log("setting notification for conditional", mod); let probability try { probability = conditionProbabilityExplicit[chain_id][mod].probability } catch (error) { console.log("branch probability not present, random branch taken"); probability = Math.random() } let branch = (probability >= 0.5)? metadata['success']: metadata['fail'] let branchMap = JSON.parse(JSON.stringify(map[branch])) delete branchMap['type'] console.log("success probability", probability, "taking branch: ", branch); speculative_deployment(chain_id, aliases, branchMap, maxWait, done, toBeDone) done[mod] = maxWait } else { console.log("notification set", mod); let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ? maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0 notifyTime += offset console.log(mod, "max wait", maxWait, "notify time:", notifyTime); setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias) done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart } if (toBeDone.has(mod)) delete toBeDone[mod] // delete plannerMap[mod] } else { toBeDone.add(mod) } } console.log("done", done); } } while (toBeDone.size != 0) }) } } } function readMap(filename, alias = false) { return new Promise((resolve, reject) => { fs.readFile(filename, (err, blob) => { if (err) reject(err) else { const data = JSON.parse(blob) if (alias) { for (const [key, functionHash] of Object.entries(data)) { data[key] = { alias: functionHash, status: "waiting" } // libSupport.fetchData(metricsDB + functionHash, null) // .then(metrics => { // data[key] // }) } } resolve(data) } }) }) } function notify(runtime, functionHash) { console.log("check map: ", functionToResource.has(functionHash + runtime)); if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) { let payload = [{ topic: constants.topics.hscale, messages: JSON.stringify({ runtime, functionHash }) }] libSupport.producer.send(payload, function () { }) } else { console.log("resource already present: skipping speculation"); } } function createDirectory(path) { return new Promise((resolve, reject) => { if (!fs.existsSync(path)) { fs.mkdir(path, err => { if (err) reject(); resolve(); }) } else { resolve(); } }) } module.exports = { router }