Commit d872ace4 authored by Nilanjan Daw

Added horizontal autoscaler. Ref Issue #2

Added a horizontal autoscaler that scales a function up when the number of outstanding requests exceeds 100 (currently hardcoded) within a 1s time window.
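For context, the check the autoscaler performs every second looks roughly like the sketch below. checkAndScale, SCALE_THRESHOLD and CHECK_INTERVAL_MS are illustrative names and are not part of this commit; functionToResource, resourceMap and producer are the structures already set up elsewhere in this file.

// Illustrative sketch only; mirrors the autoscalar() added in the diff below.
const SCALE_THRESHOLD = 100;    // outstanding-request threshold, currently hardcoded
const CHECK_INTERVAL_MS = 1000; // 1s check window

function checkAndScale(functionToResource, resourceMap, producer) {
    functionToResource.forEach((resourceList, functionKey) => {
        let candidate = resourceList[resourceList.length - 1] // tail entry of the per-function heap
        if (candidate && candidate.metric > SCALE_THRESHOLD) {
            let resource = resourceMap.get(candidate.resource_id)
            // signal the "hscale" consumer to bring up one more executor for this function
            producer.send([{
                topic: "hscale",
                messages: JSON.stringify({ runtime: resource.runtime, functionHash: resource.functionHash })
            }], () => { })
        }
    })
}
// setInterval(() => checkAndScale(functionToResource, resourceMap, producer), CHECK_INTERVAL_MS)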
parent 7b20abb7
@@ -37,11 +37,11 @@ let kafka = require('kafka-node'),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: 'response', partition: 0, offset: 0},
{ topic: 'heartbeat' },
{ topic: "deployed" },
{ topic: "removeWorker" },
{ topic: "RESPONSE_RM_2_DM"}
{ topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: "deployed" }, // receives deployment confirmation from workers
{ topic: "removeWorker" }, // received when a executor environment is blown at the worker
{ topic: "RESPONSE_RM_2_DM" }, // receives deployment details from RM
{ topic: "hscale" } // receives signals for horizontal scaling
],
[
{ autoCommit: true }
@@ -236,6 +236,7 @@ function dispatch() {
let functionHash = req.params.id
if (!db.has(functionHash + runtime)) {
db.set(functionHash + runtime, [])
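// queue the incoming request; it is replayed by postDeploy() once the resource is up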
db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(20) // each function resource request is associated with a unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
@@ -295,6 +296,78 @@ function getAddress() {
return workerNodes[Math.floor(Math.random() * workerNodes.length)];
}
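/**
 * Handles a "deployed" acknowledgement from a worker.
 * On failure: fails every queued request for the function with HTTP 400 and clears the queue.
 * On success: pushes the new resource into the per-function min heap, confirms the
 * deployment to the RM over the log channel, and drains queued requests via the reverse proxy.
 */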
function postDeploy(message) {
logger.info("Deployed Resource: " + JSON.stringify(message));
if (message.status == false) {
let sendQueue = db.get(message.functionHash + message.runtime)
// TODO: handle failure
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
res.status(400).json({ reason: message.reason })
}
db.delete(message.functionHash + message.runtime)
return;
}
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
logger.warn("Horizontally scaling up: " +
JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
} else {
/**
* function to resource map - holds a min heap of resources associated with a function
* the min heap is sorted based on a metric [TBD] like CPU usage, request count, mem usage etc
* TODO: decide on metric to use for sorting.
*/
let resourceHeap = []
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap)
logger.warn("Creating new resource pool"
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
}
let resource = resourceMap.get(message.resource_id)
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
"reason": "deployment",
"status": true,
"timestamp": date.toISOString()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed successful deployment of resource_id: ${message.resource_id} to RM`)
})
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap)
.then(() => {
})
}
db.delete(message.functionHash + message.runtime)
}
}
consumer.on('message', function (message) {
let topic = message.topic
@@ -316,68 +389,8 @@ consumer.on('message', function (message) {
} catch (e) {
// process.exit(0)
}
logger.info("Deployed Resource: " + JSON.stringify(message));
postDeploy(message)
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
if (message.status == false) {
// TODO: handle failure
res.status(400).json({reason: message.reason})
return;
}
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
logger.warn("Horizontally scaling up: " +
JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
} else {
/**
* function to resource map - holds a min heap of resources associated with a function
* the min heap is sorted based on a metric [TBD] like CPU usage, request count, mem usage etc
* TODO: decide on metric to use for sorting.
*/
let resourceHeap = []
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap)
logger.warn("Creating new resource pool"
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
}
let resource = resourceMap.get(message.resource_id)
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
"reason": "deployment",
"status": true,
"timestamp": date.toISOString()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue.length != 0) {
let {req, res} = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap)
.then(() => {
})
}
db.delete(message.functionHash + message.runtime)
}
} else if (topic == "removeWorker") {
logger.warn("Worker blown: Removing Metadata " + message);
try {
@@ -403,6 +416,38 @@ consumer.on('message', function (message) {
}
} else if (topic == "hscale") {
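// A function has crossed the autoscaling threshold: pick a worker node and port,
// record the new resource and ask that node's Dispatch Daemon to start another executor.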
message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with a unique ID
node_id = getAddress(), // Requests the RM for address and other metadata for function placement
port = libSupport.getPort(usedPort), // TODO: will be provided by the RM
runtime = message.runtime,
functionHash = message.functionHash
let payload = [{
topic: node_id,
messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id,
runtime, functionHash,
port
}),
partition: 0
}]
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
})
logger.info(resourceMap);
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
} else if (topic == "RESPONSE_RM_2_DM") {
logger.info("Response from RM: " + message);
@@ -427,5 +472,24 @@ consumer.on('message', function (message) {
}
});
setInterval(dispatch, 2000);
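/**
 * Horizontal autoscaler: run every second (see setInterval below). For each function,
 * if the tail entry of its resource heap reports a metric (outstanding requests) above 100
 * (currently hardcoded), an "hscale" message is published to request one more resource.
 */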
function autoscalar() {
functionToResource.forEach((resourceList, functionKey, map) => {
console.log(resourceList);
if (resourceList.length > 0 && resourceList[resourceList.length - 1].metric > 100) {
let resource = resourceMap.get(resourceList[resourceList.length - 1].resource_id)
logger.warn(`resource ${JSON.stringify(resourceList[resourceList.length - 1])} exceeded autoscaler threshold. Scaling up!`)
let payload = [{
topic: "hscale",
messages: JSON.stringify({ "runtime": resource.runtime, "functionHash": resource.functionHash })
}]
producer.send(payload, function () { })
}
});
}
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file