Commit 283b7f36 authored by Nilanjan Daw's avatar Nilanjan Daw

Improved loadbalancer. Ref Issue #14

Pushed loadbalancer to reverseproxy. This transfer of responsibility simplifies the dispatch daemon.
Also changed loadbalancing logic from number of received request to number of open requests
parent ea07e4bc
{"id":"192.168.31.51","master_node":"10.129.6.5"} {"id":"192.168.1.103","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
...@@ -202,18 +202,7 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -202,18 +202,7 @@ app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime let runtime = req.body.runtime
let id = req.params.id + runtime let id = req.params.id + runtime
if (functionToResource.has(id)) { if (functionToResource.has(id)) {
/** libSupport.reverseProxy(req, res, functionToResource, resourceMap)
* Bypass deployment pipeline if resource available
*/
let forwardTo = heap.pop(functionToResource.get(id), libSupport.compare)
let resource = resourceMap.get(forwardTo.resource_id)
logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
"\n forwarding via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`)
forwardTo.metric += 1
functionToResource.get(id).push(forwardTo)
console.log(functionToResource.get(id));
} else { } else {
/** /**
* FIXME: Here, every request even for the same function will be queued up potentially launching multiple * FIXME: Here, every request even for the same function will be queued up potentially launching multiple
...@@ -230,7 +219,7 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -230,7 +219,7 @@ app.post('/serverless/execute/:id', (req, res) => {
}) })
/** /**
* Send dispatch signal and deploy resources after consultation with the RM * Send dispatch signal to Worker nodes and deploy resources after consultation with the RM
*/ */
function dispatch() { function dispatch() {
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length) let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
...@@ -368,8 +357,7 @@ consumer.on('message', function (message) { ...@@ -368,8 +357,7 @@ consumer.on('message', function (message) {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`) logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
}) })
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource)); logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res, libSupport.reverseProxy(req, res, functionToResource, resourceMap)
`http://${resource.node_id}:${resource.port}/serverless/function/execute`)
.then(() => { .then(() => {
db.delete(message.resource_id) db.delete(message.resource_id)
}) })
......
const crypto = require('crypto'); const crypto = require('crypto');
const fs = require('fs') const fs = require('fs')
const rp = require('request-promise'); const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston') const winston = require('winston')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap')
/** /**
* Generates unique IDs of arbitrary length * Generates unique IDs of arbitrary length
...@@ -20,13 +21,13 @@ function makeid(length) { ...@@ -20,13 +21,13 @@ function makeid(length) {
} }
/** /**
* generates the runtime executor after inserting the received function * generates the runtime executor after inserting the received function
* TODO: make this asynchronous * TODO: make this asynchronous
* @param {string Path from where to extract the function} functionPath * @param {string Path from where to extract the function} functionPath
* @param {string Function Hash value} functionHash * @param {string Function Hash value} functionHash
*/ */
function generateExecutor(functionPath, functionHash) { function generateExecutor(functionPath, functionHash) {
input = fs.readFileSync('./repository/worker_env/env.js') input = fs.readFileSync('./repository/worker_env/env.js')
functionFile = fs.readFileSync(functionPath + functionHash) functionFile = fs.readFileSync(functionPath + functionHash)
searchSize = "(resolve, reject) => {".length searchSize = "(resolve, reject) => {".length
...@@ -42,36 +43,55 @@ function makeid(length) { ...@@ -42,36 +43,55 @@ function makeid(length) {
return hash return hash
} }
function reverseProxy(req, res, url, tryout) { function reverseProxy(req, res, functionToResource, resourceMap) {
return new Promise((resolve, reject) => {
logger.info("Request received at reverseproxy. Forwarding to: " + url);
var options = {
method: 'POST',
uri: url,
body: req.body,
json: true // Automatically stringifies the body to JSON
};
rp(options) return new Promise((resolve, reject) => {
.then(function (parsedBody) { let runtime = req.body.runtime
// console.log("parsed body:", parsedBody); let id = req.params.id + runtime
res.json(parsedBody) /**
resolve() * Bypass deployment pipeline if resource available
}) */
.catch(function (err) { let functionHeap = functionToResource.get(id)
if (err.error.errno === "ECONNREFUSED") { let forwardTo = functionHeap[0]
reverseProxy(req, res, url, (tryout != null) ? tryout + 1 : 1) let resource = resourceMap.get(forwardTo.resource_id)
} else { logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
logger.error("error", err.error.errno); "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
res.json(err.message).status(err.statusCode) let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
resolve()
} logger.info("Request received at reverseproxy. Forwarding to: " + url);
}); forwardTo.metric += 1
}) heap.heapify(functionHeap, compare)
} console.log(functionHeap);
var options = {
method: 'POST',
uri: url,
body: req.body,
json: true // Automatically stringifies the body to JSON
};
rp(options)
.then(function (parsedBody) {
res.json(parsedBody)
forwardTo.metric -= 1
heap.heapify(functionHeap, compare)
console.log(functionHeap);
resolve()
})
.catch(function (err) {
forwardTo.metric -= 1
heap.heapify(functionHeap, compare)
console.log(functionHeap);
logger.error("error", err.error.errno);
res.json(err.message).status(err.statusCode)
resolve()
});
})
}
function getPort(usedPort) { function getPort(usedPort) {
let port = -1, ctr = 0 let port = -1, ctr = 0
do { do {
min = Math.ceil(30000); min = Math.ceil(30000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment