Commit 0029b21b authored by Nilanjan Daw's avatar Nilanjan Daw

Integrated RM with new arch

Integrated RM functionalities with new architecture. Tested using dummy_rm
parent d872ace4
......@@ -3,4 +3,6 @@ node_modules
package-lock.json
firecracker*
secrets.json
grunt
\ No newline at end of file
grunt
.clinic
rm_dummy.js
\ No newline at end of file
......@@ -6,5 +6,8 @@
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON",
"couchdb_host": "10.129.6.5:5984",
"couchdb_db_name": "serverless"
"couchdb_db_name": "serverless",
"topics": {
}
}
\ No newline at end of file
......@@ -10,6 +10,8 @@ const { spawn } = require('child_process');
const morgan = require('morgan')
const heap = require('heap')
const fetch = require('node-fetch');
const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
......@@ -40,20 +42,23 @@ let kafka = require('kafka-node'),
{ topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: "deployed" }, // receives deployment confirmation from workers
{ topic: "removeWorker" }, // received when a executor environment is blown at the worker
{ topic: "RESPONSE_RM_2_DM" }, // receives deployment details from RM
{ topic: "RESPONSE_RM_2_DM_DUMMY" }, // receives deployment details from RM
{ topic: "hscale" } // receives signals for horizontal scaling
],
[
{ autoCommit: true }
])
app.use(morgan('combined'))
app.use(morgan('combined', {
skip: function (req, res) { return res.statusCode < 400 }
}))
app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json())
const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path));
app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec }));
let requestQueue = []
let workerNodes = []
......@@ -61,6 +66,7 @@ let workerNodes = []
const WINDOW_SIZE = 10
const port = constants.master_port
const registry_url = constants.registry_url
const AUTOSCALAR_THRESHOLD = 100;
/**
* REST API to receive deployment requests
......@@ -239,20 +245,7 @@ function dispatch() {
db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM
let payload = [{
topic: node_id,
messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id,
runtime, functionHash,
port
}),
partition: 0
}]
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
......@@ -261,33 +254,28 @@ function dispatch() {
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
runtime, functionHash, port: null, node_id: null
})
logger.info(resourceMap);
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
} else {
logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res })
}
/** uncomment when RM is available, TODO: also update resourceMap
rmQueue.set(resource_id, payload)
let payloadToRM = [{
topic: "REQUEST_DM_2_RM",
let payloadToRM = [{
topic: "request", // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
}),
partition: 0
}]
producer.send(payloadToRM, () => {
db.set(functionHash + runtime, { req, res })
})
*/
producer.send(payloadToRM, () => {
// db.set(functionHash + runtime, { req, res })
console.log("sent rm");
})
} else {
logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res })
}
}
}
......@@ -314,7 +302,7 @@ function postDeploy(message) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
open_request_count: 0
}, libSupport.compare)
logger.warn("Horizontally scaling up: " +
JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
......@@ -328,7 +316,7 @@ function postDeploy(message) {
let resourceHeap = []
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
open_request_count: 0
}, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap)
logger.warn("Creating new resource pool"
......@@ -336,34 +324,38 @@ function postDeploy(message) {
}
let resource = resourceMap.get(message.resource_id)
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
"reason": "deployment",
"status": true,
"timestamp": date.toISOString()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap)
.then(() => {
try {
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
"reason": "deployment",
"status": true,
"timestamp": date.toISOString()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap)
.then(() => {
})
})
}
db.delete(message.functionHash + message.runtime)
}
db.delete(message.functionHash + message.runtime)
} catch (e) {
logger.error(e.message)
}
}
......@@ -419,21 +411,10 @@ consumer.on('message', function (message) {
} else if (topic == "hscale") {
message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID
node_id = getAddress(), // Requests the RM for address and other metadata for function placement
port = libSupport.getPort(usedPort), // TODO: will be provided by the RM
runtime = message.runtime,
functionHash = message.functionHash
let payload = [{
topic: node_id,
messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id,
runtime, functionHash,
port
}),
partition: 0
}]
console.log("Resource Status: ", functionToResource);
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
......@@ -442,31 +423,48 @@ consumer.on('message', function (message) {
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
runtime, functionHash, port: null, node_id: null
})
logger.info(resourceMap);
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
let payloadToRM = [{
topic: "request", // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
}),
partition: 0
}]
producer.send(payloadToRM, () => {
// db.set(functionHash + runtime, { req, res })
console.log("sent rm");
})
} else if (topic == "RESPONSE_RM_2_DM") {
} else if (topic == "RESPONSE_RM_2_DM_DUMMY") {
logger.info("Response from RM: " + message);
message = JSON.parse(message)
let payload = rmQueue.get(message.id)
if (payload != null) {
payload[0].topic = message.nodes[0]
logger.info(payload);
/** get port and other resources */
let resource = resourceMap.get(message.id)
resource.node_id = message.nodes[0] // TODO: update this to message.nodes[0].node_id
// resource.port = message.nodes[0].port TODO: update after RM supports port allocation
resourceMap.set(message.id, resource)
let resourceChoice = message.grunts[0]
if (resourceMap.has(message.resource_id)) {
let resource = resourceMap.get(message.resource_id)
resource.port = (resourceChoice.port) ? resourceChoice.port : libSupport.getPort(usedPort)
resource.node_id = resourceChoice.node_id
let payload = [{
topic: resource.node_id,
messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id: message.resource_id,
runtime: resource.runtime, functionHash: resource.functionHash,
port: resource.port
}),
partition: 0
}]
logger.info(resourceMap);
producer.send(payload, () => { })
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
} else {
logger.error("something went wrong");
logger.error("something went wrong, resource not found in resourceMap")
}
}
......@@ -474,9 +472,8 @@ consumer.on('message', function (message) {
function autoscalar() {
functionToResource.forEach((resourceList, functionKey, map) => {
console.log(resourceList);
if (resourceList.length > 0 && resourceList[resourceList.length - 1].metric > 100) {
if (resourceList.length > 0 && resourceList[resourceList.length - 1].open_request_count > AUTOSCALAR_THRESHOLD) {
let resource = resourceMap.get(resourceList[resourceList.length - 1].resource_id)
logger.warn(`resource ${resourceList[resourceList.length - 1]} exceeded autoscalar threshold. Scaling up!`)
let payload = [{
......
......@@ -3,6 +3,8 @@ const fs = require('fs')
const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const constants = require('.././constants.json')
const { createLogger, format, transports } = winston;
const heap = require('heap')
......@@ -54,14 +56,14 @@ function reverseProxy(req, res, functionToResource, resourceMap) {
let functionHeap = functionToResource.get(id)
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
"\n forwarding via reverse proxy to: " + JSON.stringify(resource));
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
// "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.metric += 1
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare)
logger.info(functionHeap);
// logger.info(functionHeap);
var options = {
method: 'POST',
......@@ -77,15 +79,13 @@ function reverseProxy(req, res, functionToResource, resourceMap) {
.then(function (parsedBody) {
res.json(parsedBody)
forwardTo.metric -= 1
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
console.log(functionHeap);
resolve()
})
.catch(function (err) {
forwardTo.metric -= 1
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
console.log(functionHeap);
logger.error("error" + err.error.errno);
res.json(err.message).status(err.statusCode)
resolve()
......@@ -136,7 +136,7 @@ const logger = winston.createLogger({
});
function compare(a, b) {
return a.metric - b.metric
return a.open_request_count - b.open_request_count
}
module.exports = {
......
......@@ -15,6 +15,7 @@
"express-fileupload": "^1.1.6",
"heap": "^0.2.6",
"isolated-vm": "^3.0.0",
"kafka-logger": "^7.1.0",
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
......@@ -23,6 +24,7 @@
"request": "^2.88.0",
"request-promise": "^4.2.5",
"save": "^2.4.0",
"swagger-stats": "^0.95.16",
"winston": "^3.2.1"
}
}
{
"name": "xanadu",
"uriPath": "/xanadu"
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment