Commit 26078f36 authored by NILANJAN DAW's avatar NILANJAN DAW

Added rr/random schedulers to RM

parent 63be6dbb
...@@ -2,8 +2,10 @@ ...@@ -2,8 +2,10 @@
"registry_url": "10.129.6.5:5000/", "registry_url": "10.129.6.5:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "localhost",
"rm_port": 8081,
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984", "couchdb_host": "10.129.6.5:5984",
"redis_host": "redis://10.129.6.5:6379",
"env": "env.js", "env": "env.js",
"db": { "db": {
"function_meta": "serverless", "function_meta": "serverless",
...@@ -31,6 +33,12 @@ ...@@ -31,6 +33,12 @@
"log_channel": "LOG_COMMON", "log_channel": "LOG_COMMON",
"test": "test" "test": "test"
}, },
"resource_scheduler": {
"select": "random",
"options": {
"rr": "rr", "random": "random", "jsq": "jsq", "workflow": "workflow"
}
},
"autoscalar_metrics": { "autoscalar_metrics": {
"open_request_threshold": 100 "open_request_threshold": 100
}, },
......
...@@ -9,6 +9,7 @@ const constants = require('../constants.json') ...@@ -9,6 +9,7 @@ const constants = require('../constants.json')
const operator = require('./operator') const operator = require('./operator')
const sharedMeta = require('./shared_meta') const sharedMeta = require('./shared_meta')
const util = require('util') const util = require('util')
const nanoid = require('nanoid')
const logger = libSupport.logger const logger = libSupport.logger
...@@ -192,9 +193,12 @@ async function deployContainer(path, imageName) { ...@@ -192,9 +193,12 @@ async function deployContainer(path, imageName) {
}) })
} }
router.post('/execute/:id', (req, res) => { router.post('/execute/:id', async (req, res) => {
let map, aliases let map, aliases
let chain_id = req.params.id let chain_id = req.params.id
let request_id = nanoid.nanoid()
console.log("Request ID", request_id);
libSupport.fetchData(explicitChainDB + chain_id) libSupport.fetchData(explicitChainDB + chain_id)
.then(chainData => { .then(chainData => {
console.log(chainData); console.log(chainData);
...@@ -211,12 +215,12 @@ router.post('/execute/:id', (req, res) => { ...@@ -211,12 +215,12 @@ router.post('/execute/:id', (req, res) => {
if (req.files && req.files.map) { if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString()); map = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${chain_id}.json`, true) readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => { .then(async data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
if (chainData.error != "not_found") if (chainData.error != "not_found")
speculative_deployment(chain_id, aliases, chainData); await speculative_deployment(chain_id, aliases, chainData, request_id);
orchestrator(chain_id, res, payload, map, aliases, {}, path) orchestrator(chain_id, res, payload, map, aliases, {}, path. request_id);
}) })
} else { } else {
readMap(`./repository/map${chain_id}.json`) readMap(`./repository/map${chain_id}.json`)
...@@ -224,12 +228,12 @@ router.post('/execute/:id', (req, res) => { ...@@ -224,12 +228,12 @@ router.post('/execute/:id', (req, res) => {
map = data map = data
readMap(`./repository/aliases${chain_id}.json`, true) readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => { .then(async data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
if (chainData.error != "not_found") if (chainData.error != "not_found")
speculative_deployment(chain_id, aliases, chainData); await speculative_deployment(chain_id, aliases, chainData, request_id);
orchestrator(chain_id, res, payload, map, aliases, {}, path) orchestrator(chain_id, res, payload, map, aliases, {}, path, request_id);
}) })
}) })
} }
...@@ -246,7 +250,7 @@ router.post('/execute/:id', (req, res) => { ...@@ -246,7 +250,7 @@ router.post('/execute/:id', (req, res) => {
* @param {JSON} aliases internal alias to function chain mapping * @param {JSON} aliases internal alias to function chain mapping
* @param {JSON} result result obtained after chain executes * @param {JSON} result result obtained after chain executes
*/ */
async function orchestrator(chain_id, res, payload, map, aliases, result, path) { async function orchestrator(chain_id, res, payload, map, aliases, result, path, request_id) {
/** /**
* Adding dependencies on MLE path to a map * Adding dependencies on MLE path to a map
...@@ -295,13 +299,21 @@ async function orchestrator(chain_id, res, payload, map, aliases, result, path) ...@@ -295,13 +299,21 @@ async function orchestrator(chain_id, res, payload, map, aliases, result, path)
path.path[path.level] = [] path.path[path.level] = []
} }
path.path[path.level].push({functionName, type: "function", runtime: metadata.runtime}) path.path[path.level].push({functionName, type: "function", runtime: metadata.runtime, id: aliases[functionName].alias})
/**
* The function node is cleared to run.
* Setting status in Redis.
* Dispatching function
*/
sharedMeta.client.sadd(request_id, aliases[functionName].alias)
libSupport.fetchData(url, data) libSupport.fetchData(url, data)
.then(json => { .then(json => {
// console.log(json); // console.log(json);
result[functionName] = json result[functionName] = json
sharedMeta.client.srem(request_id, aliases[functionName].alias)
aliases[functionName].status = "done" aliases[functionName].status = "done"
let branchMap = null, flag = false let branchMap = null, flag = false
for (const [_key, metadata] of Object.entries(map)) { for (const [_key, metadata] of Object.entries(map)) {
...@@ -349,7 +361,7 @@ async function orchestrator(chain_id, res, payload, map, aliases, result, path) ...@@ -349,7 +361,7 @@ async function orchestrator(chain_id, res, payload, map, aliases, result, path)
} }
if (flag) if (flag)
path.level++ path.level++
orchestrator(chain_id, res, payload, (branchMap == null)? map: branchMap, aliases, result, path) orchestrator(chain_id, res, payload, (branchMap == null)? map: branchMap, aliases, result, path, request_id)
}) })
} }
} }
...@@ -382,7 +394,7 @@ function checkCondition(op1, op2, op, result) { ...@@ -382,7 +394,7 @@ function checkCondition(op1, op2, op, result) {
return (operator[op](data, op2))? "success": "fail" return (operator[op](data, op2))? "success": "fail"
} }
async function speculative_deployment(chain_id, aliases, chainData) { async function speculative_deployment(chain_id, aliases, chainData, request_id) {
// console.log("chainData", util.inspect(chainData, false, null, true /* enable colors */)); // console.log("chainData", util.inspect(chainData, false, null, true /* enable colors */));
let plan = [] let plan = []
let path = chainData.path.path let path = chainData.path.path
...@@ -436,6 +448,13 @@ async function speculative_deployment(chain_id, aliases, chainData) { ...@@ -436,6 +448,13 @@ async function speculative_deployment(chain_id, aliases, chainData) {
console.log("delay map", delayMap); console.log("delay map", delayMap);
console.log("notifcation plan", plan); console.log("notifcation plan", plan);
} }
await fetch(`http://${constants.master_address}:${constants.rm_port}`, {
method: 'post',
body: JSON.stringify({
plan
}),
headers: { 'Content-Type': 'application/json' },
})
let counter = 0, maxCount = plan.length * constants.aggressivity let counter = 0, maxCount = plan.length * constants.aggressivity
for (const node of plan) { for (const node of plan) {
if (counter > maxCount) if (counter > maxCount)
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"node-fetch": "^2.6.0", "node-fetch": "^2.6.0",
"node-interval-tree": "^1.3.3",
"prom-client": "^12.0.0", "prom-client": "^12.0.0",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.0", "request": "^2.88.0",
......
...@@ -4,7 +4,7 @@ let request = require('request') ...@@ -4,7 +4,7 @@ let request = require('request')
const process = require('process') const process = require('process')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 60, flagFirstRequest = true let port = 5000, resource_id, functionHash, runtime, idleTime = 120, flagFirstRequest = true
let waitTime let waitTime
resource_id = process.argv[2] resource_id = process.argv[2]
......
const express = require('express');
const interval_tree = require('node-interval-tree');
const constants = require('../constants.json');
const morgan = require('morgan');
let port = constants.rm_port
let workerNodes = {}, timeline = {} let workerNodes = {}, timeline = {}
const constants = require('../constants.json') let addresslist = []
let rr_ptr = 0;
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -17,12 +24,30 @@ let kafka = require('kafka-node'), ...@@ -17,12 +24,30 @@ let kafka = require('kafka-node'),
{ autoCommit: true } { autoCommit: true }
]) ])
const app = express()
app.use(morgan('combined', {
skip: function (req, res) { return res.statusCode < 400 }
}))
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
/** /**
* speculative resource allocation plan for workflows * speculative resource allocation plan for workflows
*/ */
let spec_plan = {} let spec_plan = {}
function getAddress(plan, functionHash) { function rr() {
return addresslist[(rr_ptr++) % addresslist.length]
}
function random() {
return addresslist[Math.floor(Math.random() * addresslist.length)]
}
function workflow(plan, functionHash) {
let time = Date.now() let time = Date.now()
let tolerance = 2 * 1000 let tolerance = 2 * 1000
/** /**
...@@ -41,9 +66,10 @@ function getAddress(plan, functionHash) { ...@@ -41,9 +66,10 @@ function getAddress(plan, functionHash) {
} }
} }
} }
let addresslist = Object.keys(workerNodes)
// console.log(workerNodes, addresslist, addresslist.length, Math.floor(Math.random() * addresslist.length)); // console.log(workerNodes, addresslist, addresslist.length, Math.floor(Math.random() * addresslist.length));
let address = addresslist[Math.floor(Math.random() * addresslist.length)] let address = addresslist[Math.floor(Math.random() * addresslist.length)]
if (plan) { if (plan) {
for (const node in plan) { for (const node in plan) {
if (plan.hasOwnProperty(node)) { if (plan.hasOwnProperty(node)) {
...@@ -62,6 +88,40 @@ function getAddress(plan, functionHash) { ...@@ -62,6 +88,40 @@ function getAddress(plan, functionHash) {
return address; return address;
} }
function getAddress(plan, functionHash) {
let address = ""
if (constants.resource_scheduler.select === constants.resource_scheduler.options.rr)
address = rr()
else if (constants.resource_scheduler.select === constants.resource_scheduler.options.random)
address = random()
else if (constants.resource_scheduler.select === constants.resource_scheduler.options.workflow)
address = workflow(plan, functionHash)
console.log(functionHash, address);
return address
}
app.post('/resource/plan', (req, res) => {
let addresslist = Object.keys(workerNodes)
// console.log(workerNodes, addresslist, addresslist.length, Math.floor(Math.random() * addresslist.length));
let address = addresslist[Math.floor(Math.random() * addresslist.length)]
let plan = req.plan
if (plan) {
for (const node in plan) {
if (plan.hasOwnProperty(node)) {
if (!spec_plan[node])
spec_plan[node] = {
address,
timeline: []
}
spec_plan[node].timeline.push(plan[node] + time)
}
}
console.log("updated spec_plan", spec_plan);
}
res.sendStatus(200)
})
consumer.on('message', function (message) { consumer.on('message', function (message) {
let topic = message.topic let topic = message.topic
message = message.value message = message.value
...@@ -71,6 +131,7 @@ consumer.on('message', function (message) { ...@@ -71,6 +131,7 @@ consumer.on('message', function (message) {
if ((Date.now() - message.timestamp < 1000) && !workerNodes[message.address]) { if ((Date.now() - message.timestamp < 1000) && !workerNodes[message.address]) {
workerNodes[message.address] = message workerNodes[message.address] = message
addresslist = Object.keys(workerNodes)
console.log("New worker discovered. Worker List: ") console.log("New worker discovered. Worker List: ")
console.log(workerNodes); console.log(workerNodes);
// console.log(Object.keys(workerNodes)); // console.log(Object.keys(workerNodes));
...@@ -94,3 +155,5 @@ consumer.on('message', function (message) { ...@@ -94,3 +155,5 @@ consumer.on('message', function (message) {
}) })
} }
}) })
app.listen(port, () => console.log(`Server listening on port ${port}!`))
\ No newline at end of file
const secrets = require('./secrets.json') const secrets = require('./secrets.json')
const constants = require('.././constants.json') const constants = require('.././constants.json')
const redis = require('redis')
const client = redis.createClient(constants.redis_host);
let db = new Map(), // queue holding request to be dispatched let db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
...@@ -27,6 +29,6 @@ explicitChainDB = explicitChainDB + "/" + constants.db.explicit_chain_meta + "/" ...@@ -27,6 +29,6 @@ explicitChainDB = explicitChainDB + "/" + constants.db.explicit_chain_meta + "/"
module.exports = { module.exports = {
db, functionBranchTree, functionToResource, workerNodes, resourceMap, db, functionBranchTree, functionToResource, workerNodes, resourceMap,
conditionProbabilityExplicit, conditionProbabilityExplicit, metadataDB, metricsDB, implicitChainDB,
metadataDB, metricsDB, implicitChainDB, explicitChainDB explicitChainDB, client, redis
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment