Commit 96b1f069 authored by Nilanjan Daw

Added dispatcher-side instrumentation

Added several pieces of dispatcher-side instrumentation: cold-start timing on deployment, termination logging, and a periodic per-function resource-count broadcast. More to be added.
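The records are pushed to the Kafka log channel through a new logBroadcast helper in lib.js. Based on the fields used in this diff, a deployment record is built roughly like this (a sketch of the shape, not the exact wire format):

logBroadcast({
    entity_id: message.entity_id,
    reason: "deployment",               // also "terminate" and "resource_per_function"
    status: true,
    coldstart_time: Date.now() - resource.deploy_request_time
}, message.resource_id, resourceMap)    // the helper adds timestamp, node_id, runtime, function_id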
parent 74de83a5
@@ -6,3 +6,4 @@ firecracker*
secrets.json
resource_system/bin/**
resource_system/version.linux
local_experiments/
@@ -18,5 +18,6 @@
"autoscalar_metrics": {
"open_request_threshold": 100
},
"speculative_deployment": true
"speculative_deployment": true,
"id_size": 20
}
\ No newline at end of file
"use strict";
const express = require('express')
const bodyParser = require('body-parser')
const express = require('express');
const bodyParser = require('body-parser');
const fileUpload = require('express-fileupload');
const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const fs = require('fs')
const constants = require('.././constants.json');
const chainHandler = require('./explicit_chain_handler');
const secrets = require('./secrets.json');
const fs = require('fs');
const { spawn } = require('child_process');
const morgan = require('morgan')
const heap = require('heap')
const morgan = require('morgan');
const heap = require('heap');
const fetch = require('node-fetch');
const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json');
@@ -53,14 +54,14 @@ let kafka = require('kafka-node'),
app.use(morgan('combined', {
skip: function (req, res) { return res.statusCode < 400 }
}))
app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json())
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path));
app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec }));
app.use('/serverless/chain', chainHandler);
let requestQueue = []
const WINDOW_SIZE = 10
@@ -207,7 +208,7 @@ app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
res.timestamp = Date.now()
if (functionToResource.has(id)) {
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
} else {
@@ -244,7 +245,7 @@ function dispatch() {
if (!db.has(functionHash + runtime)) {
db.set(functionHash + runtime, [])
db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
logger.info("Requesting RM " + JSON.stringify({
@@ -256,7 +257,7 @@ function dispatch() {
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null,
deployed: false
deployed: false, deploy_request_time: Date.now()
})
@@ -314,7 +315,10 @@ function dispatch() {
}
}
/**
* Handles post deployment metadata updates and starts reverse-proxying
* @param {string} message Message received from DD after deployment
*/
function postDeploy(message) {
logger.info("Deployed Resource: " + JSON.stringify(message));
@@ -358,24 +362,12 @@ function postDeploy(message) {
try {
let resource = resourceMap.get(message.resource_id)
resource.deployed = true
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
// type: "deployment_launch",
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
entity_id: message.entity_id,
"reason": "deployment",
"status": true,
"timestamp": Date.now()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
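// coldstart_time below: wall-clock ms between dispatch()/hscale stamping
// deploy_request_time on the resource entry and this deployment confirmation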
libSupport.logBroadcast({
entity_id: message.entity_id,
"reason": "deployment",
"status": true,
coldstart_time: (Date.now() - resource.deploy_request_time)
}, message.resource_id, resourceMap)
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
@@ -437,17 +429,25 @@ consumer.on('message', function (message) {
}
heap.heapify(resourceArray, libSupport.compare)
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(message.functionHash + message.runtime)
libSupport.logBroadcast({
entity_id: message.entity_id,
"reason": "terminate",
"total_request": message.total_request,
"status": true
}, message.resource_id, resourceMap)
.then(() => {
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(message.functionHash + message.runtime)
})
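// resourceMap cleanup is deferred until the terminate record has been
// acknowledged by the Kafka producer (logBroadcast resolves on send)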
}
} else if (topic == constants.topics.hscale) {
message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID
let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID
runtime = message.runtime,
functionHash = message.functionHash
console.log("Resource Status: ", functionToResource);
@@ -461,7 +461,7 @@ consumer.on('message', function (message) {
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null,
deployed: false
deployed: false, deploy_request_time: Date.now()
})
@@ -531,7 +531,21 @@ function autoscalar() {
}
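/**
 * Broadcasts the number of resources currently held per function
 * (reason: "resource_per_function"); skipped when no function has resources.
 */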
function periodicMetricBroadcast() {
let message = {}, flag = false
message.reason = "resource_per_function"
functionToResource.forEach((functionHeap, functionHash) => {
if (functionHeap.length > 0) {
message[functionHash] = functionHeap.length
flag = true
}
})
if (flag)
libSupport.logBroadcast(message)
}
setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
setInterval(periodicMetricBroadcast, 5000)
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
@@ -8,6 +8,14 @@ const constants = require('.././constants.json')
const { createLogger, format, transports } = winston;
const heap = require('heap')
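// Kafka producer local to this module, used by logBroadcast below to push
// instrumentation records to the log channel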
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host,
autoConnect: true
}),
producer = new Producer(client)
/**
* Generates unique IDs of arbitrary length
* @param {number} length Length of the ID
@@ -77,6 +85,8 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
rp(options)
.then(function (parsedBody) {
let serviceTime = Date.now() - res.timestamp
console.log(serviceTime, res.timestamp);
res.json(parsedBody)
forwardTo.open_request_count -= 1
@@ -257,12 +267,40 @@ function viterbi(functionBranchTree)
metadata.mle_path = path
}
});
}
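/**
 * Publishes an instrumentation record on the Kafka log channel. When a known
 * resource_id is supplied, the record is enriched with the resource's
 * node_id, runtime and functionHash from resourceMap.
 * @param {Object} message record to publish (a timestamp is added here)
 * @param {String} [resource_id] optional resource to pull metadata from
 * @param {Map} [resourceMap] resource metadata map
 * @returns {Promise} resolves once the producer acknowledges the send
 */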
function logBroadcast(message, resource_id, resourceMap) {
return new Promise((resolve, reject) => {
try {
message.timestamp = Date.now()
if (resource_id && resourceMap.has(resource_id)) {
let resource = resourceMap.get(resource_id)
message.resource_id = resource_id
message.node_id = resource.node_id
message.runtime = resource.runtime
message.function_id = resource.functionHash
}
let log = [{
topic: constants.log_channel,
messages: JSON.stringify({
message
}),
partition: 0
}]
producer.send(log, () => {
resolve()
})
} catch (err) {
console.log(err);
reject()
}
})
}
module.exports = {
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi
viterbi, logBroadcast
}
\ No newline at end of file