Commit 7d2d4170 authored by Nilanjan Daw's avatar Nilanjan Daw

Added Loadbalancer based on request count. Ref #14 issue

Added a min-heap based Loadbalancer to balance based on request count.
parent e5c9d0a6
...@@ -3,5 +3,6 @@ ...@@ -3,5 +3,6 @@
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "localhost",
"kafka_host": "10.129.6.5:9092", "kafka_host": "10.129.6.5:9092",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON" "log_channel": "LOG_COMMON"
} }
\ No newline at end of file
...@@ -56,7 +56,7 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -56,7 +56,7 @@ libSupport.makeTopic(node_id).then(() => {
/** /**
* download and start grunt * download and start grunt
*/ */
libSupport.download(host_url + '/repository/grunt', "grunt").then(() => { libSupport.download(constants.grunt_host, "grunt").then(() => {
logger.info("Downloaded grunt binary from repository") logger.info("Downloaded grunt binary from repository")
fs.chmod('grunt', 0o555, (err) => { fs.chmod('grunt', 0o555, (err) => {
logger.info("grunt made executable. Starting grunt") logger.info("grunt made executable. Starting grunt")
......
...@@ -7,6 +7,7 @@ const constants = require('.././constants.json') ...@@ -7,6 +7,7 @@ const constants = require('.././constants.json')
const fs = require('fs') const fs = require('fs')
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const morgan = require('morgan') const morgan = require('morgan')
const heap = require('heap')
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
...@@ -14,11 +15,11 @@ const logger = libSupport.logger ...@@ -14,11 +15,11 @@ const logger = libSupport.logger
let date = new Date(); let date = new Date();
let log_channel = constants.log_channel let log_channel = constants.log_channel
let functionToResource = new Map(), // TODO: make the resource a list for horizontal scale out let usedPort = new Map(), // TODO: remove after integration with RM
usedPort = new Map(), // TODO: remove after integration with RM
rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation
db = new Map(), db = new Map(), // queue holding request to be dispatched
resourceMap = new Map() resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map()
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -163,15 +164,19 @@ function deployContainer(path, imageName) { ...@@ -163,15 +164,19 @@ function deployContainer(path, imageName) {
*/ */
app.post('/serverless/execute/:id', (req, res) => { app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime let runtime = req.body.runtime
if (functionToResource.has(req.params.id + runtime)) { let id = req.params.id + runtime
if (functionToResource.has(id)) {
/** /**
* Bypass deployment pipeline if resource available * Bypass deployment pipeline if resource available
*/ */
let forwardTo = functionToResource.get(req.params.id + runtime) let forwardTo = heap.pop(functionToResource.get(id), libSupport.compare)
let resource = resourceMap.get(forwardTo.resource_id) let resource = resourceMap.get(forwardTo.resource_id)
logger.info("resource found " + JSON.stringify(forwardTo) + logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
" forwarding via reverse proxy to: " + JSON.stringify(resource)); "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`) libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`)
forwardTo.metric += 1
functionToResource.get(id).push(forwardTo)
console.log(functionToResource.get(id));
} else { } else {
/** /**
...@@ -281,9 +286,29 @@ consumer.on('message', function (message) { ...@@ -281,9 +286,29 @@ consumer.on('message', function (message) {
if (db.has(message.resource_id)) { if (db.has(message.resource_id)) {
let { req, res } = db.get(message.resource_id) let { req, res } = db.get(message.resource_id)
functionToResource.set(message.functionHash + message.runtime, { if (functionToResource.has(message.functionHash + message.runtime)) {
resource_id: message.resource_id let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
}) heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
console.log("adding to list", functionToResource.get(message.functionHash + message.runtime));
} else {
/**
* function to resource map - holds a min heap of resources associated with a function
* the min heap is sorted based on a metric [TBD] like CPU usage, request count, mem usage etc
* TODO: decide on metric to use for sorting.
*/
let resourceHeap = []
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap)
console.log("creating new heap", functionToResource.get(message.functionHash + message.runtime));
}
let resource = resourceMap.get(message.resource_id) let resource = resourceMap.get(message.resource_id)
let confirmRM = [{ let confirmRM = [{
topic: log_channel, topic: log_channel,
...@@ -308,6 +333,7 @@ consumer.on('message', function (message) { ...@@ -308,6 +333,7 @@ consumer.on('message', function (message) {
db.delete(message.resource_id) db.delete(message.resource_id)
}) })
} }
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
logger.info("Worker blown: Removing Metadata " + message); logger.info("Worker blown: Removing Metadata " + message);
try { try {
...@@ -316,10 +342,27 @@ consumer.on('message', function (message) { ...@@ -316,10 +342,27 @@ consumer.on('message', function (message) {
// process.exit(0) // process.exit(0)
} }
usedPort.delete(message.port) usedPort.delete(message.port)
let resource = functionToResource.get(message.functionHash + message.runtime) if (functionToResource.has(message.functionHash + message.runtime)) {
functionToResource.delete(message.functionHash + message.runtime) let resourceArray = functionToResource.get(message.functionHash + message.runtime)
if (resource != null) for (let i = 0; i < resourceArray.length; i++)
resourceMap.delete(resource.resource_id) if (resourceArray[i].resource_id === message.resource_id) {
console.log("splicing");
resourceArray.splice(i, 1);
break;
}
heap.heapify(resourceArray, libSupport.compare)
console.log(functionToResource.get(message.functionHash + message.runtime));
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(message.functionHash + message.runtime)
console.log(resourceArray);
}
} else if (topic == "RESPONSE_RM_2_DM") { } else if (topic == "RESPONSE_RM_2_DM") {
logger.info("Response from RM: " + message); logger.info("Response from RM: " + message);
......
...@@ -112,6 +112,10 @@ const logger = winston.createLogger({ ...@@ -112,6 +112,10 @@ const logger = winston.createLogger({
] ]
}); });
function compare(a, b) {
return a.metric - b.metric
}
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger makeid, generateExecutor, reverseProxy, getPort, logger, compare
} }
\ No newline at end of file
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"express": "^4.17.1", "express": "^4.17.1",
"express-fileupload": "^1.1.6", "express-fileupload": "^1.1.6",
"heap": "^0.2.6",
"isolated-vm": "^3.0.0", "isolated-vm": "^3.0.0",
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
......
File added
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment