Commit 0198dbc7 authored by Nilanjan Daw's avatar Nilanjan Daw

Merge branch 'explicit_function_chaining'

parents 9d5f6ec9 db8d9fb6
...@@ -3,9 +3,10 @@ ...@@ -3,9 +3,10 @@
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON",
"couchdb_host": "localhost:5984", "couchdb_host": "localhost:5984",
"couchdb_db_name": "serverless", "function_db_name": "serverless",
"metrics_db_name": "metrics",
"implicit_chain_db_name": "implicit_chain",
"network": { "network": {
"network_bridge": "hybrid_kafka-serverless", "network_bridge": "hybrid_kafka-serverless",
"internal": { "internal": {
...@@ -21,11 +22,13 @@ ...@@ -21,11 +22,13 @@
"deployed": "deployed", "deployed": "deployed",
"remove_worker": "removeWorker", "remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY", "response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale" "hscale": "hscale",
"log_channel": "LOG_COMMON"
}, },
"autoscalar_metrics": { "autoscalar_metrics": {
"open_request_threshold": 100 "open_request_threshold": 100
}, },
"speculative_deployment": true, "speculative_deployment": false,
"JIT_deployment": false,
"id_size": 20 "id_size": 20
} }
...@@ -11,7 +11,7 @@ const fs = require('fs') ...@@ -11,7 +11,7 @@ const fs = require('fs')
const fetch = require('node-fetch'); const fetch = require('node-fetch');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/" metadataDB = metadataDB + "/" + constants.function_db_name + "/"
const kafka = require('kafka-node') const kafka = require('kafka-node')
const logger = libSupport.logger const logger = libSupport.logger
......
...@@ -7,8 +7,13 @@ const { spawn } = require('child_process') ...@@ -7,8 +7,13 @@ const { spawn } = require('child_process')
const fetch = require('node-fetch') const fetch = require('node-fetch')
const constants = require('../constants.json') const constants = require('../constants.json')
const secrets = require('./secrets.json') const secrets = require('./secrets.json')
const operator = require('./operator')
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/" metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const logger = libSupport.logger const logger = libSupport.logger
const registry_url = constants.registry_url const registry_url = constants.registry_url
...@@ -190,92 +195,220 @@ router.post('/execute/:id', (req, res) => { ...@@ -190,92 +195,220 @@ router.post('/execute/:id', (req, res) => {
// else { // else {
if (req.files && req.files.map) { if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString()); map = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${req.params.id}.json`) let mapPlanner = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
console.log(payload); console.log(payload);
speculative_deployment(aliases, mapPlanner);
orchestrator(res, payload, map, aliases, []) orchestrator(res, payload, map, aliases, {})
}) })
} else { } else {
readMap(`./repository/map${req.params.id}.json`) readMap(`./repository/map${req.params.id}.json`)
.then(data => { .then(data => {
map = data map = data
readMap(`./repository/aliases${req.params.id}.json`) let mapPlanner = JSON.parse(JSON.stringify(map))
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
console.log(payload); speculative_deployment(aliases, mapPlanner);
orchestrator(res, payload, map, aliases, {})
orchestrator(res, payload, map, aliases, [])
}) })
}) })
} }
}) })
function orchestrator(res, payload, map, aliases, result) { async function orchestrator(res, payload, map, aliases, result) {
return new Promise((resolve, reject) => {
console.log("result before run", result);
if (Object.keys(map).length == 0) { if (Object.keys(map).length == 0) {
console.log("time to resolve"); console.log("time to resolve", result);
res.json(result) res.json(result)
// return resolve(result) // return resolve(result)
} }
else {
for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
// console.log(metadata);
else { if (metadata.type === "function" && metadata.wait_for.length == 0) {
for (const [functionName, metadata] of Object.entries(map)) { let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}`
// console.log(functionName, metadata, aliases[functionName]); console.log(url);
if (metadata.wait_for.length == 0) { let data = {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName]}` method: 'post',
console.log(url); body: JSON.stringify({
let data = { runtime: metadata.runtime,
method: 'post', payload
body: JSON.stringify({ }),
runtime: metadata.runtime, headers: { 'Content-Type': 'application/json' }
payload }
}), delete map[functionName]
headers: { 'Content-Type': 'application/json' } aliases[functionName].status = "running"
}
delete map[functionName] fetch(url, data).then(res => res.json())
fetch(url, data).then(res => res.json()) .then(json => {
.then(json => { // console.log(json);
console.log(json); result[functionName] = json
result.push(json)
for (const [_key, metadata] of Object.entries(map)) { aliases[functionName].status = "done"
let index = metadata.wait_for.indexOf(functionName) let branchMap = null
for (const [_key, metadata] of Object.entries(map)) {
if (metadata.type === "function" || metadata.type === "conditional") {
let index = metadata.wait_for.indexOf(functionName)
if (index >= 0) if (index >= 0)
metadata.wait_for.splice(index, 1); metadata.wait_for.splice(index, 1);
} }
console.log(map, "after run");
orchestrator(res, payload, map, aliases, result) if (metadata.type === "conditional" && metadata.wait_for.length == 0) {
})
} let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result)
console.log(conditionResult, "aliases", aliases);
let branchToTake = metadata[conditionResult]
branchMap = map[branchToTake]
delete map[_key]
makeBranchRunnable(branchMap, aliases)
}
}
orchestrator(res, payload, (branchMap == null)? map: branchMap, aliases, result)
})
} }
// return resolve(result)
} }
}
// await fetch(constants.master_address) }
})
function makeBranchRunnable(branchMap, aliases) {
delete branchMap['type']
for (const [_key, metadata] of Object.entries(branchMap)) {
if (metadata.type === "function" || metadata.type === "conditional") {
let wait_for = []
for (const dependent of metadata.wait_for) {
if (aliases[dependent].status !== "done")
wait_for.push(dependent)
metadata.wait_for = wait_for
}
}
}
}
function checkCondition(op1, op2, op, result) {
op1 = op1.split(".")
let data = result[op1[0]][op1[1]]
return (operator[op](data, op2))? "success": "fail"
}
async function speculative_deployment(aliases, map) {
if (constants.speculative_deployment) {
console.log(aliases);
let getData = []
for (const [mod, metadata] of Object.entries(map)) {
if (constants.JIT_deployment) {
console.log(mod, metadata, aliases[mod].alias);
let url = metricsDB + aliases[mod].alias
console.log(url);
let data = libSupport.fetchData(url)
console.log(data);
getData.push(data)
} else {
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
notify(payload)
}
}
if (constants.JIT_deployment) {
Promise.all(getData).then((values) => {
let dataMap = new Map()
for (const data of values) {
dataMap[data._id] = data
}
let done = new Map()
let toBeDone = new Set()
// var plannerMap = new Map(map)
do {
for (const [mod, metadata] of Object.entries(map)) {
if (metadata.wait_for.length == 0 && done[mod] === undefined) {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart // expecting the first ones to run
// to be hit by coldstarts
// delete plannerMap[mod];
} else if (done[mod] === undefined) {
let flag = true
let maxWait = 0
for (const dependency of metadata.wait_for) {
console.log(dependency);
if (done[dependency] === undefined) {
flag = false
break
} else if (maxWait < done[dependency])
maxWait = done[dependency]
}
if (flag) {
console.log("notifying", mod);
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ?
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0
console.log(mod, "max wait", maxWait, "notify time:", notifyTime);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
setTimeout(notify, notifyTime, payload)
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart
if (toBeDone.has(mod))
delete toBeDone[mod]
// delete plannerMap[mod]
} else {
toBeDone.add(mod)
}
}
console.log(done, toBeDone);
}
} while (toBeDone.size != 0)
})
}
}
} }
function readMap(filename) { function readMap(filename, alias = false) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
fs.readFile(filename, (err, data) => { fs.readFile(filename, (err, blob) => {
if (err) if (err)
reject(err) reject(err)
else { else {
const object = JSON.parse(data) const data = JSON.parse(blob)
resolve(object) if (alias) {
for (const [key, functionHash] of Object.entries(data)) {
data[key] = {
alias: functionHash,
status: "waiting"
}
// libSupport.fetchData(metricsDB + functionHash, null)
// .then(metrics => {
// data[key]
// })
}
}
resolve(data)
} }
}) })
}) })
} }
function notify(payload) {
libSupport.producer.send(payload, function () { })
}
function createDirectory(path) { function createDirectory(path) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!fs.existsSync(path)) { if (!fs.existsSync(path)) {
......
...@@ -12,18 +12,22 @@ const heap = require('heap'); ...@@ -12,18 +12,22 @@ const heap = require('heap');
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const swStats = require('swagger-stats'); const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json'); const apiSpec = require('./swagger.json');
const util = require('util')
/** /**
* URL to the couchdb database server used to store function metadata * URL to the couchdb database server used to store function metadata
*/ */
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/" metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
const logger = libSupport.logger const logger = libSupport.logger
let date = new Date(); let date = new Date();
let log_channel = constants.log_channel let log_channel = constants.topics.log_channel
let usedPort = new Map(), // TODO: remove after integration with RM let usedPort = new Map(), // TODO: remove after integration with RM
db = new Map(), // queue holding request to be dispatched db = new Map(), // queue holding request to be dispatched
...@@ -71,6 +75,11 @@ const WINDOW_SIZE = 10 ...@@ -71,6 +75,11 @@ const WINDOW_SIZE = 10
const port = constants.master_port const port = constants.master_port
const registry_url = constants.registry_url const registry_url = constants.registry_url
app.get('/metrics', (req, res) => {
res.set('Content-Type', libSupport.metrics.register.contentType);
res.end(libSupport.metrics.register.metrics());
});
/** /**
* REST API to receive deployment requests * REST API to receive deployment requests
*/ */
...@@ -256,68 +265,14 @@ function dispatch() { ...@@ -256,68 +265,14 @@ function dispatch() {
if (!db.has(functionHash + runtime)) { if (!db.has(functionHash + runtime)) {
db.set(functionHash + runtime, []) db.set(functionHash + runtime, [])
db.get(functionHash + runtime).push({ req, res }) db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
/**
* Request RM for resource
*/
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
resourceMap.set(resource_id, { let payload = [{
runtime, functionHash, port: null, node_id: null, topic: constants.topics.hscale,
deployed: false, deploy_request_time: Date.now() messages: JSON.stringify({ runtime, functionHash })
})
let payloadToRM = [{
topic: constants.topics.request_dm_2_rm, // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
timestamp: Date.now()
}),
partition: 0
}] }]
producer.send(payloadToRM, () => { producer.send(payload, function () { })
// db.set(functionHash + runtime, { req, res })
console.log("sent rm");
})
/**
* Speculative deployment:
* If function MLE path is present then deploy those parts of the path which are
* not already running
*/
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
}
}
speculative_deployment(req, runtime)
} else { } else {
logger.info("deployment process already started waiting") logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res }) db.get(functionHash + runtime).push({ req, res })
...@@ -380,7 +335,7 @@ function postDeploy(message) { ...@@ -380,7 +335,7 @@ function postDeploy(message) {
"status": true, "status": true,
starttime: (Date.now() - resource.deploy_request_time) starttime: (Date.now() - resource.deploy_request_time)
}, message.resource_id, resourceMap) }, message.resource_id, resourceMap)
if (db.has(id)) { if (db.has(id)) {
let sendQueue = db.get(id) let sendQueue = db.get(id)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource)); logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
...@@ -395,8 +350,9 @@ function postDeploy(message) { ...@@ -395,8 +350,9 @@ function postDeploy(message) {
} }
libSupport.metrics.collectMetrics({type: "scale", value: libSupport.metrics.collectMetrics({type: "scale", value:
functionToResource.get(id).length, functionToResource.get(id).length,
functionHash: id}) functionHash: message.functionHash, runtime: message.runtime,
starttime: (Date.now() - resource.deploy_request_time)})
} catch (e) { } catch (e) {
logger.error(e.message) logger.error(e.message)
} }
...@@ -448,7 +404,7 @@ consumer.on('message', function (message) { ...@@ -448,7 +404,7 @@ consumer.on('message', function (message) {
heap.heapify(resourceArray, libSupport.compare) heap.heapify(resourceArray, libSupport.compare)
libSupport.metrics.collectMetrics({type: "scale", value: libSupport.metrics.collectMetrics({type: "scale", value:
resourceArray.length, resourceArray.length,
functionHash: id}) functionHash: message.functionHash, runtime: message.runtime})
libSupport.logBroadcast({ libSupport.logBroadcast({
entity_id: message.entity_id, entity_id: message.entity_id,
"reason": "terminate", "reason": "terminate",
...@@ -470,15 +426,17 @@ consumer.on('message', function (message) { ...@@ -470,15 +426,17 @@ consumer.on('message', function (message) {
let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID
runtime = message.runtime, runtime = message.runtime,
functionHash = message.functionHash functionHash = message.functionHash
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
console.log("Resource Status: ", functionToResource); console.log("Resource Status: ", functionToResource);
/**
* Request RM for resource
*/
logger.info("Requesting RM " + JSON.stringify({ logger.info("Requesting RM " + JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
})) }))
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, { resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null, runtime, functionHash, port: null, node_id: null,
deployed: false, deploy_request_time: Date.now() deployed: false, deploy_request_time: Date.now()
...@@ -525,7 +483,7 @@ consumer.on('message', function (message) { ...@@ -525,7 +483,7 @@ consumer.on('message', function (message) {
}), }),
partition: 0 partition: 0
}] }]
logger.info(resourceMap); // logger.info(resourceMap);
producer.send(payload, () => { producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`) logger.info(`Resource Deployment request sent to Dispatch Agent`)
}) })
...@@ -553,18 +511,54 @@ function autoscalar() { ...@@ -553,18 +511,54 @@ function autoscalar() {
} }
function periodicMetricBroadcast() { /**
let message = {}, flag = false * Speculative deployment:
functionToResource.forEach((functionHeap, functionHash) => { * If function MLE path is present then deploy those parts of the path which are
if (functionHeap.length > 0) { * not already running
message[functionHash] = functionHeap.length *
libSupport.metrics.collectMetrics({type: "scale", value: functionHeap.length, functionHash: functionHash}) * FIXME: Currently supports homogenous runtime chain i.e takes runtime as a param.
* Change it to also profile runtime
*/
async function speculative_deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path)
node.id = node.node
let metrics = await libSupport.fetchData(metricsDB + "_bulk_get", {
method: 'post',
body: JSON.stringify({
docs: branchInfo.mle_path
}),
headers: { 'Content-Type': 'application/json' },
})
console.log(util.inspect(metrics, false, null, true /* enable colors */))
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
} }
}) }
} }
setInterval(libSupport.metrics.broadcastMetrics, 5000)
setInterval(libSupport.viterbi, 1000, functionBranchTree) setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000); setInterval(autoscalar, 1000);
setInterval(dispatch, 1000); setInterval(dispatch, 1000);
// setInterval(periodicMetricBroadcast, 5000)
app.listen(port, () => logger.info(`Server listening on port ${port}!`)) app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
...@@ -4,10 +4,12 @@ const rp = require('request-promise'); ...@@ -4,10 +4,12 @@ const rp = require('request-promise');
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const winston = require('winston') const winston = require('winston')
const constants = require('.././constants.json') const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const metrics = require('./metrics') const metrics = require('./metrics')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap') const heap = require('heap')
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -16,6 +18,9 @@ let kafka = require('kafka-node'), ...@@ -16,6 +18,9 @@ let kafka = require('kafka-node'),
}), }),
producer = new Producer(client) producer = new Producer(client)
let implicitChainDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
implicitChainDB = implicitChainDB + "/" + constants.implicit_chain_db_names + "/"
/** /**
* Generates unique IDs of arbitrary length * Generates unique IDs of arbitrary length
* @param {Length of the ID} length * @param {Length of the ID} length
...@@ -53,6 +58,14 @@ function generateExecutor(functionPath, functionHash) { ...@@ -53,6 +58,14 @@ function generateExecutor(functionPath, functionHash) {
return hash return hash
} }
/**
* Reverse proxy to take user requests and forward them to appropriate workers using a loadbalacer
* @param {JSON} req the user request to be forwarded to the worker
* @param {JSON} res Object to use to return the response to the user
* @param {Map} functionToResource Function to resource Map
* @param {Map} resourceMap Map from resource ID to resource metadata
* @param {Map} functionBranchTree Holds the function path's and related probability distribution
*/
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) { function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
...@@ -62,6 +75,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT ...@@ -62,6 +75,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
* Bypass deployment pipeline if resource available * Bypass deployment pipeline if resource available
*/ */
let functionHeap = functionToResource.get(id) let functionHeap = functionToResource.get(id)
// loadbalancing by choosing worker with lowest load
let forwardTo = functionHeap[0] let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id) let resource = resourceMap.get(forwardTo.resource_id)
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` + // logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
...@@ -70,7 +84,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT ...@@ -70,7 +84,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
// logger.info("Request received at reverseproxy. Forwarding to: " + url); // logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1 forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap); // logger.info(functionHeap);
var options = { var options = {
...@@ -79,8 +93,6 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT ...@@ -79,8 +93,6 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
body: req.body, body: req.body,
json: true // Automatically stringifies the body to JSON json: true // Automatically stringifies the body to JSON
}; };
// console.log(options);
rp(options) rp(options)
...@@ -91,7 +103,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT ...@@ -91,7 +103,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
forwardTo.open_request_count -= 1 forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: id}) metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
resolve() resolve()
}) })
.catch(function (err) { .catch(function (err) {
...@@ -120,7 +132,6 @@ function getPort(usedPort) { ...@@ -120,7 +132,6 @@ function getPort(usedPort) {
return port return port
} }
const logger = winston.createLogger({ const logger = winston.createLogger({
level: 'info', level: 'info',
format: winston.format.combine( format: winston.format.combine(
...@@ -263,9 +274,17 @@ function viterbi(functionBranchTree) { ...@@ -263,9 +274,17 @@ function viterbi(functionBranchTree) {
path.push({node: maxSibling, probability: maxProb}) path.push({node: maxSibling, probability: maxProb})
siblings = new Map() siblings = new Map()
} }
if (path.length > 0) // if (path.length > 0)
console.log("path", path); // console.log("path", path);
metadata.mle_path = path metadata.mle_path = path
if (path.length > 1) {
let payload = {
method: 'put',
body: JSON.stringify(path),
headers: { 'Content-Type': 'application/json' }
}
fetch(implicitChainDB + functionHash, payload)
}
} }
}); });
} }
...@@ -283,7 +302,7 @@ function logBroadcast(message, resource_id, resourceMap) { ...@@ -283,7 +302,7 @@ function logBroadcast(message, resource_id, resourceMap) {
message.function_id = resource.functionHash message.function_id = resource.functionHash
} }
let log = [{ let log = [{
topic: constants.log_channel, topic: constants.topics.log_channel,
messages: JSON.stringify(message), messages: JSON.stringify(message),
partition: 0 partition: 0
}] }]
...@@ -298,10 +317,18 @@ function logBroadcast(message, resource_id, resourceMap) { ...@@ -298,10 +317,18 @@ function logBroadcast(message, resource_id, resourceMap) {
} }
setInterval(metrics.broadcastMetrics, 5000) async function fetchData(url, data = null) {
let res
if (data === undefined || data === null)
res = await fetch(url)
else
res = await fetch(url, data)
return await res.json()
}
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, makeid, generateExecutor, reverseProxy,
getPort, logger, compare, getPort, logger, compare,
viterbi, logBroadcast, metrics viterbi, logBroadcast, fetchData, metrics,
producer
} }
\ No newline at end of file
'use strict';
const constants = require('.././constants.json'); const constants = require('.././constants.json');
const secrets = require('./secrets.json')
const fetch = require('node-fetch');
const util = require('util')
const prom = require('prom-client');
const Registry = prom.Registry;
const register = new Registry();
let log_channel = constants.log_channel, const alpha = 0.99
let log_channel = constants.topics.log_channel,
metrics = { } metrics = { }
const intervalCollector = prom.collectDefaultMetrics({ prefix: 'xanadu', timeout: 5000, register });
const workerCountMetric = new prom.Gauge({ name: "worker_count", help: "worker count" });
const warmstartMetric = new prom.Histogram({ name: "warmstart", help: "warm start latency" });
const coldstartMetric = new prom.Histogram({ name: "coldstart", help: "cold start latency"});
const starttimeMetric = new prom.Histogram({ name: "starttime", help: "worker start times" });
const requestMetric = new prom.Summary({ name: "requests", help: "request RTT times",
percentiles: [0.01, 0.05, 0.5, 0.9, 0.95, 0.99, 0.999]
});
register.registerMetric(workerCountMetric);
register.registerMetric(warmstartMetric);
register.registerMetric(coldstartMetric);
register.registerMetric(starttimeMetric);
register.registerMetric(requestMetric);
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -11,65 +39,106 @@ let kafka = require('kafka-node'), ...@@ -11,65 +39,106 @@ let kafka = require('kafka-node'),
}), }),
producer = new Producer(client) producer = new Producer(client)
/**
* Function called to report metric data related to functions
* @param {JSON} metric
*/
function collectMetrics(metric) { function collectMetrics(metric) {
/**
* If metrics for a new function comes in,
* provision required structure for the function
*/
if (!(metric.functionHash in metrics)) { if (!(metric.functionHash in metrics)) {
metrics[metric.functionHash] = { metrics[metric.functionHash] = {}
}
if (!(metric.runtime in metrics[metric.functionHash])) {
metrics[metric.functionHash][metric.runtime] = {
shortterm: { shortterm: {
coldstart: 0, coldstart: 0,
coldstart_total_request: 0, coldstart_total_request: 0,
warm_total_request: 0, warm_total_request: 0,
scale_count: 0,
warmstart: 0, warmstart: 0,
worker_count: 0 worker_count: 0,
starttime: 0
} }
} }
} }
if (metric.type === 'coldstart') { if (metric.type === 'coldstart') {
metrics[metric.functionHash].shortterm.coldstart += metric.value metrics[metric.functionHash][metric.runtime].shortterm.coldstart += metric.value
metrics[metric.functionHash].shortterm.coldstart_total_request += 1 metrics[metric.functionHash][metric.runtime].shortterm.coldstart_total_request += 1
coldstartMetric.observe(metric.value)
requestMetric.observe(metric.value)
} else if (metric.type === 'warmstart') { } else if (metric.type === 'warmstart') {
metrics[metric.functionHash].shortterm.warmstart += metric.value metrics[metric.functionHash][metric.runtime].shortterm.warmstart += metric.value
metrics[metric.functionHash].shortterm.warm_total_request += 1 metrics[metric.functionHash][metric.runtime].shortterm.warm_total_request += 1
warmstartMetric.observe(metric.value)
requestMetric.observe(metric.value)
} else if (metric.type === 'scale') { } else if (metric.type === 'scale') {
metrics[metric.functionHash].worker_count = metric.value metrics[metric.functionHash][metric.runtime].shortterm.worker_count = metric.value
workerCountMetric.set(metric.value)
if (metric.starttime !== undefined) {
metrics[metric.functionHash][metric.runtime].shortterm.starttime += metric.starttime
metrics[metric.functionHash][metric.runtime].shortterm.scale_count += 1
starttimeMetric.observe(metric.starttime)
}
} }
} }
function broadcastMetrics() {
/**
* Run periodically to calculate average runtime metrics like coldstart and
* warmstart latencies.
* The module provides two granularities for metrics - shortterm and longterm
* shortterm - realtime data at a granularity of 5s (set in dispatch_manager/lib.js)
* shortterm data is calculated using Simple Moving Average (SMA)
* longterm - longterm data is held and averaged out over a period of time.
* longterm data is calculated using Expontential Moving Average (EMA)
*/
async function broadcastMetrics() {
if (Object.keys(metrics).length !== 0) { if (Object.keys(metrics).length !== 0) {
for (let [functionHash, metric] of Object.entries(metrics)) { for (let [functionHash, data] of Object.entries(metrics)) {
for (let [runtime, metricData] of Object.entries(data)) {
if (metric.longterm === undefined) { if (metricData.shortterm.coldstart != 0 || metricData.shortterm.longterm != 0) {
metric.longterm = { let { metric, dbData } = await fetchData(functionHash, metricData, runtime)
coldstart: 0, /**
coldstart_total_request: 0, * Shortterm moving average
warm_total_request: 0, */
warmstart: 0 metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0) ?
metric.shortterm.coldstart_total_request : 1
metric.shortterm.starttime /= (metric.shortterm.scale_count != 0) ?
metric.shortterm.scale_count : 1
metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0) ?
metric.shortterm.warm_total_request : 1
/**
* Longterm exponential moving average
*/
if (metric.shortterm.coldstart != 0)
metric.longterm.coldstart = (metric.longterm.coldstart != 0) ? metric.longterm.coldstart * alpha
+ metric.shortterm.coldstart * (1 - alpha) : metric.shortterm.coldstart
if (metric.shortterm.starttime && metric.shortterm.starttime != 0)
metric.longterm.starttime = (metric.longterm.starttime != 0) ? metric.longterm.starttime * alpha
+ metric.shortterm.starttime * (1 - alpha) : metric.shortterm.starttime
if (metric.shortterm.warmstart != 0)
metric.longterm.warmstart = (metric.longterm.warmstart != 0) ? metric.longterm.warmstart * alpha
+ metric.shortterm.warmstart * (1 - alpha) : metric.shortterm.warmstart
dbData[runtime] = {
coldstart: metric.longterm.coldstart,
warmstart: metric.longterm.warmstart,
starttime: metric.longterm.starttime
}
let payload = {
method: 'put',
body: JSON.stringify(dbData),
headers: { 'Content-Type': 'application/json' }
}
await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now()
} }
} }
metric.longterm.coldstart = metric.longterm.coldstart
* metric.longterm.coldstart_total_request
+ metric.shortterm.coldstart
metric.longterm.coldstart_total_request += metric.shortterm.coldstart_total_request
metric.longterm.coldstart /= (metric.longterm.coldstart_total_request != 0)?
metric.longterm.coldstart_total_request: 1
metric.longterm.warmstart = metric.longterm.warmstart
* metric.longterm.warm_total_request
+ metric.shortterm.warmstart
metric.longterm.warm_total_request += metric.shortterm.warm_total_request
metric.longterm.warmstart /= (metric.longterm.warm_total_request != 0)?
metric.longterm.warm_total_request: 1
metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0)?
metric.shortterm.coldstart_total_request: 1
metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0)?
metric.shortterm.warm_total_request: 1
metric.timestamp = Date.now()
} }
let log = [{ let log = [{
...@@ -81,18 +150,50 @@ function broadcastMetrics() { ...@@ -81,18 +150,50 @@ function broadcastMetrics() {
}] }]
producer.send(log, () => { }) producer.send(log, () => { })
for (let [functionHash, metric] of Object.entries(metrics)) { for (let [functionHash, data] of Object.entries(metrics)) {
metric.shortterm = { for (let [runtime, metric] of Object.entries(data)) {
coldstart: 0, metric.shortterm = {
coldstart_total_request: 0, coldstart: 0,
warm_total_request: 0, coldstart_total_request: 0,
warmstart: 0, warm_total_request: 0,
worker_count: 0 warmstart: 0,
worker_count: 0,
starttime: 0,
scale_count: 0
}
} }
} }
} }
} }
/**
* Function to fetch the latest data from metric DB
* @param {String} functionHash
* @param {JSON} metric
*/
async function fetchData(functionHash, metric, runtime) {
let res = await fetch(metricsDB + functionHash)
let json = await res.json()
if (json.error === "not_found" || json[runtime] === undefined) {
metric.longterm = {
coldstart: 0,
warmstart: 0,
starttime: 0
}
} else {
metric.longterm = {
coldstart: json[runtime].coldstart,
warmstart: json[runtime].warmstart,
starttime: (json[runtime].starttime) ? json[runtime].starttime: 0
}
}
return {
metric,
dbData: (json.error === "not_found")? {}: json
}
}
module.exports = { module.exports = {
collectMetrics, broadcastMetrics collectMetrics, broadcastMetrics, register
} }
\ No newline at end of file
const op = {
'lt': function (x, y) { return x < y },
'gt': function (x, y) { return x > y },
'lte': function (x, y) { return x <= y },
'gte': function (x, y) { return x >= y },
'eq': function (x, y) { return x === y },
'neq': function (x, y) { return x !== y },
};
module.exports = op
\ No newline at end of file
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"node-fetch": "^2.6.0", "node-fetch": "^2.6.0",
"prom-client": "^12.0.0",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.0", "request": "^2.88.0",
"request-promise": "^4.2.5", "request-promise": "^4.2.5",
......
...@@ -5,7 +5,8 @@ let request = require('request') ...@@ -5,7 +5,8 @@ let request = require('request')
const process = require('process') const process = require('process')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 30 let port = 5000, resource_id, functionHash, runtime, idleTime = 60, flagFirstRequest = true
let waitTime
resource_id = process.argv[2] resource_id = process.argv[2]
functionHash = process.argv[3] functionHash = process.argv[3]
...@@ -28,6 +29,10 @@ app.use(bodyParser.json()) ...@@ -28,6 +29,10 @@ app.use(bodyParser.json())
let lastRequest = Date.now(), totalRequest = 0 let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => { app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body let payload = req.body
lastRequest = Date.now() lastRequest = Date.now()
totalRequest++ totalRequest++
...@@ -59,6 +64,7 @@ app.listen(port, () => { ...@@ -59,6 +64,7 @@ app.listen(port, () => {
runtime, resource_id, entity_id: process.pid}), runtime, resource_id, entity_id: process.pid}),
"status": true "status": true
}], () => { }) }], () => { })
waitTime = Date.now()
}) })
function shouldDie() { function shouldDie() {
...@@ -67,7 +73,7 @@ function shouldDie() { ...@@ -67,7 +73,7 @@ function shouldDie() {
let message = JSON.stringify({ let message = JSON.stringify({
functionHash, portExternal: port, functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid, runtime, resource_id, entity_id: process.pid,
total_request: totalRequest total_request: totalRequest, wait_time: waitTime
}) })
console.log("Idle for too long. Exiting"); console.log("Idle for too long. Exiting");
......
const constants = require('./constants.json')
const util = require('util')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: constants.topics.log_channel }
])
consumer.on('message', function (message) {
message = JSON.parse(message.value)
console.log(util.inspect(message, false, null, true /* enable colors */))
})
\ No newline at end of file
# my global config
global:
scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
# Attach these labels to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager).
external_labels:
monitor: 'codelab-monitor'
# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first.rules"
# - "second.rules"
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['localhost:9090']
- job_name: 'docker'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['localhost:9323']
- job_name: 'xanadu'
static_configs:
- targets: ['localhost:8080']
...@@ -5,6 +5,7 @@ networks: ...@@ -5,6 +5,7 @@ networks:
services: services:
zookeeper: zookeeper:
image: 'bitnami/zookeeper:3' image: 'bitnami/zookeeper:3'
restart: unless-stopped
networks: networks:
- kafka-serverless - kafka-serverless
ports: ports:
...@@ -15,6 +16,7 @@ services: ...@@ -15,6 +16,7 @@ services:
- ALLOW_ANONYMOUS_LOGIN=yes - ALLOW_ANONYMOUS_LOGIN=yes
kafka: kafka:
image: 'bitnami/kafka:2' image: 'bitnami/kafka:2'
restart: unless-stopped
networks: networks:
- kafka-serverless - kafka-serverless
ports: ports:
......
<mxfile host="Electron" modified="2020-03-09T11:38:25.308Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/12.6.5 Chrome/80.0.3987.86 Electron/8.0.0 Safari/537.36" etag="MPKhp0QP3HoRdc4h3Pxj" version="12.6.5" type="device"><diagram id="_vj1HQFHM5RY5pbnoe9b" name="Page-1">5Zlbb5swGIZ/DZebwJzSy+bQbFNTVY20db2JPHDBHcHIOAn018+ACcdkzZoFRG8if58P2M/38kISSZ2sozmFgbsgNvIkINuRpE4lAEbGiH8miThL6IqaJRyK7SylFIklfkUiKYvsBtsorAxkhHgMB9WkRXwfWaySg5SSXXXYM/GqVw2ggxqJpQW9ZvYHtpkrjgXMIv8FYcfNr6wYV1nPGuaDxUlCF9pkV0qpM0mdUEJY1lpHE+Ql7HIu2bybA737jVHks7dMWDy8zFbfngC5vlvZ7sPcBNf4kyH2xuL8wJRsfBslc2RJHRPKXOIQH3q3hAQ8qfDkC2IsFqWCG0Z4ymVrT/SiCLPHUvunWCppT6NyEOeBz2j8WA7SOZ/1PCympVE+L2SU/N6XhQMdN5kITCHZUAsdAZFrC1IHsSPjhHqRXZGNID5HZI34DvkAijzI8LaqIijE6OzH7afeE8y3DGRx3wBNqEbcNkquonyJbKNiVlF13ihto0ilWjhBF+ZldKEMRhfg3LqoVPTU8ol1t9DbiCvdKI2K8nW4i/JgvHMxQ8sApiR23MerlYNhkFnrM44SBZwCeIsoQ9FRJKJXryl+JOJdyXZFyi057kh+P8On8I5tHKB/XdxOv9s0Nlexld/pZWC8uEsR+sRPyL3/rjizxmsC/DfRgzeKXruQxlvro3VaHwP8rUJpdI8o5sdFtE9l07u0JtC0JtB3awJG1ZtA197UYuYl7RcynxXZgVqV2jPNt5arKfCPXYxLPTeObbJsQC2P+n4b0AX953ABqwz1vjPUOzTxw5ZUhdjyTtNviF0/CJsMTX6pphb58ViNl4cdn7ctDiN5PRsnELAFvWvRsca2nT04UYhf4a90qcRCg+RLd3oQfSzp02Qtbs9h5tTKnnMDagv6g5wVtfZtSGuC1lpA138mOBtoY6CgNaUGWu4YtPlBQKugY9CjBmh1EKDr1tHm0RcFfTVQ0JpZA921R+d32PBI1946FP2/keZh8ZdM9mt68b+WOvsD</diagram></mxfile>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment