Commit f5d6e780 authored by Nilanjan Daw's avatar Nilanjan Daw

Merge branch 'explicit_function_chaining'

parents 4d074b06 a6391a8c
{
"registry_url" :"10.129.6.5:5000/",
"registry_url": "10.129.6.5:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
......@@ -37,5 +37,6 @@
},
"speculative_deployment": true,
"JIT_deployment": true,
"aggressivity": 0.8,
"id_size": 20
}
\ No newline at end of file
{
"registry_url": "localhost:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": true,
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "localhost:29092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
\ No newline at end of file
{
"registry_url" :"10.129.6.5:5000/",
"registry_url": "10.129.6.5:5000/",
"master_port": 8080,
"master_address": "10.129.6.5",
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"db": {
......@@ -12,8 +12,9 @@
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "kafka:9092"
"kafka_host": "10.129.6.5:9092"
},
"external": {
"kafka_host": "10.129.6.5:9092"
......@@ -34,7 +35,7 @@
"metrics": {
"alpha": 0.7
},
"speculative_deployment": false,
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
\ No newline at end of file
......@@ -31,7 +31,8 @@ let usedPort = new Map(), // TODO: remove after integration with RM
functionBranchTree = sharedMeta.functionBranchTree, // a tree to store function branch predictions
metricsDB = sharedMeta.metricsDB,
metadataDB = sharedMeta.metadataDB
metadataDB = sharedMeta.metadataDB,
implicitChainDB = sharedMeta.implicitChainDB
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -514,30 +515,94 @@ function autoscalar() {
*
* FIXME: Currently supports homogenous runtime chain i.e takes runtime as a param.
* Change it to also profile runtime
* FIXME: Hardcoded container as a runtime. Make dynamic.
*/
async function speculative_deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
// console.log(functionBranchTree, req.params.id);
if (!functionBranchTree.has(req.params.id)) {
let data = await libSupport.fetchData(implicitChainDB + req.params.id)
if (data.error !== "not_found") {
data.branches = new Map(data.branches)
functionBranchTree.set(req.params.id, data)
}
}
console.log(util.inspect(functionBranchTree, false, null, true /* enable colors */));
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
/**
* calculating the depth upto which speculative deployment will work
*/
let deployDepth = branchInfo.mle_path.length * constants.aggressivity
if (constants.JIT_deployment) {
/**
* Perform Speculation with JIT
*/
for (let node of branchInfo.mle_path)
node.id = node.node
let metrics = await libSupport.fetchData(metricsDB + "_bulk_get", {
let metricsPromise = libSupport.fetchData(metricsDB + "_bulk_get", {
method: 'post',
body: JSON.stringify({
docs: branchInfo.mle_path
}),
headers: { 'Content-Type': 'application/json' },
})
let chainDataPromise = libSupport.fetchData(implicitChainDB + "_bulk_get", {
method: 'post',
body: JSON.stringify({
docs: branchInfo.mle_path
}),
headers: { 'Content-Type': 'application/json' },
})
console.log(util.inspect(metrics, false, null, true /* enable colors */))
/**
* Get the branch chain and the metrics data related to the MLE path
*/
Promise.all([metricsPromise, chainDataPromise])
.then(data => {
let metrics = new Map(), chainData = new Map()
let currentDelay = 0
data[0] = data[0].results, data[1] = data[1].results
for (let i = 0; i < deployDepth; i++) {
let id = data[0][i].id
metrics[id] = data[0][i].docs[0].ok
id = data[1][i].id
chainData[id] = data[1][i].docs[0].ok
if (chainData[id])
chainData[id].branches = new Map(chainData[id].branches)
}
currentDelay = metrics[branchInfo.mle_path[0].id].container.starttime
for (let i = 1; i < deployDepth; i++) {
let parent = chainData[branchInfo.mle_path[i - 1].id]
let self = branchInfo.mle_path[i].id
console.log(self);
currentDelay += parent.branches.get(self)[1]
let invokeTime = currentDelay - metrics[self].container.starttime
invokeTime = (invokeTime < 0)? 0: invokeTime
console.log(self, "current delay", currentDelay, "invoke time:", currentDelay - metrics[self].container.starttime);
setTimeout(chainHandler.notify, invokeTime, "container", self)
}
})
} else {
/**
* Perform Speculation without JIT
*/
let depthCounter = 0
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (depthCounter > deployDepth)
break
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
......@@ -548,13 +613,14 @@ async function speculative_deployment(req, runtime) {
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
depthCounter++
}
}
}
}
}
}
setInterval(libSupport.metrics.broadcastMetrics, 5000)
// setInterval(libSupport.viterbi, 1000)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
'use strict';
const crypto = require('crypto');
const fs = require('fs')
const rp = require('request-promise');
......@@ -7,6 +8,7 @@ const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const metrics = require('./metrics')
const sharedMeta = require('./shared_meta')
const util = require('util')
const { createLogger, format, transports } = winston;
const heap = require('heap')
......@@ -15,8 +17,8 @@ let db = sharedMeta.db, // queue holding request to be dispatched
resourceMap = sharedMeta.resourceMap, // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes = sharedMeta.workerNodes, // list of worker nodes currently known to the DM
functionBranchTree = sharedMeta.functionBranchTree // Holds the function path's and related probability distribution
functionBranchTree = sharedMeta.functionBranchTree, // Holds the function path's and related probability distribution
timelineQueue = new Map() // a temporary map holding request timestamps to be used for calulcating implicit chain invocation delays
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -49,13 +51,13 @@ function makeid(length) {
* @param {string Function Hash value} functionHash
*/
function generateExecutor(functionPath, functionHash) {
input = fs.readFileSync('./repository/worker_env/env.js')
functionFile = fs.readFileSync(functionPath + functionHash)
searchSize = "(resolve, reject) => {".length
let input = fs.readFileSync('./repository/worker_env/env.js')
let functionFile = fs.readFileSync(functionPath + functionHash)
let searchSize = "(resolve, reject) => {".length
insertIndex = input.indexOf("(resolve, reject) => {") + searchSize
let insertIndex = input.indexOf("(resolve, reject) => {") + searchSize
output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex)
let output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex)
let hash = crypto.createHash('md5').update(output).digest("hex");
console.log(hash);
......@@ -69,7 +71,7 @@ function generateExecutor(functionPath, functionHash) {
* @param {JSON} req the user request to be forwarded to the worker
* @param {JSON} res Object to use to return the response to the user
*/
function reverseProxy(req, res) {
async function reverseProxy(req, res) {
if (req.headers['x-chain-type'] !== 'explicit')
branchChainPredictor(req)
let runtime = req.body.runtime
......@@ -88,8 +90,6 @@ function reverseProxy(req, res) {
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap);
var options = {
method: 'POST',
uri: url,
......@@ -97,9 +97,8 @@ function reverseProxy(req, res) {
json: true // Automatically stringifies the body to JSON
};
rp(options)
.then(function (parsedBody) {
try {
let parsedBody = await rp(options)
let serviceTime = Date.now() - res.timestamp
res.json(parsedBody)
......@@ -107,10 +106,16 @@ function reverseProxy(req, res) {
heap.heapify(functionHeap, compare)
let functionHash = req.params.id
let functionData = functionBranchTree.get(functionHash)
if (functionData && functionData.req_count % 5 == 0) {
if (functionData.parent)
viterbi(functionHash, functionData)
else {
let head = await fetch(implicitChainDB + functionHash, {
method: "head"
})
functionData._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1)
functionData.branches = Array.from(functionData.branches.entries())
let payload = {
method: 'put',
......@@ -128,20 +133,20 @@ function reverseProxy(req, res) {
}
}
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
})
.catch(function (err) {
}
catch(err) {
res.json(err.message).status(err.statusCode)
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
logger.error("error" + err);
res.json(err.message).status(err.statusCode)
});
logger.error("error" + err)
}
}
function getPort(usedPort) {
let port = -1, ctr = 0
do {
min = Math.ceil(30000);
max = Math.floor(60000);
let min = Math.ceil(30000);
let max = Math.floor(60000);
port = Math.floor(Math.random() * (max - min + 1)) + min;
ctr += 1;
if (ctr > 30000) {
......@@ -183,16 +188,24 @@ function compare(a, b) {
async function branchChainPredictor(req) {
// console.log(req.headers['x-resource-id']);
let destinationTimestamp = Date.now()
if (!functionBranchTree.has(req.params.id)) {
let data = await fetchData(implicitChainDB + req.params.id)
if (data.error === "not_found")
console.log("no data", req.params.id);
else {
if (data.error !== "not_found") {
data.branches = new Map(data.branches)
functionBranchTree.set(req.params.id, data)
}
}
if (functionBranchTree.has(req.params.id) && functionBranchTree.get(req.params.id).branches.size > 0) {
// console.log(timelineQueue.has(req.params.id), timelineQueue.get(req.params.id));
if (!timelineQueue.has(req.params.id)) {
timelineQueue.set(req.params.id, [])
}
timelineQueue.get(req.params.id).push(destinationTimestamp)
}
if (req.headers['x-resource-id'] === undefined) {
let functionHash = req.params.id
......@@ -213,40 +226,51 @@ async function branchChainPredictor(req) {
} else {
let resource_id = req.headers['x-resource-id']
let resource = resourceMap.get(resource_id)
let forwardBranch = req.params.id
let forwardBranch = req.params.id, callDelay = 0
if (timelineQueue.has(resource.functionHash)) {
let sourceTimestamp = timelineQueue.get(resource.functionHash).shift()
callDelay = destinationTimestamp - sourceTimestamp
// console.log("callDelay", callDelay);
}
if (!functionBranchTree.has(resource.functionHash)) {
let data = {
req_count: 1,
parent: false,
branches: new Map()
}
data.branches.set(forwardBranch, 1)
data.branches.set(forwardBranch, [1, callDelay])
functionBranchTree.set(resource.functionHash, data)
} else {
let branchInfo = functionBranchTree.get(resource.functionHash)
if (!branchInfo.parent)
branchInfo.req_count++
if (branchInfo.branches.has(forwardBranch)) {
let branchProb = branchInfo.branches.get(forwardBranch)
callDelay = constants.metrics.alpha * branchInfo.branches.get(forwardBranch)[1]
+ callDelay * (1 - constants.metrics.alpha)
console.log("call delay", callDelay);
let branchProb = branchInfo.branches.get(forwardBranch)[0]
branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0)
branchInfo.branches.set(forwardBranch, branchProb)
branchInfo.branches.set(forwardBranch, [branchProb, callDelay])
} else {
branchInfo.branches.set(forwardBranch, 1.0)
branchInfo.branches.set(forwardBranch, [1.0, callDelay])
}
for (let [branch, prob] of branchInfo.branches.entries()) {
if (branch !== forwardBranch)
prob *= (branchInfo.req_count - 1)
prob /= branchInfo.req_count
branchInfo.branches.set(branch, prob)
prob[0] *= (branchInfo.req_count - 1)
prob[0] /= branchInfo.req_count
}
}
}
// console.log("timelineQueue", timelineQueue);
// console.log("branch tree", functionBranchTree);
// console.log("branch tree", util.inspect(functionBranchTree, false, null, true /* enable colors */));
}
async function viterbi(node, metadata) {
console.log("function branch tree", functionBranchTree.get(node));
let path = []
let parents = [[node, {
prob: 1,
......@@ -272,7 +296,7 @@ async function viterbi(node, metadata) {
let probability = 0
if (siblings.has(subNode))
probability = siblings.get(subNode)
probability += branchProb * parentProbability
probability += branchProb[0] * parentProbability
// console.log("prob", probability);
siblings.set(subNode, probability)
......@@ -288,7 +312,7 @@ async function viterbi(node, metadata) {
maxProb = prob
}
})
parentIDs = Array.from(siblings.keys());
let parentIDs = Array.from(siblings.keys());
for (const id of parentIDs) {
let metadata = functionBranchTree.get(id)
parents.push([
......@@ -305,10 +329,15 @@ async function viterbi(node, metadata) {
if (path.length > 1)
console.log("path", path);
metadata.mle_path = path
if (path.length > 1) {
metadata.mle_path = path
metadata.branches = Array.from(metadata.branches.entries())
let head = await fetch(implicitChainDB + node, {
method: "head"
})
metadata._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1)
let payload = {
method: 'put',
body: JSON.stringify(functionBranchTree.get(node)),
......
<mxfile host="Electron" modified="2020-04-02T13:40:21.628Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/12.6.5 Chrome/80.0.3987.86 Electron/8.0.0 Safari/537.36" etag="YQW2D_Ds8ta8mPFvP2tW" version="12.6.5" type="device"><diagram id="_vj1HQFHM5RY5pbnoe9b" name="Page-1">3Vldk5owFP01PupAIuI+rh9r21VnZ+207r44qWQhWyROiAL99Y0SBAx1dfyA7YuTe5LA5dxz7w1Yg91FOGBo6Yyohd0a0KywBns1ANqttvjdAFEMGDqMAZsRK4b0FJiQP1iCmkRXxMJ+biGn1OVkmQfn1PPwnOcwxBgN8sveqJu/6xLZWAEmc+Sq6E9icUc+FjBT/AsmtpPcWW/dxTMLlCyWT+I7yKJBBoL9GuwySnk8WoRd7G64S3iJ9z38Y3bnGMMeP2bD6Pm9P/v2Cuj9eGY5zwMT3JN6S/rGo+SBGV15Ft7s0WqwQxl3qE095A4pXQpQF+A75jySoUIrTgXk8IUrZ3FI+DQzfpGX2ox7YdaIEsPjLJpmje2ehpGY6batlezzOaO/d2ERhHZUTiRNPl2xOT5ARKItxGzMD6yT6sVWTjaS8QGmCyw8FAsYdhEn67yKkBSjvVu32/pEiXAZaDJvQFOqRqaNnqgouUTsqNyVRl0MMm6k0FYLxbrwHzszMAoex7+i+WQ4NYPvENSbt9GFfpwu6lpD00FeHBB8JI+t9YQZEWRg9oFm9gJ6XREZZ2omF+0DoT3k5Bq5K3mnB12JtriOqLDC6AQO4XiyRNsHD0SNz0cV+cu47L6RcKOOU5JyjRnH4cE0krPGXja0pR1kSrKEnEw1bmvnc/jqj/nKBsbX0bD3w2KROYvmSRXIEib0M5GmR70Nc+dnTAXqIjhS0s1L18Wz4qOWr1vGp3XZ0nTNsBllha2w6xi3CdsZB5SGZhr5YJsV7EPHRh+W2YeA2ofAJ+tDUC+5D6mE/RdNB1a96RzyOivognNCtQQNWnlF3/BgVdgB2jftAFLVp72MZFLhJTt3Yl5cpNA31UQppNUsMy+aal4UNPpq5YWh7b1xwJITQ/1QU7Wj0WXaxUXSwjilf5R12L0rL6K7sve5Y1q9Umeopa7gVbRapW7/CADNkkudqZJYUP0uTqLCWAGvR/eLK5IozPRPg/h7b/rPC+z/BQ==</diagram></mxfile>
\ No newline at end of file
vanilla 26.69 24.16 23.74 25.39 24.07
speculative 22.07 21.94 22.07 21.94 22.14
jit 23.3 23.25 23.29 23.15 23.79
63 107 68 75 72
61 111 60 79 80
61 79 79 67 75
75 78 63 72 74
47 49 48 49 46
53 51 54 48 51
214 154 144 136 173
125 170 147 188 162
5048 5026 5031 4971 5618
5837 5890 5041 5794 5126
10194 10164 9944 10140 10132
15119 15460 15023 15183 15146
94 78 81 80 110
98 87 72 80 81
81 76 86 145 75
77 74 96 118 74
67 63 68 68 64
181 167 181 89 757
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment