Commit fb27d6a9 authored by Nilanjan Daw's avatar Nilanjan Daw

Adding support for permanent implicit chain metadata

parent 8a33c26a
{
"registry_url" :"10.129.6.5:5000/",
"master_port": 8080,
"master_address": "10.129.6.5",
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"db": {
......@@ -12,8 +12,9 @@
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "kafka:9092"
"kafka_host": "10.129.6.5:9092"
},
"external": {
"kafka_host": "10.129.6.5:9092"
......@@ -34,7 +35,7 @@
"metrics": {
"alpha": 0.7
},
"speculative_deployment": false,
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
{
"registry_url" :"10.129.6.5:5000/",
"master_port": 8080,
"master_address": "10.129.6.5",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "10.129.6.5:9092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": false,
"JIT_deployment": true,
"id_size": 20
}
......@@ -92,9 +92,15 @@ function runContainer(metadata) {
if (code != 0)
reject("error")
else {
const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
"--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let process = null;
if (constants.network.use_bridge)
process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
"--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
else
process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`,
"--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
......@@ -119,10 +125,15 @@ function runContainer(metadata) {
})
} else {
logger.info("container starting at port", port);
const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`,
"-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let process = null;
if (constants.network.use_bridge)
process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`,
"-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
else
process = spawn('docker', ["run", "--rm",
"-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
......
......@@ -158,7 +158,7 @@ function heartbeat() {
topic: "heartbeat",
messages: JSON.stringify({"address": node_id, "timestamp": Date.now()})
}]
producer.send(payload, function() {})
producer.send(payload, function(cb) {})
}
......
......@@ -27,7 +27,7 @@ function updateConfig() {
}
function makeTopic(id) {
console.log("Using Primary IP", id, "as topic");
console.log("Using Primary IP", id, "as topic", "publishing to:", constants.network.external.kafka_host);
let client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
......
......@@ -254,7 +254,7 @@ async function orchestrator(chain_id, res, payload, map, aliases, result) {
runtime: metadata.runtime,
payload
}),
headers: { 'Content-Type': 'application/json' }
headers: { 'Content-Type': 'application/json', 'x-chain-type': 'explicit' }
}
delete map[functionName]
aliases[functionName].status = "running"
......
......@@ -219,7 +219,7 @@ app.post('/serverless/execute/:id', (req, res) => {
res.timestamp = Date.now()
if (functionToResource.has(id)) {
res.start = 'warmstart'
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
libSupport.reverseProxy(req, res)
} else {
res.start = 'coldstart'
/**
......@@ -328,17 +328,14 @@ function postDeploy(message) {
"reason": "deployment",
"status": true,
starttime: (Date.now() - resource.deploy_request_time)
}, message.resource_id, resourceMap)
}, message.resource_id)
if (db.has(id)) {
let sendQueue = db.get(id)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
.then(() => {
})
libSupport.reverseProxy(req, res)
}
db.delete(id)
}
......@@ -404,7 +401,7 @@ consumer.on('message', function (message) {
"reason": "terminate",
"total_request": message.total_request,
"status": true
}, message.resource_id, resourceMap)
}, message.resource_id)
.then(() => {
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
......@@ -557,7 +554,7 @@ async function speculative_deployment(req, runtime) {
}
}
setInterval(libSupport.metrics.broadcastMetrics, 5000)
setInterval(libSupport.viterbi, 1000, functionBranchTree)
// setInterval(libSupport.viterbi, 1000)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
......@@ -6,10 +6,18 @@ const winston = require('winston')
const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const metrics = require('./metrics')
const sharedMeta = require('./shared_meta')
const { createLogger, format, transports } = winston;
const heap = require('heap')
let db = sharedMeta.db, // queue holding request to be dispatched
resourceMap = sharedMeta.resourceMap, // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes = sharedMeta.workerNodes, // list of worker nodes currently known to the DM
functionBranchTree = sharedMeta.functionBranchTree // Holds the function path's and related probability distribution
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
......@@ -18,9 +26,7 @@ let kafka = require('kafka-node'),
}),
producer = new Producer(client)
let implicitChainDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
implicitChainDB = implicitChainDB + "/" + constants.implicit_chain_db_names + "/"
let implicitChainDB = sharedMeta.implicitChainDB
/**
* Generates unique IDs of arbitrary length
* @param {Length of the ID} length
......@@ -62,59 +68,73 @@ function generateExecutor(functionPath, functionHash) {
* Reverse proxy to take user requests and forward them to appropriate workers using a loadbalacer
* @param {JSON} req the user request to be forwarded to the worker
* @param {JSON} res Object to use to return the response to the user
* @param {Map} functionToResource Function to resource Map
* @param {Map} resourceMap Map from resource ID to resource metadata
* @param {Map} functionBranchTree Holds the function path's and related probability distribution
*/
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
return new Promise((resolve, reject) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
/**
* Bypass deployment pipeline if resource available
*/
let functionHeap = functionToResource.get(id)
// loadbalancing by choosing worker with lowest load
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
// "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
function reverseProxy(req, res) {
if (req.headers['x-chain-type'] !== 'explicit')
branchChainPredictor(req)
let runtime = req.body.runtime
let id = req.params.id + runtime
/**
* Bypass deployment pipeline if resource available
*/
let functionHeap = functionToResource.get(id)
// loadbalancing by choosing worker with lowest load
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
// "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap);
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap);
var options = {
method: 'POST',
uri: url,
body: req.body,
json: true // Automatically stringifies the body to JSON
};
rp(options)
.then(function (parsedBody) {
let serviceTime = Date.now() - res.timestamp
res.json(parsedBody)
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
resolve()
})
.catch(function (err) {
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
logger.error("error" + err);
res.json(err.message).status(err.statusCode)
resolve()
});
})
var options = {
method: 'POST',
uri: url,
body: req.body,
json: true // Automatically stringifies the body to JSON
};
rp(options)
.then(function (parsedBody) {
let serviceTime = Date.now() - res.timestamp
res.json(parsedBody)
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
let functionHash = req.params.id
let functionData = functionBranchTree.get(functionHash)
if (functionData && functionData.req_count % 5 == 0) {
if (functionData.parent)
viterbi(functionHash, functionData)
else {
functionData.branches = Array.from(functionData.branches.entries())
let payload = {
method: 'put',
body: JSON.stringify(functionBranchTree.get(functionHash)),
headers: { 'Content-Type': 'application/json' }
}
fetchData(implicitChainDB + functionHash, payload)
.then((updateStatus) => {
console.log(updateStatus);
if (updateStatus.error === undefined)
functionData._rev = updateStatus.rev
})
functionData.branches = new Map(functionData.branches)
}
}
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
})
.catch(function (err) {
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
logger.error("error" + err);
res.json(err.message).status(err.statusCode)
});
}
function getPort(usedPort) {
......@@ -161,10 +181,20 @@ function compare(a, b) {
return a.open_request_count - b.open_request_count
}
function branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) {
async function branchChainPredictor(req) {
// console.log(req.headers['x-resource-id']);
if (!functionBranchTree.has(req.params.id)) {
let data = await fetchData(implicitChainDB + req.params.id)
if (data.error === "not_found")
console.log("no data", req.params.id);
else {
data.branches = new Map(data.branches)
functionBranchTree.set(req.params.id, data)
}
}
if (req.headers['x-resource-id'] === undefined) {
let functionHash = req.params.id
if (functionBranchTree.has(functionHash)) {
let branchInfo = functionBranchTree.get(functionHash)
......@@ -215,81 +245,87 @@ function branchChainPredictor(req, resourceMap, functionToResource, functionBran
// console.log("branch tree", functionBranchTree);
}
function viterbi(functionBranchTree) {
functionBranchTree.forEach((metadata, node) => {
if (metadata.parent && metadata.req_count % 5 == 0) {
let path = []
let parents = [[node, {
prob: 1,
metadata
}]]
path.push({node, probability: 1})
let siblings = new Map()
while(parents.length > 0) {
// console.log("parent_group", parents);
for (const parent of parents) {
// console.log("=========begin==========\n",parent, "\n=============end============");
// console.log(parent[1].metadata);
if (parent[1].metadata === undefined)
continue
let forwardBranches = parent[1].metadata.branches
// console.log(forwardBranches);
let parentProbability = parent[1].prob
forwardBranches.forEach((branchProb, subNode) => {
let probability = 0
if (siblings.has(subNode))
probability = siblings.get(subNode)
probability += branchProb * parentProbability
// console.log("prob", probability);
siblings.set(subNode, probability)
})
// console.log("siblings", siblings);
}
parents = []
let maxSibling, maxProb = 0
siblings.forEach((prob, sibling) => {
if (prob > maxProb) {
maxSibling = sibling
maxProb = prob
}
})
parentIDs = Array.from( siblings.keys() );
for (const id of parentIDs) {
let metadata = functionBranchTree.get(id)
parents.push([
id, {
prob: siblings.get(id),
metadata
}
])
}
if (maxSibling !== undefined)
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
async function viterbi(node, metadata) {
console.log("function branch tree", functionBranchTree.get(node));
let path = []
let parents = [[node, {
prob: 1,
metadata
}]]
path.push({ node, probability: 1 })
let siblings = new Map()
while (parents.length > 0) {
// console.log("parent_group", parents);
for (const parent of parents) {
// console.log("=========begin==========\n",parent, "\n=============end============");
// console.log(parent[1].metadata);
if (parent[1].metadata === undefined)
continue
let forwardBranches = parent[1].metadata.branches
// console.log(forwardBranches);
let parentProbability = parent[1].prob
forwardBranches.forEach((branchProb, subNode) => {
let probability = 0
if (siblings.has(subNode))
probability = siblings.get(subNode)
probability += branchProb * parentProbability
// console.log("prob", probability);
siblings.set(subNode, probability)
})
// console.log("siblings", siblings);
}
parents = []
let maxSibling, maxProb = 0
siblings.forEach((prob, sibling) => {
if (prob > maxProb) {
maxSibling = sibling
maxProb = prob
}
// if (path.length > 0)
// console.log("path", path);
metadata.mle_path = path
if (path.length > 1) {
let payload = {
method: 'put',
body: JSON.stringify(path),
headers: { 'Content-Type': 'application/json' }
})
parentIDs = Array.from(siblings.keys());
for (const id of parentIDs) {
let metadata = functionBranchTree.get(id)
parents.push([
id, {
prob: siblings.get(id),
metadata
}
fetch(implicitChainDB + functionHash, payload)
}
])
}
if (maxSibling !== undefined)
path.push({ node: maxSibling, probability: maxProb })
siblings = new Map()
}
if (path.length > 1)
console.log("path", path);
metadata.mle_path = path
if (path.length > 1) {
metadata.branches = Array.from(metadata.branches.entries())
let payload = {
method: 'put',
body: JSON.stringify(functionBranchTree.get(node)),
headers: { 'Content-Type': 'application/json' }
}
});
fetchData(implicitChainDB + node, payload)
.then((updateStatus) => {
console.log(updateStatus);
if (updateStatus.error === undefined)
metadata._rev = updateStatus.rev
})
metadata.branches = new Map(metadata.branches)
}
}
function logBroadcast(message, resource_id, resourceMap) {
function logBroadcast(message, resource_id) {
return new Promise((resolve, reject) => {
try {
......@@ -329,6 +365,6 @@ async function fetchData(url, data = null) {
module.exports = {
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi, logBroadcast, fetchData, metrics,
logBroadcast, fetchData, metrics,
producer
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment