Commit 63be6dbb authored by NILANJAN DAW's avatar NILANJAN DAW

added planning stage to RM

added a workflow level preplanner to the RM
parent ce3d339d
<mxfile host="Electron" modified="2020-06-15T14:30:41.474Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/12.6.5 Chrome/80.0.3987.86 Electron/8.0.0 Safari/537.36" etag="nqMv0_VMV6sYaUiKd7LX" version="12.6.5" type="device"><diagram id="_y9gK2tty0O1r2PkF82Q" name="Page-1">5Vtbd6I6FP41Ps5ZQADxsdVOz8OZa9ecmT5GiJppJC6IVfvrJ5Egl0TrVCQV27XasBMC+fbOvnzRHhjO1/cJXMw+0QiRnmNF6x4Y9Rwn8AP+Vwg2mcCzQSaYJjjKRHYheMAvSAotKV3iCKWVgYxSwvCiKgxpHKOQVWQwSeiqOmxCSfWpCzhFiuAhhESV/sQRm0mp77lFx78IT2fy0Q4AftYzh/louZR0BiO6KonAXQ8ME0pZ1pqvh4gI8HJgsvs+7undvVmCYnbMDf+HT2v723c7de/6L/Fk8vW3Az705buxTb5iFHEA5CVN2IxOaQzJXSG9TegyjpCY1eJXxZj/KF1woc2FvxFjG6lNuGSUi2ZsTmQvf+Fk80vev714FBf/ePnlaF3uHG3klbpiCUJKl0mIDiwzNx2YTBE7MM7LxgkMSg+QeN4jOkf8ffiABBHI8HPVSKC0teluXKEN3pAK+QvlyHmfIVnKJ6naIoRvBaGV1Qwz9LCAWxxWfDNWMYfpItsfE7wWutsP5jNKGFofXH7eO5CbWW5uO9+1q2KrACmalTZJLmscMEcBbGQ1CNkEEzKkhCbbicDEE79cnrKEPqFSj7/9EXfQmJXk2U8z4APPqoAfGMYeXBP2gyr2xg3fvSbwa4Zvu4bB91Tw7TOCP0F+GOrAj/qDsXUwTv4FyJY5C/9Bf44WLGbkPrIenuDjpy8B1sTCyTIOGaaxAjVfIqthSvA05u2QA4I4WrcCCMyzvBvZMcdRlOU2KMUvcLydSkC5oDhm27V4tz1vJObi6UyaZTb2DmsFWA38+8NoP3jVoH0N1s65sFbD6CrhNnvxQDtuzagtFWi3TaBVz6FgzKuGhWhOCFrfiIJmmxVHsjkKCUxTHFa10NMlsUcBVQLC0wCRy45Of+UTvgrlFnpw6x68DnCWz8u7yiVNbSIPvDJRlvArE22VtVv22/UXtOr5Iw8Fkavz/IEzBtuw28Am8S1jeaUW40EXMa47IuPR1eoiyvUcpsVEUY+yq0B6OpeyxuxXqf0opxLtgkYRF1UW5fXoUKVV9AvS8Cr7HWULNIr+LT3VkkskV0zjZlgtnSYyTmuvLk5jwtrV4cCoDn1jOrQ7o0HbMqpCtYR7F3XySTHGq6eyxiO5ysOlyzBEaapAfWnlW71sGBguk+0LPDxpxuEERzqcrJ415nDU8qxBd9OAQdfPT3amasx5aIqt98wjn4S+wiNr/Emr6DuaIqy76NeOUIzbvqNJT7qLfs32d8yeMfQdFf2LTw7rhyjmbVxNDrvJ7DuaNLxVZt9RT2QVkK+B2lds+a3UvjLRmal9R3Oq29lw4Ae1VMg06Q9UpvScdE/O6hxD2R27A4+ooxxfr5UT96Bf34Nuy1vHv6Kt07ffW4zvXxH6fr2K8DzD6AcK1Cd7qSY9jtdJjwM0Bt7BcNHvpvIO06hNKe94RbxKa4JTPxX+RkXsyZ0bU0TLDEgQIn0NPg48UUec5YDGeHwGKtPxEWKyTC6/Cnfz6Jtjrfl8na72PFsVDlS+o7OfGtWlPq0ehwE17R/SOEVxurz8s0dPSTRV0w5aRVs93PpCIi4YQQZ74Ea0+LN9ImAeJ7w1Fa3PaJWP4f9UhvXS9FLfBRq1DJrx7vyy+K5eFnKLbzyCuz8=</diagram></mxfile>
\ No newline at end of file
...@@ -261,13 +261,13 @@ function dispatch() { ...@@ -261,13 +261,13 @@ function dispatch() {
db.set(functionHash + runtime, []) db.set(functionHash + runtime, [])
db.get(functionHash + runtime).push({ req, res }) db.get(functionHash + runtime).push({ req, res })
let payload = [{ // let payload = [{
topic: constants.topics.hscale, // topic: constants.topics.hscale,
messages: JSON.stringify({ runtime, functionHash }) // messages: JSON.stringify({ runtime, functionHash })
}] // }]
producer.send(payload, function () { }) // producer.send(payload, function () { })
speculative_deployment(req, runtime) deployment(req, runtime)
} else { } else {
logger.info("deployment process already started waiting") logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res }) db.get(functionHash + runtime).push({ req, res })
...@@ -444,6 +444,8 @@ consumer.on('message', function (message) { ...@@ -444,6 +444,8 @@ consumer.on('message', function (message) {
messages: JSON.stringify({ messages: JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
plan: message.plan,
functionHash
}), }),
partition: 0 partition: 0
}] }]
...@@ -515,9 +517,8 @@ function autoscalar() { ...@@ -515,9 +517,8 @@ function autoscalar() {
* *
* FIXME: Currently supports homogenous runtime chain i.e takes runtime as a param. * FIXME: Currently supports homogenous runtime chain i.e takes runtime as a param.
* Change it to also profile runtime * Change it to also profile runtime
* FIXME: Hardcoded container as a runtime. Make dynamic.
*/ */
async function speculative_deployment(req, runtime) { async function deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) { if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
// console.log(functionBranchTree, req.params.id); // console.log(functionBranchTree, req.params.id);
...@@ -529,10 +530,10 @@ async function speculative_deployment(req, runtime) { ...@@ -529,10 +530,10 @@ async function speculative_deployment(req, runtime) {
} }
} }
console.log(util.inspect(functionBranchTree, false, null, true /* enable colors */)); console.log("branches", util.inspect(functionBranchTree, false, null, true /* enable colors */));
if (functionBranchTree.has(req.params.id)) { if (functionBranchTree.has(req.params.id) && functionBranchTree.get(req.params.id).mle_path) {
let branchInfo = functionBranchTree.get(req.params.id) let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path); console.log("mle_path", branchInfo.mle_path);
...@@ -581,6 +582,8 @@ async function speculative_deployment(req, runtime) { ...@@ -581,6 +582,8 @@ async function speculative_deployment(req, runtime) {
} }
currentDelay = metrics[branchInfo.mle_path[0].id].container.starttime currentDelay = metrics[branchInfo.mle_path[0].id].container.starttime
let plan = {}
plan[req.params.id] = 0
for (let i = 1; i < deployDepth; i++) { for (let i = 1; i < deployDepth; i++) {
let parent = chainData[branchInfo.mle_path[i - 1].id] let parent = chainData[branchInfo.mle_path[i - 1].id]
let self = branchInfo.mle_path[i].id let self = branchInfo.mle_path[i].id
...@@ -589,9 +592,19 @@ async function speculative_deployment(req, runtime) { ...@@ -589,9 +592,19 @@ async function speculative_deployment(req, runtime) {
let invokeTime = currentDelay - metrics[self].container.starttime let invokeTime = currentDelay - metrics[self].container.starttime
invokeTime = (invokeTime < 0)? 0: invokeTime invokeTime = (invokeTime < 0)? 0: invokeTime
console.log(self, "current delay", currentDelay, "invoke time:", currentDelay - metrics[self].container.starttime); console.log(self, "current delay", currentDelay, "invoke time:", currentDelay - metrics[self].container.starttime);
setTimeout(chainHandler.notify, invokeTime, "container", self) plan[self] = invokeTime
setTimeout(chainHandler.notify, invokeTime, runtime, self)
} }
/**
* plan is ready, notify parent and start execution
*/
console.log("plan", plan);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ runtime, functionHash: req.params.id, plan })
}]
producer.send(payload, function () { })
}) })
} else { } else {
...@@ -608,7 +621,7 @@ async function speculative_deployment(req, runtime) { ...@@ -608,7 +621,7 @@ async function speculative_deployment(req, runtime) {
let payload = [{ let payload = [{
topic: constants.topics.hscale, topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node }) messages: JSON.stringify({ "runtime": runtime, "functionHash": node.node })
}] }]
producer.send(payload, function () { }) producer.send(payload, function () { })
db.set(node.node + runtime, []) db.set(node.node + runtime, [])
...@@ -617,7 +630,19 @@ async function speculative_deployment(req, runtime) { ...@@ -617,7 +630,19 @@ async function speculative_deployment(req, runtime) {
} }
} }
} }
} else {
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ runtime, functionHash: req.params.id })
}]
producer.send(payload, function () { })
} }
} else {
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ runtime, functionHash: req.params.id })
}]
producer.send(payload, function () { })
} }
} }
setInterval(libSupport.metrics.broadcastMetrics, 5000) setInterval(libSupport.metrics.broadcastMetrics, 5000)
......
...@@ -107,7 +107,7 @@ async function reverseProxy(req, res) { ...@@ -107,7 +107,7 @@ async function reverseProxy(req, res) {
let functionHash = req.params.id let functionHash = req.params.id
let functionData = functionBranchTree.get(functionHash) let functionData = functionBranchTree.get(functionHash)
if (functionData && functionData.req_count % 5 == 0) { if (functionData && functionData.req_count % 1 == 0) {
if (functionData.parent) if (functionData.parent)
viterbi(functionHash, functionData) viterbi(functionHash, functionData)
else { else {
...@@ -335,7 +335,7 @@ async function viterbi(node, metadata) { ...@@ -335,7 +335,7 @@ async function viterbi(node, metadata) {
let head = await fetch(implicitChainDB + node, { let head = await fetch(implicitChainDB + node, {
method: "head" method: "head"
}) })
if (head.headers.get("etag"))
metadata._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1) metadata._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1)
let payload = { let payload = {
......
...@@ -17,27 +17,67 @@ let kafka = require('kafka-node'), ...@@ -17,27 +17,67 @@ let kafka = require('kafka-node'),
{ autoCommit: true } { autoCommit: true }
]) ])
function getAddress() { /**
* speculative resource allocation plan for workflows
*/
let spec_plan = {}
function getAddress(plan, functionHash) {
let time = Date.now()
let tolerance = 2 * 1000
/**
* adding a tolerance of 2s
*/
if (spec_plan[functionHash]) {
console.log(spec_plan[functionHash]);
let timeline = spec_plan[functionHash].timeline
for (const specs of timeline) {
console.log(time - specs, time, specs);
if (time > specs - tolerance && time < specs + tolerance) {
console.log(spec_plan, "found");
return spec_plan[functionHash].address
} else if (time < specs - 2 || time > specs + 2) {
delete specs
}
}
}
let addresslist = Object.keys(workerNodes)
// console.log(workerNodes, addresslist, addresslist.length, Math.floor(Math.random() * addresslist.length));
let address = addresslist[Math.floor(Math.random() * addresslist.length)]
if (plan) {
for (const node in plan) {
if (plan.hasOwnProperty(node)) {
if (!spec_plan[node])
spec_plan[node] = {
address,
timeline: []
}
spec_plan[node].timeline.push(plan[node] + time)
}
}
console.log("updated spec_plan", spec_plan);
}
// console.log("address", address, spec_plan);
return address;
} }
consumer.on('message', function (message) { consumer.on('message', function (message) {
let topic = message.topic let topic = message.topic
message = message.value message = message.value
if (topic !== "heartbeat") // console.log(message);
console.log(message);
if (topic === "heartbeat") { if (topic === "heartbeat") {
message = JSON.parse(message) message = JSON.parse(message)
if (Date.now() - message.timestamp < 1000)
if (!workerNodes[message.address]) { if ((Date.now() - message.timestamp < 1000) && !workerNodes[message.address]) {
workerNodes[message.address] = message workerNodes[message.address] = message
console.log("New worker discovered. Worker List: ") console.log("New worker discovered. Worker List: ")
console.log(workerNodes); console.log(workerNodes);
// console.log(Object.keys(workerNodes));
} }
} else if (topic === "request") { } else if (topic === "request") {
message = JSON.parse(message) message = JSON.parse(message)
console.log(message); console.log("request", message);
let payload = [{ let payload = [{
topic: "RESPONSE_RM_2_DM_DUMMY", topic: "RESPONSE_RM_2_DM_DUMMY",
...@@ -45,7 +85,7 @@ consumer.on('message', function (message) { ...@@ -45,7 +85,7 @@ consumer.on('message', function (message) {
"resource_id": message.resource_id, "resource_id": message.resource_id,
"timestamp": Date.now(), "timestamp": Date.now(),
"nodes": [ "nodes": [
{ node_id: getAddress(), port: null }] { node_id: getAddress(message.plan, message.functionHash), port: null }]
}), }),
partition: 0 partition: 0
}] }]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment