Commit f11341bb authored by Nilanjan Daw

Added better metrics

Metrics module added for periodic metrics push to the log channel
parent ce252d85
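
In outline: the dispatcher now tags every request as a cold or warm start, the reverse proxy records the measured service time through the new metrics module, and the aggregated figures are pushed to the Kafka log channel on a timer. A minimal sketch of that flow, assembled from the diff below; the onRequestCompleted helper is hypothetical and stands in for the reverseProxy callback in lib.js, everything else uses names from the commit:

    // Sketch only, not part of the commit.
    const metrics = require('./metrics')   // the module added by this commit

    // The dispatcher marks the request before routing it (index.js hunk below):
    //   res.start = functionToResource.has(id) ? 'warmstart' : 'coldstart'

    // Hypothetical helper standing in for the reverseProxy .then() callback:
    function onRequestCompleted(res, functionHash) {
        const serviceTime = Date.now() - res.timestamp
        metrics.collectMetrics({ type: res.start, value: serviceTime, functionHash: functionHash })
    }

    // Aggregates are flushed to the Kafka log channel periodically (lib.js):
    setInterval(metrics.broadcastMetrics, 5000)
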
-{"id":"192.168.31.51","master_node":"10.129.6.5"}
+{"id":"10.196.6.51","master_node":"10.129.6.5"}
\ No newline at end of file
@@ -31,6 +31,7 @@ let usedPort = new Map(), // TODO: remove after integration with RM
     workerNodes = new Map(), // list of worker nodes currently known to the DM
     functionBranchTree = new Map() // a tree to store function branch predictions
 let kafka = require('kafka-node'),
     Producer = kafka.Producer,
     client = new kafka.KafkaClient({
@@ -210,8 +211,10 @@ app.post('/serverless/execute/:id', (req, res) => {
     let id = req.params.id + runtime
     res.timestamp = Date.now()
     if (functionToResource.has(id)) {
+        res.start = 'warmstart'
         libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
     } else {
+        res.start = 'coldstart'
         /**
          * Requests are queued up before being dispatched. To prevent requests coming in for the
          * same function from starting too many workers, they are grouped together
@@ -366,7 +369,7 @@ function postDeploy(message) {
         entity_id: message.entity_id,
         "reason": "deployment",
         "status": true,
-        coldstart_time: (Date.now() - resource.deploy_request_time)
+        starttime: (Date.now() - resource.deploy_request_time)
     }, message.resource_id, resourceMap)
     if (db.has(message.functionHash + message.runtime)) {
@@ -533,15 +536,12 @@ function autoscalar() {
 function periodicMetricBroadcast() {
     let message = {}, flag = false
-    message.reason = "resource_per_function"
     functionToResource.forEach((functionHeap, functionHash) => {
         if (functionHeap.length > 0) {
             message[functionHash] = functionHeap.length
-            flag = true
+            libSupport.metrics.collectMetrics({type: "scale", value: functionHeap.length, functionHash: functionHash})
         }
     })
-    if (flag)
-        libSupport.logBroadcast(message)
 }
 setInterval(libSupport.viterbi, 1000, functionBranchTree)
...
@@ -4,7 +4,7 @@ const rp = require('request-promise');
 const fetch = require('node-fetch');
 const winston = require('winston')
 const constants = require('.././constants.json')
+const metrics = require('./metrics')
 const { createLogger, format, transports } = winston;
 const heap = require('heap')
@@ -86,17 +86,18 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
     rp(options)
         .then(function (parsedBody) {
             let serviceTime = Date.now() - res.timestamp
-            console.log(serviceTime, res.timestamp);
             res.json(parsedBody)
             forwardTo.open_request_count -= 1
             heap.heapify(functionHeap, compare)
+            metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: id})
             resolve()
         })
         .catch(function (err) {
             forwardTo.open_request_count -= 1
             heap.heapify(functionHeap, compare)
-            logger.error("error" + err.error.errno);
+            logger.error("error" + err);
             res.json(err.message).status(err.statusCode)
             resolve()
         });
@@ -283,9 +284,7 @@ function logBroadcast(message, resource_id, resourceMap) {
     }
     let log = [{
         topic: constants.log_channel,
-        messages: JSON.stringify({
-            message
-        }),
+        messages: JSON.stringify(message),
         partition: 0
     }]
     producer.send(log, () => {
@@ -299,8 +298,10 @@ function logBroadcast(message, resource_id, resourceMap) {
 }
+setInterval(metrics.broadcastMetrics, 5000)
 module.exports = {
     makeid, generateExecutor, reverseProxy,
     getPort, logger, compare,
-    viterbi, logBroadcast
+    viterbi, logBroadcast, metrics
 }
\ No newline at end of file
const constants = require('.././constants.json');

let log_channel = constants.log_channel,
    metrics = { longterm: {}, shortterm: {} }

let kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.KafkaClient({
        kafkaHost: constants.kafka_host,
        autoConnect: true
    }),
    producer = new Producer(client)

/**
 * Accumulates a single sample (cold-start time, warm-start time or worker count)
 * into the short-term window for the given function.
 */
function collectMetrics(metric) {
    if (!(metric.functionHash in metrics.shortterm)) {
        metrics.shortterm[metric.functionHash] = {
            coldstart: 0,
            coldstart_total_request: 0,
            warm_total_request: 0,
            warmstart: 0,
            worker_count: 0
        }
    }
    if (metric.type === 'coldstart') {
        metrics.shortterm[metric.functionHash].coldstart += metric.value
        metrics.shortterm[metric.functionHash].coldstart_total_request += 1
    } else if (metric.type === 'warmstart') {
        metrics.shortterm[metric.functionHash].warmstart += metric.value
        metrics.shortterm[metric.functionHash].warm_total_request += 1
    } else if (metric.type === 'scale') {
        metrics.shortterm[metric.functionHash].worker_count = metric.value
    }
    // console.log(metrics);
}

/**
 * Folds the short-term window into the long-term running averages, publishes the
 * combined metrics to the Kafka log channel and resets the short-term window.
 * FIXME: Some error causing longterm metrics to be flushed.
 */
function broadcastMetrics() {
    if (Object.keys(metrics.shortterm).length !== 0) {
        for (let [functionHash, metric] of Object.entries(metrics.shortterm)) {
            if (metrics.longterm[functionHash] === undefined) {
                metrics.longterm[functionHash] = {
                    coldstart: 0,
                    coldstart_total_request: 0,
                    warm_total_request: 0,
                    warmstart: 0,
                }
            }
            metrics.longterm[functionHash].coldstart = metrics.longterm[functionHash].coldstart
                * metrics.longterm[functionHash].coldstart_total_request
                + metric.coldstart
            metrics.longterm[functionHash].coldstart_total_request += metric.coldstart_total_request
            metrics.longterm[functionHash].coldstart /= (metrics.longterm[functionHash].coldstart_total_request != 0) ?
                metrics.longterm[functionHash].coldstart_total_request : 1

            metrics.longterm[functionHash].warmstart = metrics.longterm[functionHash].warmstart
                * metrics.longterm[functionHash].warm_total_request
                + metric.warmstart
            metrics.longterm[functionHash].warm_total_request += metric.warm_total_request
            metrics.longterm[functionHash].warmstart /= (metrics.longterm[functionHash].warm_total_request != 0) ?
                metrics.longterm[functionHash].warm_total_request : 1

            metric.coldstart /= (metric.coldstart_total_request != 0) ? metric.coldstart_total_request : 1
            metric.warmstart /= (metric.warm_total_request != 0) ? metric.warm_total_request : 1
        }
        metrics.timestamp = Date.now()
        console.log(metrics);
        let log = [{
            topic: log_channel,
            messages: JSON.stringify({
                metrics
            }),
            partition: 0
        }]
        producer.send(log, () => { })
        metrics.shortterm = {}
    }
}

module.exports = {
    collectMetrics, broadcastMetrics
}
\ No newline at end of file
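
The long-term figures above are maintained as running averages: each flush expands the previous average back into a sum, adds the short-term totals, and divides by the updated request count. A small self-contained sketch of that update with generic names (not the module's API):

    // Running-average merge, as used for the long-term cold/warm start times.
    function mergeAverage(prevAvg, prevCount, windowSum, windowCount) {
        const total = prevCount + windowCount
        if (total === 0) return { avg: 0, count: 0 }
        return { avg: (prevAvg * prevCount + windowSum) / total, count: total }
    }

    // Example: a long-term average of 120 ms over 10 requests, merged with a window
    // totalling 900 ms over 5 requests, gives (120 * 10 + 900) / 15 = 140 ms over 15.
    console.log(mergeAverage(120, 10, 900, 5)) // { avg: 140, count: 15 }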