Commit 7f78ed74 authored by Nilanjan Daw's avatar Nilanjan Daw

Restructed metrics

Better support for different runtimes added
parent 30066437
...@@ -10,6 +10,10 @@ const secrets = require('./secrets.json') ...@@ -10,6 +10,10 @@ const secrets = require('./secrets.json')
const operator = require('./operator') const operator = require('./operator')
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.function_db_name + "/" metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const logger = libSupport.logger const logger = libSupport.logger
const registry_url = constants.registry_url const registry_url = constants.registry_url
...@@ -196,7 +200,7 @@ router.post('/execute/:id', (req, res) => { ...@@ -196,7 +200,7 @@ router.post('/execute/:id', (req, res) => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
console.log(payload); console.log(payload);
// getTimeLines(map); // getTimeLines(aliases, map);
orchestrator(res, payload, map, aliases, {}) orchestrator(res, payload, map, aliases, {})
}) })
} else { } else {
...@@ -207,7 +211,7 @@ router.post('/execute/:id', (req, res) => { ...@@ -207,7 +211,7 @@ router.post('/execute/:id', (req, res) => {
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
// getTimeLines(map); // getTimeLines(aliases, map);
orchestrator(res, payload, map, aliases, {}) orchestrator(res, payload, map, aliases, {})
}) })
}) })
...@@ -278,11 +282,13 @@ async function orchestrator(res, payload, map, aliases, result) { ...@@ -278,11 +282,13 @@ async function orchestrator(res, payload, map, aliases, result) {
function makeBranchRunnable(branchMap, aliases) { function makeBranchRunnable(branchMap, aliases) {
delete branchMap['type'] delete branchMap['type']
for (const [_key, metadata] of Object.entries(branchMap)) { for (const [_key, metadata] of Object.entries(branchMap)) {
let wait_for = [] if (metadata.type === "function" || metadata.type === "conditional") {
for (const dependent of metadata.wait_for) { let wait_for = []
if (aliases[dependent].status !== "done") for (const dependent of metadata.wait_for) {
wait_for.push(dependent) if (aliases[dependent].status !== "done")
metadata.wait_for = wait_for wait_for.push(dependent)
metadata.wait_for = wait_for
}
} }
} }
} }
...@@ -293,12 +299,20 @@ function checkCondition(op1, op2, op, result) { ...@@ -293,12 +299,20 @@ function checkCondition(op1, op2, op, result) {
return (operator[op](data, op2))? "success": "fail" return (operator[op](data, op2))? "success": "fail"
} }
function getTimeLines(map) { function getTimeLines(aliases) {
console.log(map); console.log(aliases);
let getData = []
for (const [functionName, metadata] of Object.entries(map)) { for (const [functionName, metadata] of Object.entries(aliases)) {
let url = metricsDB + `_design/designdoc/_view/testview?startkey=[${metadata.alias}]&endkey=[{}]`
console.log(url);
getData.push(libSupport.fetchData(url))
} }
Promise.all(getData).then((values) => {
console.log(values);
})
} }
function readMap(filename, alias = false) { function readMap(filename, alias = false) {
......
...@@ -396,7 +396,7 @@ function postDeploy(message) { ...@@ -396,7 +396,7 @@ function postDeploy(message) {
libSupport.metrics.collectMetrics({type: "scale", value: libSupport.metrics.collectMetrics({type: "scale", value:
functionToResource.get(id).length, functionToResource.get(id).length,
functionHash: id}) functionHash: message.functionHash, runtime: message.runtime})
} catch (e) { } catch (e) {
logger.error(e.message) logger.error(e.message)
} }
...@@ -448,7 +448,7 @@ consumer.on('message', function (message) { ...@@ -448,7 +448,7 @@ consumer.on('message', function (message) {
heap.heapify(resourceArray, libSupport.compare) heap.heapify(resourceArray, libSupport.compare)
libSupport.metrics.collectMetrics({type: "scale", value: libSupport.metrics.collectMetrics({type: "scale", value:
resourceArray.length, resourceArray.length,
functionHash: id}) functionHash: message.functionHash, runtime: message.runtime})
libSupport.logBroadcast({ libSupport.logBroadcast({
entity_id: message.entity_id, entity_id: message.entity_id,
"reason": "terminate", "reason": "terminate",
......
...@@ -98,7 +98,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT ...@@ -98,7 +98,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
forwardTo.open_request_count -= 1 forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: id}) metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
resolve() resolve()
}) })
.catch(function (err) { .catch(function (err) {
......
'use strict';
const constants = require('.././constants.json'); const constants = require('.././constants.json');
const secrets = require('./secrets.json') const secrets = require('./secrets.json')
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const util = require('util')
const alpha = 0.99 const alpha = 0.99
let log_channel = constants.topics.log_channel, let log_channel = constants.topics.log_channel,
metrics = { } metrics = { }
...@@ -26,7 +29,10 @@ function collectMetrics(metric) { ...@@ -26,7 +29,10 @@ function collectMetrics(metric) {
* provision required structure for the function * provision required structure for the function
*/ */
if (!(metric.functionHash in metrics)) { if (!(metric.functionHash in metrics)) {
metrics[metric.functionHash] = { metrics[metric.functionHash] = {}
}
if (!(metric.runtime in metrics[metric.functionHash])) {
metrics[metric.functionHash][metric.runtime] = {
shortterm: { shortterm: {
coldstart: 0, coldstart: 0,
coldstart_total_request: 0, coldstart_total_request: 0,
...@@ -37,15 +43,16 @@ function collectMetrics(metric) { ...@@ -37,15 +43,16 @@ function collectMetrics(metric) {
} }
} }
if (metric.type === 'coldstart') { if (metric.type === 'coldstart') {
metrics[metric.functionHash].shortterm.coldstart += metric.value metrics[metric.functionHash][metric.runtime].shortterm.coldstart += metric.value
metrics[metric.functionHash].shortterm.coldstart_total_request += 1 metrics[metric.functionHash][metric.runtime].shortterm.coldstart_total_request += 1
} else if (metric.type === 'warmstart') { } else if (metric.type === 'warmstart') {
metrics[metric.functionHash].shortterm.warmstart += metric.value metrics[metric.functionHash][metric.runtime].shortterm.warmstart += metric.value
metrics[metric.functionHash].shortterm.warm_total_request += 1 metrics[metric.functionHash][metric.runtime].shortterm.warm_total_request += 1
} else if (metric.type === 'scale') { } else if (metric.type === 'scale') {
metrics[metric.functionHash].worker_count = metric.value metrics[metric.functionHash][metric.runtime].shortterm.worker_count = metric.value
} }
} }
...@@ -60,38 +67,40 @@ function collectMetrics(metric) { ...@@ -60,38 +67,40 @@ function collectMetrics(metric) {
*/ */
async function broadcastMetrics() { async function broadcastMetrics() {
if (Object.keys(metrics).length !== 0) { if (Object.keys(metrics).length !== 0) {
for (let [functionHash, metricData] of Object.entries(metrics)) { for (let [functionHash, data] of Object.entries(metrics)) {
if (metricData.shortterm.coldstart != 0 || metricData.shortterm.longterm != 0) { for (let [runtime, metricData] of Object.entries(data)) {
let { metric, revision } = await fetchData(functionHash, metricData) if (metricData.shortterm.coldstart != 0 || metricData.shortterm.longterm != 0) {
let { metric, dbData } = await fetchData(functionHash, metricData, runtime)
/**
* Shortterm moving average /**
*/ * Shortterm moving average
metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0) ? */
metric.shortterm.coldstart_total_request : 1 metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0) ?
metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0) ? metric.shortterm.coldstart_total_request : 1
metric.shortterm.warm_total_request : 1 metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0) ?
/** metric.shortterm.warm_total_request : 1
* Longterm exponential moving average /**
*/ * Longterm exponential moving average
if (metric.shortterm.coldstart != 0) */
metric.longterm.coldstart = (metric.longterm.coldstart != 0) ? metric.longterm.coldstart * alpha if (metric.shortterm.coldstart != 0)
+ metric.shortterm.coldstart * (1 - alpha) : metric.shortterm.coldstart metric.longterm.coldstart = (metric.longterm.coldstart != 0) ? metric.longterm.coldstart * alpha
if (metric.shortterm.warmstart != 0) + metric.shortterm.coldstart * (1 - alpha) : metric.shortterm.coldstart
metric.longterm.warmstart = (metric.longterm.warmstart != 0) ? metric.longterm.warmstart * alpha if (metric.shortterm.warmstart != 0)
+ metric.shortterm.warmstart * (1 - alpha) : metric.shortterm.warmstart metric.longterm.warmstart = (metric.longterm.warmstart != 0) ? metric.longterm.warmstart * alpha
+ metric.shortterm.warmstart * (1 - alpha) : metric.shortterm.warmstart
let payload = { dbData[runtime] = {
method: 'put',
body: JSON.stringify({
coldstart: metric.longterm.coldstart, coldstart: metric.longterm.coldstart,
warmstart: metric.longterm.warmstart, warmstart: metric.longterm.warmstart,
_rev: revision }
}), let payload = {
headers: { 'Content-Type': 'application/json' } method: 'put',
body: JSON.stringify(dbData),
headers: { 'Content-Type': 'application/json' }
}
await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now()
} }
await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now()
} }
} }
...@@ -104,13 +113,15 @@ async function broadcastMetrics() { ...@@ -104,13 +113,15 @@ async function broadcastMetrics() {
}] }]
producer.send(log, () => { }) producer.send(log, () => { })
for (let [functionHash, metric] of Object.entries(metrics)) { for (let [functionHash, data] of Object.entries(metrics)) {
metric.shortterm = { for (let [runtime, metric] of Object.entries(data)) {
coldstart: 0, metric.shortterm = {
coldstart_total_request: 0, coldstart: 0,
warm_total_request: 0, coldstart_total_request: 0,
warmstart: 0, warm_total_request: 0,
worker_count: 0 warmstart: 0,
worker_count: 0
}
} }
} }
} }
...@@ -121,28 +132,24 @@ async function broadcastMetrics() { ...@@ -121,28 +132,24 @@ async function broadcastMetrics() {
* @param {String} functionHash * @param {String} functionHash
* @param {JSON} metric * @param {JSON} metric
*/ */
async function fetchData(functionHash, metric) { async function fetchData(functionHash, metric, runtime) {
let revision
let res = await fetch(metricsDB + functionHash) let res = await fetch(metricsDB + functionHash)
let json = await res.json() let json = await res.json()
if (json.error === "not_found") { if (json.error === "not_found" || json[runtime] === undefined) {
metric.longterm = { metric.longterm = {
coldstart: 0, coldstart: 0,
warmstart: 0 warmstart: 0
} }
} else { } else {
metric.longterm = { metric.longterm = {
coldstart: json.coldstart, coldstart: json[runtime].coldstart,
warmstart: json.warmstart, warmstart: json[runtime].warmstart,
} }
revision = json._rev
} }
return { return {
metric, metric,
revision dbData: (json.error === "not_found")? {}: json
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment