Commit 7b20abb7 authored by Nilanjan Daw's avatar Nilanjan Daw

Request grouping added. Closes #18

To prevent requests coming in for the same function from starting too many workers, they are grouped together and one worker is started per group.
parent b6297c5f
{"id":"192.168.1.103","master_node":"10.129.6.5"} {"id":"192.168.31.51","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
...@@ -205,9 +205,14 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -205,9 +205,14 @@ app.post('/serverless/execute/:id', (req, res) => {
libSupport.reverseProxy(req, res, functionToResource, resourceMap) libSupport.reverseProxy(req, res, functionToResource, resourceMap)
} else { } else {
/** /**
* FIXME: Here, every request even for the same function will be queued up potentially launching multiple * Requests are queued up before being dispatched. To prevent requests coming in for the
* resource of the same type * same function from starting too many workers, they are grouped together
* and one worker is started per group.
*/ */
if (db.has(req.params.id + runtime)) {
db.get(req.params.id + runtime).push({ req, res })
return;
}
requestQueue.push({ req, res }) requestQueue.push({ req, res })
/** /**
* We store functions for function placement heuristics purposes. This lets us look into the function * We store functions for function placement heuristics purposes. This lets us look into the function
...@@ -225,41 +230,46 @@ function dispatch() { ...@@ -225,41 +230,46 @@ function dispatch() {
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length) let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) { for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift() let {req, res} = requestQueue.shift()
// logger.info(req.body) // logger.info(req.body)
let runtime = req.body.runtime let runtime = req.body.runtime
let functionHash = req.params.id let functionHash = req.params.id
if (!db.has(functionHash + runtime)) {
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID db.set(functionHash + runtime, [])
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`); let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM
let payload = [{ let payload = [{
topic: node_id, topic: node_id,
messages: JSON.stringify({ messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker "type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id,
runtime, functionHash,
port
}),
partition: 0
}]
logger.info("Requesting RM " + JSON.stringify({
resource_id, resource_id,
runtime, functionHash, "memory": 332,
port }))
}),
partition: 0 /** uncomment when RM is unavailable */
}]
logger.info("Requesting RM " + JSON.stringify({ resourceMap.set(resource_id, {
resource_id, runtime, functionHash, port, node_id
"memory": 332, })
})) logger.info(resourceMap);
producer.send(payload, () => {
/** uncomment when RM is unavailable */ logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
resourceMap.set(resource_id, { } else {
runtime, functionHash, port, node_id logger.info("deployment process already started waiting")
}) db.get(functionHash + runtime).push({ req, res })
logger.info(resourceMap); }
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
db.set(resource_id, { req, res })
...@@ -308,9 +318,10 @@ consumer.on('message', function (message) { ...@@ -308,9 +318,10 @@ consumer.on('message', function (message) {
} }
logger.info("Deployed Resource: " + JSON.stringify(message)); logger.info("Deployed Resource: " + JSON.stringify(message));
if (db.has(message.resource_id)) { if (db.has(message.functionHash + message.runtime)) {
let { req, res } = db.get(message.resource_id) let sendQueue = db.get(message.functionHash + message.runtime)
if (message.status == false) { if (message.status == false) {
// TODO: handle failure
res.status(400).json({reason: message.reason}) res.status(400).json({reason: message.reason})
return; return;
} }
...@@ -357,10 +368,14 @@ consumer.on('message', function (message) { ...@@ -357,10 +368,14 @@ consumer.on('message', function (message) {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`) logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
}) })
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource)); logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res, functionToResource, resourceMap) while (sendQueue.length != 0) {
.then(() => { let {req, res} = sendQueue.shift()
db.delete(message.resource_id) libSupport.reverseProxy(req, res, functionToResource, resourceMap)
}) .then(() => {
})
}
db.delete(message.functionHash + message.runtime)
} }
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
......
...@@ -61,7 +61,7 @@ function reverseProxy(req, res, functionToResource, resourceMap) { ...@@ -61,7 +61,7 @@ function reverseProxy(req, res, functionToResource, resourceMap) {
logger.info("Request received at reverseproxy. Forwarding to: " + url); logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.metric += 1 forwardTo.metric += 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
console.log(functionHeap); logger.info(functionHeap);
var options = { var options = {
method: 'POST', method: 'POST',
...@@ -69,6 +69,9 @@ function reverseProxy(req, res, functionToResource, resourceMap) { ...@@ -69,6 +69,9 @@ function reverseProxy(req, res, functionToResource, resourceMap) {
body: req.body, body: req.body,
json: true // Automatically stringifies the body to JSON json: true // Automatically stringifies the body to JSON
}; };
// console.log(options);
rp(options) rp(options)
.then(function (parsedBody) { .then(function (parsedBody) {
...@@ -83,7 +86,7 @@ function reverseProxy(req, res, functionToResource, resourceMap) { ...@@ -83,7 +86,7 @@ function reverseProxy(req, res, functionToResource, resourceMap) {
forwardTo.metric -= 1 forwardTo.metric -= 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
console.log(functionHeap); console.log(functionHeap);
logger.error("error", err.error.errno); logger.error("error" + err.error.errno);
res.json(err.message).status(err.statusCode) res.json(err.message).status(err.statusCode)
resolve() resolve()
}); });
......
...@@ -45,7 +45,7 @@ app.listen(port, () => { ...@@ -45,7 +45,7 @@ app.listen(port, () => {
function shouldDie() { function shouldDie() {
if (Date.now() - lastRequest > 5 * 1000) { if (Date.now() - lastRequest > 30 * 1000) {
console.log("Idle for too long. Exiting"); console.log("Idle for too long. Exiting");
producer.send( producer.send(
[{ [{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment