Commit 088d0e1a authored by Nilanjan Daw

changed metrics format for better readability

parent f11341bb
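
In short, this commit replaces the two top-level maps (metrics.shortterm and metrics.longterm, each keyed by function hash) with a single map keyed by functionHash + runtime, where every entry carries its own shortterm and longterm views, and it starts recording a "scale" metric whenever a function's resource pool grows or shrinks. A rough sketch of the resulting in-memory layout, reconstructed from the diff below (illustrative only; the "<functionHash><runtime>" key stands for whatever message.functionHash + message.runtime evaluates to):

// Illustrative sketch of the new metrics layout, not code from the commit.
let metrics = {
    "<functionHash><runtime>": {
        shortterm: {            // window counters, reset after every broadcast
            coldstart: 0,
            coldstart_total_request: 0,
            warm_total_request: 0,
            warmstart: 0,
            worker_count: 0
        },
        longterm: {             // running averages, kept across broadcasts
            coldstart: 0,
            coldstart_total_request: 0,
            warm_total_request: 0,
            warmstart: 0
        },
        worker_count: 0,        // updated directly by "scale" metrics
        timestamp: 0            // stamped when the entry is broadcast
    }
}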
@@ -324,26 +324,26 @@ function dispatch() {
  */
 function postDeploy(message) {
     logger.info("Deployed Resource: " + JSON.stringify(message));
+    let id = message.functionHash + message.runtime
     if (message.status == false) {
-        let sendQueue = db.get(message.functionHash + message.runtime)
+        let sendQueue = db.get(id)
         // TODO: handle failure
         while (sendQueue && sendQueue.length != 0) {
             let { req, res } = sendQueue.shift()
             res.status(400).json({ reason: message.reason })
         }
-        db.delete(message.functionHash + message.runtime)
+        db.delete(id)
         return;
     }
-    if (functionToResource.has(message.functionHash + message.runtime)) {
-        let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
+    if (functionToResource.has(id)) {
+        let resourceHeap = functionToResource.get(id)
         heap.push(resourceHeap, {
             resource_id: message.resource_id,
             open_request_count: 0
         }, libSupport.compare)
         logger.warn("Horizontally scaling up: " +
-            JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
+            JSON.stringify(functionToResource.get(id)));
     } else {
         /**
@@ -356,9 +356,9 @@ function postDeploy(message) {
             resource_id: message.resource_id,
             open_request_count: 0
         }, libSupport.compare)
-        functionToResource.set(message.functionHash + message.runtime, resourceHeap)
+        functionToResource.set(id, resourceHeap)
         logger.warn("Creating new resource pool"
-            + JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
+            + JSON.stringify(functionToResource.get(id)));
     }
@@ -372,8 +372,8 @@ function postDeploy(message) {
             starttime: (Date.now() - resource.deploy_request_time)
         }, message.resource_id, resourceMap)
-        if (db.has(message.functionHash + message.runtime)) {
-            let sendQueue = db.get(message.functionHash + message.runtime)
+        if (db.has(id)) {
+            let sendQueue = db.get(id)
             logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
             while (sendQueue && sendQueue.length != 0) {
                 let { req, res } = sendQueue.shift()
@@ -382,8 +382,12 @@ function postDeploy(message) {
                 })
             }
-            db.delete(message.functionHash + message.runtime)
+            db.delete(id)
         }
+        libSupport.metrics.collectMetrics({type: "scale", value:
+            functionToResource.get(id).length,
+            functionHash: id})
     } catch (e) {
         logger.error(e.message)
     }
@@ -423,8 +427,9 @@ consumer.on('message', function (message) {
             // process.exit(0)
         }
         usedPort.delete(message.port)
-        if (functionToResource.has(message.functionHash + message.runtime)) {
-            let resourceArray = functionToResource.get(message.functionHash + message.runtime)
+        let id = message.functionHash + message.runtime
+        if (functionToResource.has(id)) {
+            let resourceArray = functionToResource.get(id)
             for (let i = 0; i < resourceArray.length; i++)
                 if (resourceArray[i].resource_id === message.resource_id) {
                     resourceArray.splice(i, 1);
@@ -432,6 +437,9 @@ consumer.on('message', function (message) {
                 }
             heap.heapify(resourceArray, libSupport.compare)
+            libSupport.metrics.collectMetrics({type: "scale", value:
+                resourceArray.length,
+                functionHash: id})
             libSupport.logBroadcast({
                 entity_id: message.entity_id,
                 "reason": "terminate",
@@ -441,7 +449,7 @@ consumer.on('message', function (message) {
                 .then(() => {
                     resourceMap.delete(message.resource_id)
                     if (resourceArray.length == 0)
-                        functionToResource.delete(message.functionHash + message.runtime)
+                        functionToResource.delete(id)
                 })
         }
@@ -547,5 +555,5 @@ function periodicMetricBroadcast() {
 setInterval(libSupport.viterbi, 1000, functionBranchTree)
 setInterval(autoscalar, 1000);
 setInterval(dispatch, 1000);
-setInterval(periodicMetricBroadcast, 5000)
+// setInterval(periodicMetricBroadcast, 5000)
 app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file

 const constants = require('.././constants.json');
 let log_channel = constants.log_channel,
-    metrics = { longterm: {}, shortterm: {} }
+    metrics = { }
 let kafka = require('kafka-node'),
     Producer = kafka.Producer,
@@ -13,66 +13,64 @@ let kafka = require('kafka-node'),
 function collectMetrics(metric) {
-    if (!(metric.functionHash in metrics.shortterm)) {
-        metrics.shortterm[metric.functionHash] = {
-            coldstart: 0,
-            coldstart_total_request: 0,
-            warm_total_request: 0,
-            warmstart: 0,
-            worker_count: 0
+    if (!(metric.functionHash in metrics)) {
+        metrics[metric.functionHash] = {
+            shortterm: {
+                coldstart: 0,
+                coldstart_total_request: 0,
+                warm_total_request: 0,
+                warmstart: 0,
+                worker_count: 0
+            }
         }
     }
     if (metric.type === 'coldstart') {
-        metrics.shortterm[metric.functionHash].coldstart += metric.value
-        metrics.shortterm[metric.functionHash].coldstart_total_request += 1
+        metrics[metric.functionHash].shortterm.coldstart += metric.value
+        metrics[metric.functionHash].shortterm.coldstart_total_request += 1
     } else if (metric.type === 'warmstart') {
-        metrics.shortterm[metric.functionHash].warmstart += metric.value
-        metrics.shortterm[metric.functionHash].warm_total_request += 1
+        metrics[metric.functionHash].shortterm.warmstart += metric.value
+        metrics[metric.functionHash].shortterm.warm_total_request += 1
     } else if (metric.type === 'scale') {
-        metrics.shortterm[metric.functionHash].worker_count = metric.value
+        metrics[metric.functionHash].worker_count = metric.value
     }
-    // console.log(metrics);
 }
-/**
- * FIXME: Some error causing longterm metrics to be flushed.
- */
 function broadcastMetrics() {
-    if (Object.keys(metrics.shortterm).length !== 0) {
-        for (let [functionHash, metric] of Object.entries(metrics.shortterm)) {
-            if (metrics.longterm[functionHash] === undefined) {
-                metrics.longterm[functionHash] = {
-                    coldstart: 0,
-                    coldstart_total_request: 0,
-                    warm_total_request: 0,
-                    warmstart: 0,
+    if (Object.keys(metrics).length !== 0) {
+        for (let [functionHash, metric] of Object.entries(metrics)) {
+            if (metric.longterm === undefined) {
+                metric.longterm = {
+                    coldstart: 0,
+                    coldstart_total_request: 0,
+                    warm_total_request: 0,
+                    warmstart: 0
                 }
             }
-            metrics.longterm[functionHash].coldstart = metrics.longterm[functionHash].coldstart
-                * metrics.longterm[functionHash].coldstart_total_request
-                + metric.coldstart
-            metrics.longterm[functionHash].coldstart_total_request += metric.coldstart_total_request
-            metrics.longterm[functionHash].coldstart /= (metrics.longterm[functionHash].coldstart_total_request != 0)?
-                metrics.longterm[functionHash].coldstart_total_request: 1
-            metrics.longterm[functionHash].warmstart = metrics.longterm[functionHash].warmstart
-                * metrics.longterm[functionHash].warm_total_request
-                + metric.warmstart
-            metrics.longterm[functionHash].warm_total_request += metric.warm_total_request
-            metrics.longterm[functionHash].warmstart /= (metrics.longterm[functionHash].warm_total_request != 0)?
-                metrics.longterm[functionHash].warm_total_request: 1
-            metric.coldstart /= (metric.coldstart_total_request != 0)? metric.coldstart_total_request: 1
-            metric.warmstart /= (metric.warm_total_request != 0)? metric.warm_total_request: 1
+            metric.longterm.coldstart = metric.longterm.coldstart
+                * metric.longterm.coldstart_total_request
+                + metric.shortterm.coldstart
+            metric.longterm.coldstart_total_request += metric.shortterm.coldstart_total_request
+            metric.longterm.coldstart /= (metric.longterm.coldstart_total_request != 0)?
+                metric.longterm.coldstart_total_request: 1
+            metric.longterm.warmstart = metric.longterm.warmstart
+                * metric.longterm.warm_total_request
+                + metric.shortterm.warmstart
+            metric.longterm.warm_total_request += metric.shortterm.warm_total_request
+            metric.longterm.warmstart /= (metric.longterm.warm_total_request != 0)?
+                metric.longterm.warm_total_request: 1
+            metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0)?
+                metric.shortterm.coldstart_total_request: 1
+            metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0)?
+                metric.shortterm.warm_total_request: 1
+            metric.timestamp = Date.now()
         }
-        metrics.timestamp = Date.now()
-        console.log(metrics);
         let log = [{
             topic: log_channel,
@@ -82,7 +80,16 @@ function broadcastMetrics() {
             partition: 0
         }]
         producer.send(log, () => { })
-        metrics.shortterm = {}
+        for (let [functionHash, metric] of Object.entries(metrics)) {
+            metric.shortterm = {
+                coldstart: 0,
+                coldstart_total_request: 0,
+                warm_total_request: 0,
+                warmstart: 0,
+                worker_count: 0
+            }
+        }
     }
 }
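
For reference, the long-term cold-start and warm-start figures above are maintained as running means: on every broadcast, the short-term window is folded back into the long-term average before the short-term counters are cleared. A minimal sketch of that update (hypothetical helper, not part of the commit; oldMean/oldCount are the long-term average and request count so far, windowSum/windowCount the accumulated start-time total and request count from the short-term window):

// Mirrors the longterm update in broadcastMetrics.
function foldIntoLongterm(oldMean, oldCount, windowSum, windowCount) {
    let newCount = oldCount + windowCount
    // Same divide-by-zero guard as the diff's ternary: divide by 1 when no requests were seen.
    return (oldMean * oldCount + windowSum) / (newCount != 0 ? newCount : 1)
}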