Commit 50bf8185 authored by Nilanjan Daw's avatar Nilanjan Daw

Extracted constants to constants.json for better parameterisation

parent e9cfe3d5
......@@ -8,6 +8,14 @@
"couchdb_host": "10.129.6.5:5984",
"couchdb_db_name": "serverless",
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale"
},
"autoscalar_metrics": {
"open_request_threshold": 100
}
}
\ No newline at end of file
......@@ -40,11 +40,11 @@ let kafka = require('kafka-node'),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: "deployed" }, // receives deployment confirmation from workers
{ topic: "removeWorker" }, // received when a executor environment is blown at the worker
{ topic: "RESPONSE_RM_2_DM" }, // receives deployment details from RM
{ topic: "hscale" } // receives signals for horizontal scaling
{ topic: constants.topics.heartbeat }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: constants.topics.deployed }, // receives deployment confirmation from workers
{ topic: constants.topics.remove_worker }, // received when a executor environment is blown at the worker
{ topic: constants.topics.response_rm_2_dm }, // receives deployment details from RM
{ topic: constants.topics.hscale } // receives signals for horizontal scaling
],
[
{ autoCommit: true }
......@@ -66,7 +66,6 @@ let requestQueue = []
const WINDOW_SIZE = 10
const port = constants.master_port
const registry_url = constants.registry_url
const AUTOSCALAR_THRESHOLD = 100;
/**
* REST API to receive deployment requests
......@@ -260,7 +259,7 @@ function dispatch() {
let payloadToRM = [{
topic: "REQUEST_DM_2_RM", // changing from REQUEST_DM_2_RM
topic: constants.topics.request_dm_2_rm, // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
......@@ -370,7 +369,7 @@ consumer.on('message', function (message) {
logger.info("response " + message);
} else if (topic === "heartbeat") {
} else if (topic === constants.topics.heartbeat) {
message = JSON.parse(message)
if (Date.now() - message.timestamp < 1000)
if (!workerNodes.has(message.address)) {
......@@ -378,7 +377,7 @@ consumer.on('message', function (message) {
logger.warn("New worker discovered. Worker List: ")
logger.warn(workerNodes)
}
} else if (topic == "deployed") {
} else if (topic == constants.topics.deployed) {
try {
message = JSON.parse(message)
} catch (e) {
......@@ -386,7 +385,7 @@ consumer.on('message', function (message) {
}
postDeploy(message)
} else if (topic == "removeWorker") {
} else if (topic == constants.topics.remove_worker) {
logger.warn("Worker blown: Removing Metadata " + message);
try {
message = JSON.parse(message)
......@@ -411,7 +410,7 @@ consumer.on('message', function (message) {
}
} else if (topic == "hscale") {
} else if (topic == constants.topics.hscale) {
message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID
runtime = message.runtime,
......@@ -431,7 +430,7 @@ consumer.on('message', function (message) {
let payloadToRM = [{
topic: "request", // changing from REQUEST_DM_2_RM
topic: constants.topics.request_dm_2_rm, // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
......@@ -443,7 +442,7 @@ consumer.on('message', function (message) {
console.log("sent rm");
})
} else if (topic == "RESPONSE_RM_2_DM") {
} else if (topic == constants.topics.response_rm_2_dm) {
logger.info("Response from RM: " + message);
message = JSON.parse(message)
......@@ -482,7 +481,8 @@ consumer.on('message', function (message) {
function autoscalar() {
functionToResource.forEach((resourceList, functionKey, map) => {
if (resourceList.length > 0 && resourceList[resourceList.length - 1].open_request_count > AUTOSCALAR_THRESHOLD) {
if (resourceList.length > 0 &&
resourceList[resourceList.length - 1].open_request_count > constants.autoscalar_metrics.open_request_threshold) {
let resource = resourceMap.get(resourceList[resourceList.length - 1].resource_id)
logger.warn(`resource ${resourceList[resourceList.length - 1]} exceeded autoscalar threshold. Scaling up!`)
let payload = [{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment