Commit f725ed87 authored by Nilanjan Daw's avatar Nilanjan Daw

Implicit function chain call delay measurements

Added support for measurement of call delays in implicit function chains
parent 3d899293
{ {
"registry_url" :"10.129.6.5:5000/", "registry_url": "10.129.6.5:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
"metrics": { "metrics": {
"alpha": 0.7 "alpha": 0.7
}, },
"speculative_deployment": true, "speculative_deployment": false,
"JIT_deployment": true, "JIT_deployment": true,
"id_size": 20 "id_size": 20
} }
\ No newline at end of file
{
"registry_url": "localhost:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": true,
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "localhost:29092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
\ No newline at end of file
{ {
"registry_url" :"10.129.6.5:5000/", "registry_url": "10.129.6.5:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "10.129.6.5", "master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984", "couchdb_host": "10.129.6.5:5984",
"db": { "db": {
...@@ -12,8 +12,9 @@ ...@@ -12,8 +12,9 @@
}, },
"network": { "network": {
"network_bridge": "hybrid_kafka-serverless", "network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": { "internal": {
"kafka_host": "kafka:9092" "kafka_host": "10.129.6.5:9092"
}, },
"external": { "external": {
"kafka_host": "10.129.6.5:9092" "kafka_host": "10.129.6.5:9092"
...@@ -34,7 +35,7 @@ ...@@ -34,7 +35,7 @@
"metrics": { "metrics": {
"alpha": 0.7 "alpha": 0.7
}, },
"speculative_deployment": false, "speculative_deployment": true,
"JIT_deployment": true, "JIT_deployment": true,
"id_size": 20 "id_size": 20
} }
\ No newline at end of file
...@@ -31,7 +31,8 @@ let usedPort = new Map(), // TODO: remove after integration with RM ...@@ -31,7 +31,8 @@ let usedPort = new Map(), // TODO: remove after integration with RM
functionBranchTree = sharedMeta.functionBranchTree, // a tree to store function branch predictions functionBranchTree = sharedMeta.functionBranchTree, // a tree to store function branch predictions
metricsDB = sharedMeta.metricsDB, metricsDB = sharedMeta.metricsDB,
metadataDB = sharedMeta.metadataDB metadataDB = sharedMeta.metadataDB,
implicitChainDB = sharedMeta.implicitChainDB
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -518,7 +519,6 @@ function autoscalar() { ...@@ -518,7 +519,6 @@ function autoscalar() {
async function speculative_deployment(req, runtime) { async function speculative_deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) { if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
// console.log(functionBranchTree, req.params.id); // console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) { if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id) let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path); console.log("mle_path", branchInfo.mle_path);
...@@ -554,7 +554,6 @@ async function speculative_deployment(req, runtime) { ...@@ -554,7 +554,6 @@ async function speculative_deployment(req, runtime) {
} }
} }
setInterval(libSupport.metrics.broadcastMetrics, 5000) setInterval(libSupport.metrics.broadcastMetrics, 5000)
// setInterval(libSupport.viterbi, 1000)
setInterval(autoscalar, 1000); setInterval(autoscalar, 1000);
setInterval(dispatch, 1000); setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`)) app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
'use strict';
const crypto = require('crypto'); const crypto = require('crypto');
const fs = require('fs') const fs = require('fs')
const rp = require('request-promise'); const rp = require('request-promise');
...@@ -7,6 +8,7 @@ const constants = require('.././constants.json') ...@@ -7,6 +8,7 @@ const constants = require('.././constants.json')
const secrets = require('./secrets.json') const secrets = require('./secrets.json')
const metrics = require('./metrics') const metrics = require('./metrics')
const sharedMeta = require('./shared_meta') const sharedMeta = require('./shared_meta')
const util = require('util')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap') const heap = require('heap')
...@@ -14,9 +16,9 @@ const heap = require('heap') ...@@ -14,9 +16,9 @@ const heap = require('heap')
let db = sharedMeta.db, // queue holding request to be dispatched let db = sharedMeta.db, // queue holding request to be dispatched
resourceMap = sharedMeta.resourceMap, // map between resource_id and resource details like node_id, port, associated function etc resourceMap = sharedMeta.resourceMap, // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of
// resources associated with the function // resources associated with the function
workerNodes = sharedMeta.workerNodes, // list of worker nodes currently known to the DM functionBranchTree = sharedMeta.functionBranchTree, // Holds the function path's and related probability distribution
functionBranchTree = sharedMeta.functionBranchTree // Holds the function path's and related probability distribution timelineQueue = new Map() // a temporary map holding request timestamps to be used for calulcating implicit chain invocation delays
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -88,8 +90,6 @@ function reverseProxy(req, res) { ...@@ -88,8 +90,6 @@ function reverseProxy(req, res) {
// logger.info("Request received at reverseproxy. Forwarding to: " + url); // logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1 forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap);
var options = { var options = {
method: 'POST', method: 'POST',
uri: url, uri: url,
...@@ -107,6 +107,7 @@ function reverseProxy(req, res) { ...@@ -107,6 +107,7 @@ function reverseProxy(req, res) {
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
let functionHash = req.params.id let functionHash = req.params.id
let functionData = functionBranchTree.get(functionHash) let functionData = functionBranchTree.get(functionHash)
if (functionData && functionData.req_count % 5 == 0) { if (functionData && functionData.req_count % 5 == 0) {
if (functionData.parent) if (functionData.parent)
viterbi(functionHash, functionData) viterbi(functionHash, functionData)
...@@ -130,18 +131,18 @@ function reverseProxy(req, res) { ...@@ -130,18 +131,18 @@ function reverseProxy(req, res) {
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime}) metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
}) })
.catch(function (err) { .catch(function (err) {
res.json(err.message).status(err.statusCode)
forwardTo.open_request_count -= 1 forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
logger.error("error" + err); logger.error("error" + err)
res.json(err.message).status(err.statusCode)
}); });
} }
function getPort(usedPort) { function getPort(usedPort) {
let port = -1, ctr = 0 let port = -1, ctr = 0
do { do {
min = Math.ceil(30000); let min = Math.ceil(30000);
max = Math.floor(60000); let max = Math.floor(60000);
port = Math.floor(Math.random() * (max - min + 1)) + min; port = Math.floor(Math.random() * (max - min + 1)) + min;
ctr += 1; ctr += 1;
if (ctr > 30000) { if (ctr > 30000) {
...@@ -183,16 +184,24 @@ function compare(a, b) { ...@@ -183,16 +184,24 @@ function compare(a, b) {
async function branchChainPredictor(req) { async function branchChainPredictor(req) {
// console.log(req.headers['x-resource-id']); // console.log(req.headers['x-resource-id']);
let destinationTimestamp = Date.now()
if (!functionBranchTree.has(req.params.id)) { if (!functionBranchTree.has(req.params.id)) {
let data = await fetchData(implicitChainDB + req.params.id) let data = await fetchData(implicitChainDB + req.params.id)
if (data.error === "not_found") if (data.error !== "not_found") {
console.log("no data", req.params.id);
else {
data.branches = new Map(data.branches) data.branches = new Map(data.branches)
functionBranchTree.set(req.params.id, data) functionBranchTree.set(req.params.id, data)
} }
} }
if (functionBranchTree.has(req.params.id) && functionBranchTree.get(req.params.id).branches.size > 0) {
console.log(timelineQueue.has(req.params.id), timelineQueue.get(req.params.id));
if (!timelineQueue.has(req.params.id)) {
timelineQueue.set(req.params.id, [])
}
timelineQueue.get(req.params.id).push(destinationTimestamp)
}
if (req.headers['x-resource-id'] === undefined) { if (req.headers['x-resource-id'] === undefined) {
let functionHash = req.params.id let functionHash = req.params.id
...@@ -213,40 +222,49 @@ async function branchChainPredictor(req) { ...@@ -213,40 +222,49 @@ async function branchChainPredictor(req) {
} else { } else {
let resource_id = req.headers['x-resource-id'] let resource_id = req.headers['x-resource-id']
let resource = resourceMap.get(resource_id) let resource = resourceMap.get(resource_id)
let forwardBranch = req.params.id let forwardBranch = req.params.id, callDelay
if (timelineQueue.has(resource.functionHash)) {
let sourceTimestamp = timelineQueue.get(resource.functionHash).shift()
callDelay = destinationTimestamp - sourceTimestamp
// console.log("callDelay", callDelay);
}
if (!functionBranchTree.has(resource.functionHash)) { if (!functionBranchTree.has(resource.functionHash)) {
let data = { let data = {
req_count: 1, req_count: 1,
parent: false, parent: false,
branches: new Map() branches: new Map()
} }
data.branches.set(forwardBranch, 1) data.branches.set(forwardBranch, [1, callDelay])
functionBranchTree.set(resource.functionHash, data) functionBranchTree.set(resource.functionHash, data)
} else { } else {
let branchInfo = functionBranchTree.get(resource.functionHash) let branchInfo = functionBranchTree.get(resource.functionHash)
if (!branchInfo.parent) if (!branchInfo.parent)
branchInfo.req_count++ branchInfo.req_count++
if (branchInfo.branches.has(forwardBranch)) { if (branchInfo.branches.has(forwardBranch)) {
let branchProb = branchInfo.branches.get(forwardBranch) callDelay = constants.metrics.alpha * branchInfo.branches.get(forwardBranch)[1]
+ callDelay * (1 - constants.metrics.alpha)
let branchProb = branchInfo.branches.get(forwardBranch)[0]
branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0) branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0)
branchInfo.branches.set(forwardBranch, branchProb) branchInfo.branches.set(forwardBranch, [branchProb, callDelay])
} else { } else {
branchInfo.branches.set(forwardBranch, 1.0) branchInfo.branches.set(forwardBranch, [1.0, callDelay])
} }
for (let [branch, prob] of branchInfo.branches.entries()) { for (let [branch, prob] of branchInfo.branches.entries()) {
if (branch !== forwardBranch) if (branch !== forwardBranch)
prob *= (branchInfo.req_count - 1) prob[0] *= (branchInfo.req_count - 1)
prob /= branchInfo.req_count prob[0] /= branchInfo.req_count
branchInfo.branches.set(branch, prob)
} }
} }
} }
// console.log("timelineQueue", timelineQueue);
// console.log("branch tree", functionBranchTree); // console.log("branch tree", util.inspect(functionBranchTree, false, null, true /* enable colors */));
} }
async function viterbi(node, metadata) { async function viterbi(node, metadata) {
console.log("function branch tree", functionBranchTree.get(node));
let path = [] let path = []
let parents = [[node, { let parents = [[node, {
prob: 1, prob: 1,
...@@ -272,7 +290,7 @@ async function viterbi(node, metadata) { ...@@ -272,7 +290,7 @@ async function viterbi(node, metadata) {
let probability = 0 let probability = 0
if (siblings.has(subNode)) if (siblings.has(subNode))
probability = siblings.get(subNode) probability = siblings.get(subNode)
probability += branchProb * parentProbability probability += branchProb[0] * parentProbability
// console.log("prob", probability); // console.log("prob", probability);
siblings.set(subNode, probability) siblings.set(subNode, probability)
...@@ -288,7 +306,7 @@ async function viterbi(node, metadata) { ...@@ -288,7 +306,7 @@ async function viterbi(node, metadata) {
maxProb = prob maxProb = prob
} }
}) })
parentIDs = Array.from(siblings.keys()); let parentIDs = Array.from(siblings.keys());
for (const id of parentIDs) { for (const id of parentIDs) {
let metadata = functionBranchTree.get(id) let metadata = functionBranchTree.get(id)
parents.push([ parents.push([
...@@ -305,9 +323,8 @@ async function viterbi(node, metadata) { ...@@ -305,9 +323,8 @@ async function viterbi(node, metadata) {
if (path.length > 1) if (path.length > 1)
console.log("path", path); console.log("path", path);
metadata.mle_path = path
if (path.length > 1) { if (path.length > 1) {
metadata.mle_path = path
metadata.branches = Array.from(metadata.branches.entries()) metadata.branches = Array.from(metadata.branches.entries())
let payload = { let payload = {
method: 'put', method: 'put',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment