diff --git a/.gitignore b/.gitignore index ef02131422ef608bfa1ea78b9c78ee4337d49803..3cbcc57f383c96490f6b8c956f0609e0f4c4afaf 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ firecracker* secrets.json resource_system/bin/** resource_system/version.linux +local_experiments/ diff --git a/dispatch_system/constants.json b/dispatch_system/constants.json index dd179aad76df40a4e33ffd05003b89101fcb9406..6fa3cf2d77d82c8dd768b2f98618ffb96b3dfeee 100644 --- a/dispatch_system/constants.json +++ b/dispatch_system/constants.json @@ -18,5 +18,6 @@ "autoscalar_metrics": { "open_request_threshold": 100 }, - "speculative_deployment": true + "speculative_deployment": true, + "id_size": 20 } \ No newline at end of file diff --git a/dispatch_system/dispatch_daemon/config.json b/dispatch_system/dispatch_daemon/config.json index 56e568599cd2d673c5adda89df9d5db3be08483b..85e797b6bf7666aa6bdcd686133d9e1a40be0541 100644 --- a/dispatch_system/dispatch_daemon/config.json +++ b/dispatch_system/dispatch_daemon/config.json @@ -1 +1 @@ -{"id":"192.168.31.51","master_node":"10.129.6.5"} \ No newline at end of file +{"id":"10.196.6.51","master_node":"10.129.6.5"} \ No newline at end of file diff --git a/dispatch_system/dispatch_daemon/index.js b/dispatch_system/dispatch_daemon/index.js index 8f910be92ac573c07d2fc32882cbae006e4fe1c9..aff9313d182157037afa2ea78a75a0b7b6d11913 100644 --- a/dispatch_system/dispatch_daemon/index.js +++ b/dispatch_system/dispatch_daemon/index.js @@ -77,9 +77,9 @@ libSupport.makeTopic(node_id).then(() => { /** * download and start grunt */ -libSupport.download(constants.grunt_host, "grunt").then(() => { +libSupport.download(constants.grunt_host, "grunt", false).then(() => { logger.info("Downloaded grunt binary from repository") - fs.chmod('grunt', 0o555, (err) => { + fs.chmod('grunt', 0o755, (err) => { logger.info("grunt made executable. Starting grunt") let grunt = spawn('./grunt', [node_id]) grunt.stdout.on('data', data => { diff --git a/dispatch_system/dispatch_daemon/lib.js b/dispatch_system/dispatch_daemon/lib.js index ea607fd9d9f98d8841afe3e19f4a44ace3778922..167de7e6ededf919a84b06f8010b69f11c910f8a 100644 --- a/dispatch_system/dispatch_daemon/lib.js +++ b/dispatch_system/dispatch_daemon/lib.js @@ -1,4 +1,4 @@ -const http = require('http'); +const fetch = require('node-fetch'); const fs = require('fs'); const process = require('process') const { spawnSync } = require('child_process'); @@ -50,28 +50,48 @@ function makeTopic(id) { }) } -var download = function (url, dest, cb) { - return new Promise((resolve, reject) => { +// var download = function (url, dest, check = true, cb) { +// return new Promise((resolve, reject) => { +// console.log(url); +// if (!check || !fs.existsSync(dest)) { +// var file = fs.createWriteStream(dest); +// var request = https.get(url, function (response) { +// response.pipe(file); +// file.on('finish', function () { +// file.close(cb); // close() is async, call cb after close completes. +// resolve(); +// }); +// }).on('error', function (err) { // Handle errors +// fs.unlink(dest); // Delete the file async. (But we don't check the result) +// logger.error("download failed" + err.message); + +// if (cb) cb(err.message); +// reject(err); +// }); +// } else { +// resolve(); +// } +// }) + +// }; + +const download = (async (url, path, check = true) => { + + if (!check || !fs.existsSync(path)) { console.log(url); - if (!fs.existsSync(dest)) { - var file = fs.createWriteStream(dest); - var request = https.get(url, function (response) { - response.pipe(file); - file.on('finish', function () { - file.close(cb); // close() is async, call cb after close completes. - resolve(); - }); - }).on('error', function (err) { // Handle errors - fs.unlink(dest); // Delete the file async. (But we don't check the result) - if (cb) cb(err.message); + const res = await fetch(url); + const fileStream = fs.createWriteStream(path); + await new Promise((resolve, reject) => { + res.body.pipe(fileStream); + res.body.on("error", (err) => { reject(err); }); - } else { - resolve(); - } - }) - -}; + fileStream.on("finish", function () { + resolve(); + }); + }); + } +}); function makeid(length) { var result = ''; diff --git a/dispatch_system/dispatch_daemon/package.json b/dispatch_system/dispatch_daemon/package.json index ada679c36fd79ee2a0e253165509095c12afe4ed..e22c51acbedc006875a442d18d89b3e34b684b90 100644 --- a/dispatch_system/dispatch_daemon/package.json +++ b/dispatch_system/dispatch_daemon/package.json @@ -16,6 +16,7 @@ "kafka-node": "^5.0.0", "morgan": "^1.9.1", "mqtt": "^3.0.0", + "node-fetch": "^2.6.0", "redis": "^2.8.0", "request": "^2.88.2", "winston": "^3.2.1" diff --git a/dispatch_system/dispatch_manager/explicit_chain_handler.js b/dispatch_system/dispatch_manager/explicit_chain_handler.js new file mode 100644 index 0000000000000000000000000000000000000000..5b0959c1693b4ced74268578b3272575e1bf7bb4 --- /dev/null +++ b/dispatch_system/dispatch_manager/explicit_chain_handler.js @@ -0,0 +1,293 @@ +'use strict'; +const express = require('express') +const libSupport = require('./lib') +const router = express.Router() +const fs = require('fs') +const { spawn } = require('child_process') +const fetch = require('node-fetch') +const constants = require('../constants.json') +const secrets = require('./secrets.json') +let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` +metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/" +const logger = libSupport.logger + +const registry_url = constants.registry_url + +router.post('/deploy', (req, res) => { + + // let runtime = req.body.runtime + let files = req.files + + const chain_id = libSupport.makeid(constants.id_size) + const file_path = __dirname + "/repository/" + let aliases = {} + let deployHandles = [] + createDirectory(file_path).then(() => { + + for (const [file_alias, file] of Object.entries(files)) { + let functionHash = file.md5 + if (file_alias === 'map') { + file.mv(file_path + 'map' + chain_id + ".json") + continue + } + // aliases[file_alias] = functionHash + deployHandles.push(deploy(file_path, functionHash, file, aliases, file_alias)) + } + + console.log("aliases", aliases); + Promise.all(deployHandles).then(() => { + console.log("done"); + fs.writeFile(file_path + `aliases${chain_id}.json`, JSON.stringify(aliases, null, 2), function(err) { + res.json({ + status: "success", + function_id: chain_id + }) + }) + + }).catch(err => { + res.json({ + status: "error", + reason: err + }).status(400) + }) + }) + +}) + +async function deploy(file_path, functionHash, file, aliases, file_alias) { + let runtime = "container", memory = 330 + try { + await moveFile(file, file_path, functionHash) + functionHash = libSupport.generateExecutor(file_path, functionHash) + aliases[file_alias] = functionHash + /** + * Adding meta caching via couchdb + * This will create / update function related metadata like resource limits etc + * on a database named "serverless". + */ + let res = await fetch(metadataDB + functionHash) + let json = await res.json() + console.log(json); + + if (json.error === "not_found") { + logger.warn("New function, creating metadata") + await fetch(metadataDB + functionHash, { + method: 'put', + body: JSON.stringify({ + memory: memory + }), + headers: { 'Content-Type': 'application/json' }, + }) + // let json = await res.json() + // console.log(json) + } else { + logger.warn('Repeat deployment, updating metadata') + try { + await fetch(metadataDB + functionHash, { + method: 'put', + body: JSON.stringify({ + memory: memory, + _rev: json._rev + }), + headers: { 'Content-Type': 'application/json' }, + }) + // let json = await res.json() + // console.log(json) + } catch (err) { + console.log(err); + + } + } + + if (runtime === "container") { + try { + await deployContainer(file_path, functionHash) + console.log("called"); + return Promise.resolve(functionHash) + } catch(err) { + return Promise.reject(err) + } + } else { + return Promise.resolve(functionHash) + } + } catch (err) { + logger.error(err) + return Promise.reject(err) + } +} + +function moveFile(file, file_path, functionHash) { + return new Promise((resolve, reject) =>{ + file.mv(file_path + functionHash, function (err) { + if (err) + reject(err) + resolve() + }) + }) +} + +async function deployContainer(path, imageName) { + return new Promise((resolve, reject) => { + let buildStart = Date.now() + + fs.writeFile('./repository/Dockerfile' + imageName, + `FROM node:latest + WORKDIR /app + COPY ./worker_env/package.json /app + ADD ./worker_env/node_modules /app/node_modules + COPY ${imageName}.js /app + ENTRYPOINT ["node", "${imageName}.js"]` + , function (err) { + if (err) { + logger.error("failed", err); + + + reject(err); + } + else { + logger.info('Dockerfile created'); + const process = spawn('docker', ["build", "-t", registry_url + imageName, path, "-f", path + `Dockerfile${imageName}`]); + + process.stdout.on('data', (data) => { + logger.info(`stdout: ${data}`); + + }); + + process.stderr.on('data', (data) => { + logger.error(`stderr: ${data}`); + }); + + process.on('close', (code) => { + logger.warn(`child process exited with code ${code}`); + let timeDifference = Math.ceil((Date.now() - buildStart)) + logger.info("image build time taken: ", timeDifference); + const process_push = spawn('docker', ["push", registry_url + imageName]); + + process_push.stdout.on('data', (data) => { + console.log(`stdout: ${data}`); + + }); + + process_push.stderr.on('data', (data) => { + logger.error(`stderr: ${data}`); + }); + + process_push.on('close', (code) => { + logger.info("image pushed to repository"); + resolve(); + }) + + }); + } + }); + }) +} + +router.post('/execute/:id', (req, res) => { + let map, aliases + // if (req.body.map) + // map = req.body.map + // else { + if (req.files && req.files.map) { + map = JSON.parse(req.files.map.data.toString()); + readMap(`./repository/aliases${req.params.id}.json`) + .then(data => { + aliases = data + let payload = JSON.parse(req.body.data) + console.log(payload); + + orchestrator(res, payload, map, aliases, []) + }) + } else { + readMap(`./repository/map${req.params.id}.json`) + .then(data => { + map = data + readMap(`./repository/aliases${req.params.id}.json`) + .then(data => { + aliases = data + let payload = JSON.parse(req.body.data) + console.log(payload); + + orchestrator(res, payload, map, aliases, []) + }) + }) + } + +}) + +function orchestrator(res, payload, map, aliases, result) { + return new Promise((resolve, reject) => { + console.log("result before run", result); + + if (Object.keys(map).length == 0) { + console.log("time to resolve"); + res.json(result) + // return resolve(result) + } + + else { + for (const [functionName, metadata] of Object.entries(map)) { + // console.log(functionName, metadata, aliases[functionName]); + if (metadata.wait_for.length == 0) { + let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName]}` + console.log(url); + let data = { + method: 'post', + body: JSON.stringify({ + runtime: metadata.runtime, + payload + }), + headers: { 'Content-Type': 'application/json' } + } + delete map[functionName] + fetch(url, data).then(res => res.json()) + .then(json => { + console.log(json); + result.push(json) + for (const [_key, metadata] of Object.entries(map)) { + let index = metadata.wait_for.indexOf(functionName) + + if (index >= 0) + metadata.wait_for.splice(index, 1); + } + console.log(map, "after run"); + orchestrator(res, payload, map, aliases, result) + }) + } + } + // return resolve(result) + } + + // await fetch(constants.master_address) + }) + +} + +function readMap(filename) { + return new Promise((resolve, reject) => { + fs.readFile(filename, (err, data) => { + if (err) + reject(err) + else { + const object = JSON.parse(data) + resolve(object) + } + }) + }) +} + +function createDirectory(path) { + return new Promise((resolve, reject) => { + if (!fs.existsSync(path)) { + fs.mkdir(path, err => { + if (err) + reject(); + resolve(); + }) + } else { + resolve(); + } + }) +} + +module.exports = router; diff --git a/dispatch_system/dispatch_manager/index.js b/dispatch_system/dispatch_manager/index.js index 2b072e6cfc2bc29f9e187f8299a22d7629769557..861ae2c1755189ff6ee3c13000804424e8f9b68c 100644 --- a/dispatch_system/dispatch_manager/index.js +++ b/dispatch_system/dispatch_manager/index.js @@ -1,14 +1,15 @@ "use strict"; -const express = require('express') -const bodyParser = require('body-parser') +const express = require('express'); +const bodyParser = require('body-parser'); const fileUpload = require('express-fileupload'); -const constants = require('.././constants.json') -const secrets = require('./secrets.json') -const fs = require('fs') +const constants = require('.././constants.json'); +const chainHandler = require('./explicit_chain_handler'); +const secrets = require('./secrets.json'); +const fs = require('fs'); const { spawn } = require('child_process'); -const morgan = require('morgan') -const heap = require('heap') +const morgan = require('morgan'); +const heap = require('heap'); const fetch = require('node-fetch'); const swStats = require('swagger-stats'); const apiSpec = require('./swagger.json'); @@ -29,6 +30,7 @@ let usedPort = new Map(), // TODO: remove after integration with RM // resources associated with the function workerNodes = new Map(), // list of worker nodes currently known to the DM functionBranchTree = new Map() // a tree to store function branch predictions + let kafka = require('kafka-node'), Producer = kafka.Producer, @@ -53,14 +55,14 @@ let kafka = require('kafka-node'), app.use(morgan('combined', { skip: function (req, res) { return res.statusCode < 400 } })) -app.use(bodyParser.urlencoded({ extended: true })) -app.use(bodyParser.json()) +app.use(express.json()); +app.use(express.urlencoded({ extended: true })); const file_path = __dirname + "/repository/" app.use('/repository', express.static(file_path)); app.use(fileUpload()) app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); - +app.use('/serverless/chain', chainHandler); let requestQueue = [] const WINDOW_SIZE = 10 @@ -207,10 +209,12 @@ app.post('/serverless/execute/:id', (req, res) => { let runtime = req.body.runtime let id = req.params.id + runtime - + res.timestamp = Date.now() if (functionToResource.has(id)) { + res.start = 'warmstart' libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) } else { + res.start = 'coldstart' /** * Requests are queued up before being dispatched. To prevent requests coming in for the * same function from starting too many workers, they are grouped together @@ -244,7 +248,7 @@ function dispatch() { if (!db.has(functionHash + runtime)) { db.set(functionHash + runtime, []) db.get(functionHash + runtime).push({ req, res }) - let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID + let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with an unique ID logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`); logger.info("Requesting RM " + JSON.stringify({ @@ -256,7 +260,7 @@ function dispatch() { resourceMap.set(resource_id, { runtime, functionHash, port: null, node_id: null, - deployed: false + deployed: false, deploy_request_time: Date.now() }) @@ -314,29 +318,32 @@ function dispatch() { } } - +/** + * Handles post deployment metadata updates and starts reverse-proxying + * @param {string} message Message received from DD after deployment + */ function postDeploy(message) { logger.info("Deployed Resource: " + JSON.stringify(message)); - + let id = message.functionHash + message.runtime if (message.status == false) { - let sendQueue = db.get(message.functionHash + message.runtime) + let sendQueue = db.get(id) // TODO: handle failure while (sendQueue && sendQueue.length != 0) { let { req, res } = sendQueue.shift() res.status(400).json({ reason: message.reason }) } - db.delete(message.functionHash + message.runtime) + db.delete(id) return; } - if (functionToResource.has(message.functionHash + message.runtime)) { - let resourceHeap = functionToResource.get(message.functionHash + message.runtime) + if (functionToResource.has(id)) { + let resourceHeap = functionToResource.get(id) heap.push(resourceHeap, { resource_id: message.resource_id, open_request_count: 0 }, libSupport.compare) logger.warn("Horizontally scaling up: " + - JSON.stringify(functionToResource.get(message.functionHash + message.runtime))); + JSON.stringify(functionToResource.get(id))); } else { /** @@ -349,36 +356,24 @@ function postDeploy(message) { resource_id: message.resource_id, open_request_count: 0 }, libSupport.compare) - functionToResource.set(message.functionHash + message.runtime, resourceHeap) + functionToResource.set(id, resourceHeap) logger.warn("Creating new resource pool" - + JSON.stringify(functionToResource.get(message.functionHash + message.runtime))); + + JSON.stringify(functionToResource.get(id))); } try { let resource = resourceMap.get(message.resource_id) resource.deployed = true - let confirmRM = [{ - topic: log_channel, - messages: JSON.stringify({ - resource_id: message.resource_id, - // type: "deployment_launch", - node_id: resource.node_id, - runtime: resource.runtime, - function_id: resource.functionHash, - entity_id: message.entity_id, - "reason": "deployment", - "status": true, - "timestamp": Date.now() - }), - partition: 0 - }] - producer.send(confirmRM, () => { - logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`) - }) + libSupport.logBroadcast({ + entity_id: message.entity_id, + "reason": "deployment", + "status": true, + starttime: (Date.now() - resource.deploy_request_time) + }, message.resource_id, resourceMap) - if (db.has(message.functionHash + message.runtime)) { - let sendQueue = db.get(message.functionHash + message.runtime) + if (db.has(id)) { + let sendQueue = db.get(id) logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource)); while (sendQueue && sendQueue.length != 0) { let { req, res } = sendQueue.shift() @@ -387,8 +382,12 @@ function postDeploy(message) { }) } - db.delete(message.functionHash + message.runtime) + db.delete(id) } + + libSupport.metrics.collectMetrics({type: "scale", value: + functionToResource.get(id).length, + functionHash: id}) } catch (e) { logger.error(e.message) } @@ -428,8 +427,9 @@ consumer.on('message', function (message) { // process.exit(0) } usedPort.delete(message.port) - if (functionToResource.has(message.functionHash + message.runtime)) { - let resourceArray = functionToResource.get(message.functionHash + message.runtime) + let id = message.functionHash + message.runtime + if (functionToResource.has(id)) { + let resourceArray = functionToResource.get(id) for (let i = 0; i < resourceArray.length; i++) if (resourceArray[i].resource_id === message.resource_id) { resourceArray.splice(i, 1); @@ -437,17 +437,28 @@ consumer.on('message', function (message) { } heap.heapify(resourceArray, libSupport.compare) - - resourceMap.delete(message.resource_id) - if (resourceArray.length == 0) - functionToResource.delete(message.functionHash + message.runtime) + libSupport.metrics.collectMetrics({type: "scale", value: + resourceArray.length, + functionHash: id}) + libSupport.logBroadcast({ + entity_id: message.entity_id, + "reason": "terminate", + "total_request": message.total_request, + "status": true + }, message.resource_id, resourceMap) + .then(() => { + resourceMap.delete(message.resource_id) + if (resourceArray.length == 0) + functionToResource.delete(id) + }) } + } else if (topic == constants.topics.hscale) { message = JSON.parse(message) - let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID + let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID runtime = message.runtime, functionHash = message.functionHash console.log("Resource Status: ", functionToResource); @@ -461,7 +472,7 @@ consumer.on('message', function (message) { resourceMap.set(resource_id, { runtime, functionHash, port: null, node_id: null, - deployed: false + deployed: false, deploy_request_time: Date.now() }) @@ -531,7 +542,18 @@ function autoscalar() { } +function periodicMetricBroadcast() { + let message = {}, flag = false + functionToResource.forEach((functionHeap, functionHash) => { + if (functionHeap.length > 0) { + message[functionHash] = functionHeap.length + libSupport.metrics.collectMetrics({type: "scale", value: functionHeap.length, functionHash: functionHash}) + } + }) +} + setInterval(libSupport.viterbi, 1000, functionBranchTree) setInterval(autoscalar, 1000); setInterval(dispatch, 1000); +// setInterval(periodicMetricBroadcast, 5000) app.listen(port, () => logger.info(`Server listening on port ${port}!`)) \ No newline at end of file diff --git a/dispatch_system/dispatch_manager/lib.js b/dispatch_system/dispatch_manager/lib.js index 8a26d2efe2b9584fbcabc2cab77591a0a4e6b4a0..c46ced910000a9abbbc3b20625635866a9e56528 100644 --- a/dispatch_system/dispatch_manager/lib.js +++ b/dispatch_system/dispatch_manager/lib.js @@ -4,10 +4,18 @@ const rp = require('request-promise'); const fetch = require('node-fetch'); const winston = require('winston') const constants = require('.././constants.json') - +const metrics = require('./metrics') const { createLogger, format, transports } = winston; const heap = require('heap') +let kafka = require('kafka-node'), + Producer = kafka.Producer, + client = new kafka.KafkaClient({ + kafkaHost: constants.kafka_host, + autoConnect: true + }), + producer = new Producer(client) + /** * Generates unique IDs of arbitrary length * @param {Length of the ID} length @@ -77,16 +85,19 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT rp(options) .then(function (parsedBody) { + let serviceTime = Date.now() - res.timestamp res.json(parsedBody) forwardTo.open_request_count -= 1 heap.heapify(functionHeap, compare) + + metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: id}) resolve() }) .catch(function (err) { forwardTo.open_request_count -= 1 heap.heapify(functionHeap, compare) - logger.error("error" + err.error.errno); + logger.error("error" + err); res.json(err.message).status(err.statusCode) resolve() }); @@ -257,12 +268,40 @@ function viterbi(functionBranchTree) { metadata.mle_path = path } }); - +} + +function logBroadcast(message, resource_id, resourceMap) { + return new Promise((resolve, reject) => { + try { + + message.timestamp = Date.now() + if (resource_id && resourceMap.has(resource_id)) { + let resource = resourceMap.get(resource_id) + message.resource_id = resource_id + message.node_id = resource.node_id + message.runtime = resource.runtime + message.function_id = resource.functionHash + } + let log = [{ + topic: constants.log_channel, + messages: JSON.stringify(message), + partition: 0 + }] + producer.send(log, () => { + resolve() + }) + } catch (err) { + console.log(err); + reject() + } + }) } +setInterval(metrics.broadcastMetrics, 5000) + module.exports = { makeid, generateExecutor, reverseProxy, getPort, logger, compare, - viterbi + viterbi, logBroadcast, metrics } \ No newline at end of file diff --git a/dispatch_system/dispatch_manager/metrics.js b/dispatch_system/dispatch_manager/metrics.js new file mode 100644 index 0000000000000000000000000000000000000000..dcaecf5e04974a4817f9dae7ce1b9542eeb019b9 --- /dev/null +++ b/dispatch_system/dispatch_manager/metrics.js @@ -0,0 +1,98 @@ +const constants = require('.././constants.json'); + +let log_channel = constants.log_channel, + metrics = { } + +let kafka = require('kafka-node'), + Producer = kafka.Producer, + client = new kafka.KafkaClient({ + kafkaHost: constants.kafka_host, + autoConnect: true + }), + producer = new Producer(client) + +function collectMetrics(metric) { + + if (!(metric.functionHash in metrics)) { + metrics[metric.functionHash] = { + shortterm: { + coldstart: 0, + coldstart_total_request: 0, + warm_total_request: 0, + warmstart: 0, + worker_count: 0 + } + } + } + if (metric.type === 'coldstart') { + metrics[metric.functionHash].shortterm.coldstart += metric.value + metrics[metric.functionHash].shortterm.coldstart_total_request += 1 + } else if (metric.type === 'warmstart') { + metrics[metric.functionHash].shortterm.warmstart += metric.value + metrics[metric.functionHash].shortterm.warm_total_request += 1 + } else if (metric.type === 'scale') { + metrics[metric.functionHash].worker_count = metric.value + } + +} + +function broadcastMetrics() { + + if (Object.keys(metrics).length !== 0) { + for (let [functionHash, metric] of Object.entries(metrics)) { + + if (metric.longterm === undefined) { + metric.longterm = { + coldstart: 0, + coldstart_total_request: 0, + warm_total_request: 0, + warmstart: 0 + } + } + + metric.longterm.coldstart = metric.longterm.coldstart + * metric.longterm.coldstart_total_request + + metric.shortterm.coldstart + + metric.longterm.coldstart_total_request += metric.shortterm.coldstart_total_request + metric.longterm.coldstart /= (metric.longterm.coldstart_total_request != 0)? + metric.longterm.coldstart_total_request: 1 + + metric.longterm.warmstart = metric.longterm.warmstart + * metric.longterm.warm_total_request + + metric.shortterm.warmstart + metric.longterm.warm_total_request += metric.shortterm.warm_total_request + metric.longterm.warmstart /= (metric.longterm.warm_total_request != 0)? + metric.longterm.warm_total_request: 1 + + metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0)? + metric.shortterm.coldstart_total_request: 1 + metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0)? + metric.shortterm.warm_total_request: 1 + metric.timestamp = Date.now() + } + + let log = [{ + topic: log_channel, + messages: JSON.stringify({ + metrics + }), + partition: 0 + }] + producer.send(log, () => { }) + + for (let [functionHash, metric] of Object.entries(metrics)) { + metric.shortterm = { + coldstart: 0, + coldstart_total_request: 0, + warm_total_request: 0, + warmstart: 0, + worker_count: 0 + } + } + } +} + +module.exports = { + collectMetrics, broadcastMetrics +} \ No newline at end of file diff --git a/dispatch_system/dispatch_manager/repository/worker_env/env.js b/dispatch_system/dispatch_manager/repository/worker_env/env.js index 176314426e5dbad0ea9ba5ba646829e51e9afdde..d64caa969514394deee484dce1a7b030c7c0ab98 100644 --- a/dispatch_system/dispatch_manager/repository/worker_env/env.js +++ b/dispatch_system/dispatch_manager/repository/worker_env/env.js @@ -5,7 +5,7 @@ let request = require('request') const process = require('process') const app = express() -let port = 5000, resource_id, functionHash, runtime, idleTime = 30 +let port = 5000, resource_id, functionHash, runtime, idleTime = 300 resource_id = process.argv[2] functionHash = process.argv[3] @@ -36,13 +36,15 @@ app.post('/serverless/function/execute/', (req, res) => { }) }) -app.post('/serverless/worker/timeout', (req, res) => { +app.post('/serverless/function/timeout', (req, res) => { idleTime = req.body.timeout console.log("Idle time set to: ", idleTime); - + res.json({ + status: "success" + }) }) -function executor(payload) { +async function executor(payload) { return new Promise((resolve, reject) => { })