'use strict';

const express = require('express')
const libSupport = require('./lib')
const router = express.Router()
const fs = require('fs')
const { spawn } = require('child_process')
const fetch = require('node-fetch')
const constants = require('../constants.json')
const operator = require('./operator')
const sharedMeta = require('./shared_meta')
const util = require('util')
const nanoid = require('nanoid')

const logger = libSupport.logger
const registry_url = constants.registry_url

let functionToResource = sharedMeta.functionToResource,
    db = sharedMeta.db,
    conditionProbabilityExplicit = sharedMeta.conditionProbabilityExplicit,
    metricsDB = sharedMeta.metricsDB,
    metadataDB = sharedMeta.metadataDB,
    explicitChainDB = sharedMeta.explicitChainDB

router.post('/deploy', (req, res) => {
    // let runtime = req.body.runtime
    let files = req.files

    const chain_id = libSupport.makeid(constants.id_size)
    const file_path = __dirname + "/repository/"
    let aliases = {}
    let deployHandles = []

    createDirectory(file_path).then(() => {
        for (const [file_alias, file] of Object.entries(files)) {
            let functionHash = file.md5
            if (file_alias === 'map') {
                // the "map" field is the chain description; store it alongside the deployed functions
                file.mv(file_path + 'map' + chain_id + ".json")
                continue
            }
            // aliases[file_alias] = functionHash
            deployHandles.push(deploy(file_path, functionHash, file, aliases, file_alias))
        }
        console.log("aliases", aliases);

        Promise.all(deployHandles).then(() => {
            console.log("done");
            fs.writeFile(file_path + `aliases${chain_id}.json`,
                JSON.stringify(aliases, null, 2), function (err) {
                    res.json({
                        status: "success",
                        function_id: chain_id
                    })
                })
        }).catch(err => {
            res.status(400).json({
                status: "error",
                reason: err
            })
        })
    })
})
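/*
 * Example request (illustrative sketch, not part of the module). It assumes this
 * router is mounted at a prefix such as /serverless/chains on the master — the
 * actual mount point is chosen by the application that imports this router — and
 * the function file names below are hypothetical. Every non-"map" form field is
 * treated as a function alias whose file gets deployed; the optional "map" field
 * carries the chain description JSON.
 *
 *   curl -X POST http://<master_address>:<master_port>/serverless/chains/deploy \
 *        -F "func1=@func1.js" \
 *        -F "func2=@func2.js" \
 *        -F "map=@map.json"
 *
 * On success the response carries the generated chain id:
 *   { "status": "success", "function_id": "<chain_id>" }
 */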
async function deploy(file_path, functionHash, file, aliases, file_alias) {
    let runtime = "container", memory = 330

    try {
        await moveFile(file, file_path, functionHash)
        functionHash = libSupport.generateExecutor(file_path, functionHash)
        aliases[file_alias] = functionHash

        /**
         * Adding meta caching via couchdb
         * This will create / update function related metadata like resource limits etc
         * on a database named "serverless".
         */
        let res = await fetch(metadataDB + functionHash)
        let json = await res.json()
        console.log(json);

        if (json.error === "not_found") {
            logger.warn("New function, creating metadata")
            await fetch(metadataDB + functionHash, {
                method: 'put',
                body: JSON.stringify({
                    memory: memory
                }),
                headers: { 'Content-Type': 'application/json' },
            })
            // let json = await res.json()
            // console.log(json)
        } else {
            logger.warn('Repeat deployment, updating metadata')
            try {
                await fetch(metadataDB + functionHash, {
                    method: 'put',
                    body: JSON.stringify({
                        memory: memory,
                        _rev: json._rev
                    }),
                    headers: { 'Content-Type': 'application/json' },
                })
                // let json = await res.json()
                // console.log(json)
            } catch (err) {
                console.log(err);
            }
        }

        if (runtime === "container") {
            try {
                await deployContainer(file_path, functionHash)
                console.log("called");
                return Promise.resolve(functionHash)
            } catch (err) {
                return Promise.reject(err)
            }
        } else {
            return Promise.resolve(functionHash)
        }
    } catch (err) {
        logger.error(err)
        return Promise.reject(err)
    }
}

function moveFile(file, file_path, functionHash) {
    return new Promise((resolve, reject) => {
        file.mv(file_path + functionHash, function (err) {
            if (err)
                reject(err)
            else
                resolve()
        })
    })
}

async function deployContainer(path, imageName) {
    return new Promise((resolve, reject) => {
        let buildStart = Date.now()

        fs.writeFile('./repository/Dockerfile' + imageName,
            `FROM node:latest
WORKDIR /app
COPY ./worker_env/package.json /app
ADD ./worker_env/node_modules /app/node_modules
COPY ${imageName}.js /app
ENTRYPOINT ["node", "${imageName}.js"]`
            , function (err) {
                if (err) {
                    logger.error("failed", err);
                    reject(err);
                } else {
                    logger.info('Dockerfile created');
                    const process = spawn('docker', ["build", "-t", registry_url + imageName, path, "-f", path + `Dockerfile${imageName}`]);

                    process.stdout.on('data', (data) => {
                        logger.info(`stdout: ${data}`);
                    });

                    process.stderr.on('data', (data) => {
                        logger.error(`stderr: ${data}`);
                    });

                    process.on('close', (code) => {
                        logger.warn(`child process exited with code ${code}`);
                        let timeDifference = Math.ceil((Date.now() - buildStart))
                        logger.info("image build time taken: ", timeDifference);

                        const process_push = spawn('docker', ["push", registry_url + imageName]);

                        process_push.stdout.on('data', (data) => {
                            console.log(`stdout: ${data}`);
                        });

                        process_push.stderr.on('data', (data) => {
                            logger.error(`stderr: ${data}`);
                        });

                        process_push.on('close', (code) => {
                            logger.info("image pushed to repository");
                            resolve();
                        })
                    });
                }
            });
    })
}
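/*
 * For reference, deployContainer() above generates a Dockerfile like the following
 * for a (hypothetical) function hash "fn_abc123", then runs, in order,
 * `docker build -t <registry_url>fn_abc123 <path> -f <path>Dockerfilefn_abc123`
 * followed by `docker push <registry_url>fn_abc123`:
 *
 *   FROM node:latest
 *   WORKDIR /app
 *   COPY ./worker_env/package.json /app
 *   ADD ./worker_env/node_modules /app/node_modules
 *   COPY fn_abc123.js /app
 *   ENTRYPOINT ["node", "fn_abc123.js"]
 */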
router.post('/execute/:id', async (req, res) => {
    let map, aliases
    let chain_id = req.params.id
    let request_id = nanoid.nanoid()
    console.log("Request ID", request_id);

    libSupport.fetchData(explicitChainDB + chain_id)
        .then(chainData => {
            console.log(chainData);
            let path = {
                path: [],
                onPath: true,
                dependency: {},
                level: 0
            }
            if (chainData.error !== "not_found")
                conditionProbabilityExplicit[chain_id] = chainData
            else
                conditionProbabilityExplicit[chain_id] = {}

            if (req.files && req.files.map) {
                // chain map provided inline with the request
                map = JSON.parse(req.files.map.data.toString());
                readMap(`./repository/aliases${chain_id}.json`, true)
                    .then(async data => {
                        aliases = data
                        let payload = JSON.parse(req.body.data)
                        if (chainData.error != "not_found")
                            await speculative_deployment(chain_id, aliases, chainData, request_id);
                        orchestrator(chain_id, res, payload, map, aliases, {}, path, request_id);
                    })
            } else {
                // chain map stored in the repository at deploy time
                readMap(`./repository/map${chain_id}.json`)
                    .then(data => {
                        map = data
                        readMap(`./repository/aliases${chain_id}.json`, true)
                            .then(async data => {
                                aliases = data
                                let payload = JSON.parse(req.body.data)
                                if (chainData.error != "not_found")
                                    await speculative_deployment(chain_id, aliases, chainData, request_id);
                                orchestrator(chain_id, res, payload, map, aliases, {}, path, request_id);
                            })
                    })
            }
        })
})
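/*
 * Illustrative chain map (hypothetical node, branch, and operator names), inferred
 * from the fields the orchestrator below reads: every node has a "type" and a
 * "wait_for" list, "function" nodes carry a "runtime", and "conditional" nodes carry
 * a "condition" plus the names of the sub-maps to run on "success" and "fail".
 *
 * {
 *   "func1": { "type": "function", "runtime": "container", "wait_for": [] },
 *   "cond1": { "type": "conditional", "wait_for": ["func1"],
 *              "condition": { "op1": "func1.result", "op": "eq", "op2": "ok" },
 *              "success": "branch_ok", "fail": "branch_err" },
 *   "branch_ok":  { "type": "branch",
 *                   "func2": { "type": "function", "runtime": "container", "wait_for": ["func1"] } },
 *   "branch_err": { "type": "branch",
 *                   "func3": { "type": "function", "runtime": "container", "wait_for": ["func1"] } }
 * }
 */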
/**
 * Orchestrator function to execute a chain and return the result to the invoker
 * @param {string} chain_id ID of the chain to be executed
 * @param {JSON} res response object
 * @param {JSON} payload data to be passed to the chain
 * @param {JSON} map holds the chain map
 * @param {JSON} aliases internal alias to function chain mapping
 * @param {JSON} result result obtained after chain executes
 */
async function orchestrator(chain_id, res, payload, map, aliases, result, path, request_id) {
    /**
     * Adding dependencies on MLE path to a map
     * for fast lookup during speculation
     */
    for (const [functionName, metadata] of Object.entries(map)) {
        if (metadata.type === "function" || metadata.type === "conditional") {
            if (path.dependency[functionName] === undefined)
                path.dependency[functionName] = JSON.parse(JSON.stringify(metadata.wait_for))
        }
    }

    if (Object.keys(map).length == 0) {
        console.log("time to resolve", result);
        res.json(result)
        if (path.onPath)
            conditionProbabilityExplicit[chain_id]["path"] = path
        let payload = {
            method: 'put',
            body: JSON.stringify(conditionProbabilityExplicit[chain_id]),
            headers: { 'Content-Type': 'application/json' }
        }
        libSupport.fetchData(explicitChainDB + chain_id, payload)
        console.log("detected path", util.inspect(path, false, null, true /* enable colors */));
        // return resolve(result)
    } else {
        for (const [functionName, metadata] of Object.entries(map)) {
            if (metadata.type === "function" && metadata.wait_for.length == 0) {
                let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}`
                console.log(url);
                let data = {
                    method: 'post',
                    body: JSON.stringify({
                        runtime: metadata.runtime,
                        payload
                    }),
                    headers: {
                        'Content-Type': 'application/json',
                        'x-chain-type': 'explicit'
                    }
                }
                delete map[functionName]
                aliases[functionName].status = "running"

                if (typeof path.path[path.level] === 'undefined') {
                    path.path[path.level] = []
                }
                path.path[path.level].push({
                    functionName,
                    type: "function",
                    runtime: metadata.runtime,
                    id: aliases[functionName].alias
                })

                /**
                 * The function node is cleared to run.
                 * Setting status in Redis.
                 * Dispatching function
                 */
                sharedMeta.client.sadd(request_id, aliases[functionName].alias)

                libSupport.fetchData(url, data)
                    .then(json => {
                        // console.log(json);
                        result[functionName] = json
                        sharedMeta.client.srem(request_id, aliases[functionName].alias)
                        aliases[functionName].status = "done"

                        let branchMap = null, flag = false
                        for (const [_key, metadata] of Object.entries(map)) {
                            if (metadata.type === "function" || metadata.type === "conditional") {
                                let index = metadata.wait_for.indexOf(functionName)
                                if (index >= 0)
                                    metadata.wait_for.splice(index, 1);
                                if (metadata.wait_for.length == 0)
                                    flag = true // something is runnable
                            }

                            if (metadata.type === "conditional" && metadata.wait_for.length == 0) {
                                let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result)

                                // update the running branch probability for this conditional node
                                if (conditionProbabilityExplicit[chain_id] === undefined)
                                    conditionProbabilityExplicit[chain_id] = {}
                                if (conditionProbabilityExplicit[chain_id][_key] === undefined)
                                    conditionProbabilityExplicit[chain_id][_key] = {
                                        request_count: 0,
                                        probability: 0
                                    }
                                let oldProbability = conditionProbabilityExplicit[chain_id][_key].probability
                                let updateProbability = (conditionResult === 'success') ? 1.0 : 0.0
                                conditionProbabilityExplicit[chain_id][_key].probability =
                                    oldProbability * conditionProbabilityExplicit[chain_id][_key].request_count + updateProbability
                                conditionProbabilityExplicit[chain_id][_key].request_count++
                                conditionProbabilityExplicit[chain_id][_key].probability /= conditionProbabilityExplicit[chain_id][_key].request_count

                                console.log(conditionResult, "probability table", conditionProbabilityExplicit);

                                let branchToTake = metadata[conditionResult]
                                branchMap = map[branchToTake]
                                delete map[_key]
                                makeBranchRunnable(branchMap, aliases)

                                if ((conditionResult === 'success') && conditionProbabilityExplicit[chain_id][_key].probability < 0.5
                                    || (conditionResult !== 'success') && conditionProbabilityExplicit[chain_id][_key].probability > 0.5) {
                                    path.onPath = false
                                    console.log("out of path");
                                }

                                path.level++
                                if (typeof path.path[path.level] === 'undefined') {
                                    path.path[path.level] = []
                                }
                                path.path[path.level].push({ functionName: _key, type: "condition" })
                            }
                        }
                        if (flag)
                            path.level++
                        orchestrator(chain_id, res, payload, (branchMap == null) ? map : branchMap, aliases, result, path, request_id)
                    })
            }
        }
    }
}

/**
 * Make the branch runnable by removing redundant dependencies in the map
 * which have already executed
 * @param {JSON} branchMap sub map of the chain holding the branch to be executed
 * @param {JSON} aliases internal alias to function chain mapping
 */
function makeBranchRunnable(branchMap, aliases) {
    delete branchMap['type']
    for (const [_key, metadata] of Object.entries(branchMap)) {
        if (metadata.type === "function" || metadata.type === "conditional") {
            let wait_for = []
            for (const dependent of metadata.wait_for) {
                if (aliases[dependent].status !== "done")
                    wait_for.push(dependent)
            }
            metadata.wait_for = wait_for
        }
    }
}

function checkCondition(op1, op2, op, result) {
    op1 = op1.split(".")
    let data = result[op1[0]][op1[1]]
    return (operator[op](data, op2)) ? "success" : "fail"
}
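/*
 * The per-conditional probability maintained above is a running average over requests:
 *   p_new = (p_old * request_count + x) / (request_count + 1),  x = 1 on "success", 0 on "fail".
 * Worked example (hypothetical numbers): with p_old = 0.75 over 4 requests, a "fail"
 * outcome gives p_new = (0.75 * 4 + 0) / 5 = 0.6. The chain is then marked off-path
 * whenever the taken branch disagrees with the updated majority (a "success" with
 * p < 0.5, or a "fail" with p > 0.5), which flips path.onPath to false.
 */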
"success": "fail" } async function speculative_deployment(chain_id, aliases, chainData, request_id) { // console.log("chainData", util.inspect(chainData, false, null, true /* enable colors */)); let plan = [] let path = chainData.path.path if (constants.speculative_deployment) { for (let i = 0; i < path.length; i++) { for (const node of path[i]) { if (node.type === "function") { node.id = aliases[node.functionName].alias node.invokeTime = 0 // console.log(node); plan.push(node) } } } if (constants.JIT_deployment) { let conditionalDelay = 0, metricsData = new Map(), delayMap = new Map() let data = await libSupport.fetchData(metricsDB + "_bulk_get", { method: 'post', body: JSON.stringify({ docs: plan }), headers: { 'Content-Type': 'application/json' }, }) data = data.results for (let i = 0; i < path.length; i++) { let id = data[i].id metricsData[id] = data[i].docs[0].ok } // console.log(metricsData); for (let i = 0; i < path.length; i++) { for (const node of path[i]) { let maxDelay = conditionalDelay for (const dependency of chainData.path.dependency[node.functionName]) { if (delayMap.get(dependency) > maxDelay) maxDelay = delayMap.get(dependency) } if (maxDelay == 0) { maxDelay += (node.type === "function")? metricsData[node.id][node.runtime].coldstart: 0 delayMap.set(node.functionName, maxDelay) } else { if (node.type === "function") node.invokeTime = maxDelay - metricsData[node.id][node.runtime].starttime maxDelay += (node.type === "function")? metricsData[node.id][node.runtime].warmstart: 0 delayMap.set(node.functionName, maxDelay) } if (node.type === "condition") conditionalDelay = maxDelay } } console.log("delay map", delayMap); console.log("notifcation plan", plan); } await fetch(`http://${constants.master_address}:${constants.rm_port}`, { method: 'post', body: JSON.stringify({ plan }), headers: { 'Content-Type': 'application/json' }, }) let counter = 0, maxCount = plan.length * constants.aggressivity for (const node of plan) { if (counter > maxCount) break console.log("notification set for", node.functionName); setTimeout(notify, node.invokeTime, node.runtime, node.id) counter++ } } } function readMap(filename, alias = false) { return new Promise((resolve, reject) => { fs.readFile(filename, (err, blob) => { if (err) reject(err) else { const data = JSON.parse(blob) if (alias) { for (const [key, functionHash] of Object.entries(data)) { data[key] = { alias: functionHash, status: "waiting" } // libSupport.fetchData(metricsDB + functionHash, null) // .then(metrics => { // data[key] // }) } } resolve(data) } }) }) } function notify(runtime, functionHash) { // console.log("check map: ", functionToResource.has(functionHash + runtime)); if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) { let payload = [{ topic: constants.topics.hscale, messages: JSON.stringify({ runtime, functionHash }) }] libSupport.producer.send(payload, function () { }) } else { console.log("resource already present: skipping speculation"); } } function createDirectory(path) { return new Promise((resolve, reject) => { if (!fs.existsSync(path)) { fs.mkdir(path, err => { if (err) reject(); resolve(); }) } else { resolve(); } }) } module.exports = { router, notify }