Commit f079dce2 authored by NILANJAN DAW's avatar NILANJAN DAW

tested workflow scheduler on a 4 node system

-- fixed multiple bugs
-- tested rr and random schedulers
parent 26078f36
{ {
"registry_url": "10.129.6.5:5000/", "registry_url": "10.129.6.5:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "10.129.6.5",
"rm_port": 8081, "rm_port": 8081,
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984", "couchdb_host": "10.129.6.5:5984",
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
"test": "test" "test": "test"
}, },
"resource_scheduler": { "resource_scheduler": {
"select": "random", "select": "workflow",
"options": { "options": {
"rr": "rr", "random": "random", "jsq": "jsq", "workflow": "workflow" "rr": "rr", "random": "random", "jsq": "jsq", "workflow": "workflow"
} }
...@@ -49,4 +49,4 @@ ...@@ -49,4 +49,4 @@
"JIT_deployment": true, "JIT_deployment": true,
"aggressivity": 1, "aggressivity": 1,
"id_size": 20 "id_size": 20
} }
\ No newline at end of file
...@@ -309,7 +309,7 @@ async function orchestrator(chain_id, res, payload, map, aliases, result, path, ...@@ -309,7 +309,7 @@ async function orchestrator(chain_id, res, payload, map, aliases, result, path,
sharedMeta.client.sadd(request_id, aliases[functionName].alias) sharedMeta.client.sadd(request_id, aliases[functionName].alias)
libSupport.fetchData(url, data) libSupport.fetchData(url, data)
.then(json => { .then(json => {
// console.log(json);
result[functionName] = json result[functionName] = json
sharedMeta.client.srem(request_id, aliases[functionName].alias) sharedMeta.client.srem(request_id, aliases[functionName].alias)
aliases[functionName].status = "done" aliases[functionName].status = "done"
...@@ -448,13 +448,21 @@ async function speculative_deployment(chain_id, aliases, chainData, request_id) ...@@ -448,13 +448,21 @@ async function speculative_deployment(chain_id, aliases, chainData, request_id)
console.log("delay map", delayMap); console.log("delay map", delayMap);
console.log("notifcation plan", plan); console.log("notifcation plan", plan);
} }
await fetch(`http://${constants.master_address}:${constants.rm_port}`, {
method: 'post', /**
body: JSON.stringify({ * If the scheduler is workflow aware send the workflow plan to RM
plan */
}), if (constants.resource_scheduler.select === constants.resource_scheduler.options.workflow) {
headers: { 'Content-Type': 'application/json' }, await fetch(`http://${constants.master_address}:${constants.rm_port}/resource/plan`, {
}) method: 'post',
body: JSON.stringify({
plan
}),
headers: { 'Content-Type': 'application/json' },
})
console.log("Spec plan sent")
}
let counter = 0, maxCount = plan.length * constants.aggressivity let counter = 0, maxCount = plan.length * constants.aggressivity
for (const node of plan) { for (const node of plan) {
if (counter > maxCount) if (counter > maxCount)
......
...@@ -57,8 +57,8 @@ let kafka = require('kafka-node'), ...@@ -57,8 +57,8 @@ let kafka = require('kafka-node'),
app.use(morgan('combined', { app.use(morgan('combined', {
skip: function (req, res) { return res.statusCode < 400 } skip: function (req, res) { return res.statusCode < 400 }
})) }))
app.use(express.json()); app.use(express.json({ limit: '2gb' }));
app.use(express.urlencoded({ extended: true })); app.use(express.urlencoded({ extended: true, limit: '2gb' }));
const file_path = __dirname + "/repository/" const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path)); // file server hosting deployed functions app.use('/repository', express.static(file_path)); // file server hosting deployed functions
app.use(fileUpload()) app.use(fileUpload())
......
...@@ -19,8 +19,10 @@ ...@@ -19,8 +19,10 @@
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"nanoid": "^3.1.12",
"node-fetch": "^2.6.0", "node-fetch": "^2.6.0",
"node-interval-tree": "^1.3.3", "node-interval-tree": "^1.3.3",
"nodemon": "^2.0.4",
"prom-client": "^12.0.0", "prom-client": "^12.0.0",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.0", "request": "^2.88.0",
......
...@@ -23,8 +23,8 @@ let kafka = require('kafka-node'), ...@@ -23,8 +23,8 @@ let kafka = require('kafka-node'),
}), }),
producer = new Producer(client) producer = new Producer(client)
app.use(express.json()); app.use(express.json({ limit: '2gb' }));
app.use(express.urlencoded({ extended: true })); app.use(express.urlencoded({ extended: true, limit: '2gb' }));
let lastRequest = Date.now(), totalRequest = 0 let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => { app.post('/serverless/function/execute/', (req, res) => {
......
...@@ -54,12 +54,12 @@ function workflow(plan, functionHash) { ...@@ -54,12 +54,12 @@ function workflow(plan, functionHash) {
* adding a tolerance of 2s * adding a tolerance of 2s
*/ */
if (spec_plan[functionHash]) { if (spec_plan[functionHash]) {
console.log(spec_plan[functionHash]); // console.log(spec_plan[functionHash]);
let timeline = spec_plan[functionHash].timeline let timeline = spec_plan[functionHash].timeline
for (const specs of timeline) { for (const specs of timeline) {
console.log(time - specs, time, specs); // console.log(time - specs, time, specs);
if (time > specs - tolerance && time < specs + tolerance) { if (time > specs - tolerance && time < specs + tolerance) {
console.log(spec_plan, "found"); console.log("found");
return spec_plan[functionHash].address return spec_plan[functionHash].address
} else if (time < specs - 2 || time > specs + 2) { } else if (time < specs - 2 || time > specs + 2) {
delete specs delete specs
...@@ -105,16 +105,19 @@ app.post('/resource/plan', (req, res) => { ...@@ -105,16 +105,19 @@ app.post('/resource/plan', (req, res) => {
let addresslist = Object.keys(workerNodes) let addresslist = Object.keys(workerNodes)
// console.log(workerNodes, addresslist, addresslist.length, Math.floor(Math.random() * addresslist.length)); // console.log(workerNodes, addresslist, addresslist.length, Math.floor(Math.random() * addresslist.length));
let address = addresslist[Math.floor(Math.random() * addresslist.length)] let address = addresslist[Math.floor(Math.random() * addresslist.length)]
let plan = req.plan let plan = req.body.plan
const time = Date.now()
console.log(plan)
if (plan) { if (plan) {
for (const node in plan) { for (const node in plan) {
if (plan.hasOwnProperty(node)) { if (plan.hasOwnProperty(node)) {
if (!spec_plan[node]) let functionHash = plan[node].id
spec_plan[node] = { if (!spec_plan[functionHash])
spec_plan[functionHash] = {
address, address,
timeline: [] timeline: []
} }
spec_plan[node].timeline.push(plan[node] + time) spec_plan[functionHash].timeline.push(plan[node].invokeTime + time)
} }
} }
console.log("updated spec_plan", spec_plan); console.log("updated spec_plan", spec_plan);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment