Commit 0f85a506 authored by Naman Dixit's avatar Naman Dixit

Merge commit

Mere branch 'master' of https://git.cse.iitb.ac.in/synerg/xanadu
parents 50e49003 5a5cae29
...@@ -3,5 +3,8 @@ ...@@ -3,5 +3,8 @@
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "localhost",
"kafka_host": "10.129.6.5:9092", "kafka_host": "10.129.6.5:9092",
"log_channel": "LOG_COMMON" "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON",
"couchdb_host": "10.129.6.5:5984",
"couchdb_db_name": "serverless"
} }
\ No newline at end of file
...@@ -8,38 +8,47 @@ const { Worker, isMainThread, workerData } = require('worker_threads'); ...@@ -8,38 +8,47 @@ const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = constants.registry_url const registry_url = constants.registry_url
const logger = libSupport.logger const logger = libSupport.logger
function runIsolate(local_repository, functionHash, port, resource_id) { function runIsolate(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js" let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const worker = new Worker(filename); const worker = new Worker(filename, {
argv: [resource_id, functionHash, port, "isolate"],
resourceLimits: {
maxOldGenerationSizeMb: memory
}
});
worker.on('message', resolve); worker.on('message', resolve);
worker.on('error', reject); worker.on('error', reject);
worker.on('exit', (code) => { worker.on('exit', (code) => {
if (code !== 0) if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`)); reject(new Error(`Worker stopped with exit code ${code}`));
logger.info(`Isolate Worker with resource_id ${resource_id} blown`); logger.info(`Isolate Worker with resource_id ${resource_id} blown`);
})
worker.on('online', () => {
resolve() resolve()
}) })
}); });
} }
function runProcess(local_repository, functionHash, port, resource_id) { function runProcess(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js" let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeStart = Date.now() let timeStart = Date.now()
const process = spawn('node', [filename, port]);
let result = ""; const process = spawn('node', [filename, resource_id, functionHash, port, "process", `--max-old-space-size=${memory}` ]);
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); console.log(`stdout: ${data}`);
result += data;
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference); console.log("process time taken: ", timeDifference);
resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
...@@ -48,6 +57,7 @@ function runProcess(local_repository, functionHash, port, resource_id) { ...@@ -48,6 +57,7 @@ function runProcess(local_repository, functionHash, port, resource_id) {
}); });
process.on('close', (code) => { process.on('close', (code) => {
resolve(code);
logger.info(`Process Environment with resource_id ${resource_id} blown`); logger.info(`Process Environment with resource_id ${resource_id} blown`);
}); });
}) })
...@@ -55,7 +65,12 @@ function runProcess(local_repository, functionHash, port, resource_id) { ...@@ -55,7 +65,12 @@ function runProcess(local_repository, functionHash, port, resource_id) {
} }
function runContainer(imageName, port, resource_id) { function runContainer(metadata) {
let imageName = metadata.functionHash,
port = metadata.port,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
logger.info(imageName); logger.info(imageName);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
...@@ -76,7 +91,7 @@ function runContainer(imageName, port, resource_id) { ...@@ -76,7 +91,7 @@ function runContainer(imageName, port, resource_id) {
if (code != 0) if (code != 0)
reject("error") reject("error")
else { else {
const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", resource_id, registry_url + imageName, const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container"]); resource_id, imageName, port, "container"]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
...@@ -103,7 +118,7 @@ function runContainer(imageName, port, resource_id) { ...@@ -103,7 +118,7 @@ function runContainer(imageName, port, resource_id) {
} else { } else {
logger.info("container starting at port", port); logger.info("container starting at port", port);
const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", resource_id, const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container"]); registry_url + imageName, resource_id, imageName, port, "container"]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
......
'use strict'; 'use strict';
const constants = require(".././constants.json") const constants = require(".././constants.json")
const secrets = require('./secrets.json')
const config = require('./config.json') const config = require('./config.json')
const libSupport = require('./lib') const libSupport = require('./lib')
libSupport.updateConfig() libSupport.updateConfig()
...@@ -7,6 +8,11 @@ const node_id = config.id ...@@ -7,6 +8,11 @@ const node_id = config.id
const {spawn } = require('child_process') const {spawn } = require('child_process')
const execute = require('./execute') const execute = require('./execute')
const fs = require('fs') const fs = require('fs')
const fetch = require('node-fetch');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
const kafka = require('kafka-node') const kafka = require('kafka-node')
const logger = libSupport.logger const logger = libSupport.logger
...@@ -44,9 +50,24 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -44,9 +50,24 @@ libSupport.makeTopic(node_id).then(() => {
*/ */
if (message.type === "execute") { if (message.type === "execute") {
logger.info("Received Deployment request for resource_id: " + resource_id); logger.info("Received Deployment request for resource_id: " + resource_id);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => { fetch(metadataDB + functionHash).then(res => res.json())
startWorker(local_repository, functionHash, resource_id, producer, runtime, port) .then(json => {
console.log("metadata", json);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
let metadata = {
resource_id, functionHash,
runtime, port,
resources: {
memory: json.memory
}
}
startWorker(local_repository, producer, metadata)
}) })
}).catch(err => {
logger.error("something went wrong" + err.toString())
});
} }
...@@ -56,7 +77,7 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -56,7 +77,7 @@ libSupport.makeTopic(node_id).then(() => {
/** /**
* download and start grunt * download and start grunt
*/ */
libSupport.download(host_url + '/repository/grunt', "grunt").then(() => { libSupport.download(constants.grunt_host, "grunt").then(() => {
logger.info("Downloaded grunt binary from repository") logger.info("Downloaded grunt binary from repository")
fs.chmod('grunt', 0o555, (err) => { fs.chmod('grunt', 0o555, (err) => {
logger.info("grunt made executable. Starting grunt") logger.info("grunt made executable. Starting grunt")
...@@ -88,26 +109,47 @@ libSupport.download(host_url + '/repository/grunt', "grunt").then(() => { ...@@ -88,26 +109,47 @@ libSupport.download(host_url + '/repository/grunt', "grunt").then(() => {
* @param {String} runtime * @param {String} runtime
* @param {Number} port * @param {Number} port
*/ */
function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) { function startWorker(local_repository, producer, metadata) {
logger.info(`Using port ${port} for functionHash ${functionHash}`); let runtime = metadata.runtime
console.log(metadata);
fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => { logger.info(`Using port ${metadata.port} for functionHash ${metadata.functionHash}`)
if (runtime === "isolate")
execute.runIsolate(local_repository, functionHash, port, resource_id) if (runtime === "isolate")
else if (runtime === "process") execute.runIsolate(local_repository, metadata)
execute.runProcess(local_repository, functionHash, port, resource_id) .catch(err => {
else if (runtime === "container") logger.error("=====================deployment failed=========================");
execute.runContainer(functionHash, port, resource_id) producer.send([{
else { topic: "deployed",
producer.send( messages: JSON.stringify({
[{ "status": false,
topic: "response", resource_id: metadata.resource_id,
messages: JSON.stringify({ status: "unknown runtime" }) "reason": "isolate exit"
}], () => { }) })
}], () => { })
return })
} else if (runtime === "process")
}); execute.runProcess(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
producer.send([{ topic: "deployed",
messages: JSON.stringify({
"status": false,
resource_id: metadata.resource_id,
"reason": "process exit"
}) }], () => { })
})
else if (runtime === "container")
execute.runContainer(metadata)
else {
producer.send(
[{
topic: "response",
messages: JSON.stringify({ status: "unknown runtime" })
}], () => { })
return
}
} }
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"express": "^4.17.1", "express": "^4.17.1",
"express-fileupload": "^1.1.6", "express-fileupload": "^1.1.6",
"isolated-vm": "^3.0.0",
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
......
...@@ -4,9 +4,15 @@ const express = require('express') ...@@ -4,9 +4,15 @@ const express = require('express')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
const fileUpload = require('express-fileupload'); const fileUpload = require('express-fileupload');
const constants = require('.././constants.json') const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const fs = require('fs') const fs = require('fs')
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const morgan = require('morgan') const morgan = require('morgan')
const heap = require('heap')
const fetch = require('node-fetch');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
...@@ -14,11 +20,12 @@ const logger = libSupport.logger ...@@ -14,11 +20,12 @@ const logger = libSupport.logger
let date = new Date(); let date = new Date();
let log_channel = constants.log_channel let log_channel = constants.log_channel
let functionToResource = new Map(), // TODO: make the resource a list for horizontal scale out let usedPort = new Map(), // TODO: remove after integration with RM
usedPort = new Map(), // TODO: remove after integration with RM
rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation
db = new Map(), db = new Map(), // queue holding request to be dispatched
resourceMap = new Map() resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map() // a function to resource map. Each map contains a minheap of
// resources associated with the function
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -30,11 +37,11 @@ let kafka = require('kafka-node'), ...@@ -30,11 +37,11 @@ let kafka = require('kafka-node'),
Consumer = kafka.Consumer, Consumer = kafka.Consumer,
consumer = new Consumer(client, consumer = new Consumer(client,
[ [
{ topic: 'response', partition: 0, offset: 0}, { topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: 'heartbeat' }, { topic: "deployed" }, // receives deployment confirmation from workers
{ topic: "deployed" }, { topic: "removeWorker" }, // received when a executor environment is blown at the worker
{ topic: "removeWorker" }, { topic: "RESPONSE_RM_2_DM" }, // receives deployment details from RM
{ topic: "RESPONSE_RM_2_DM"} { topic: "hscale" } // receives signals for horizontal scaling
], ],
[ [
{ autoCommit: true } { autoCommit: true }
...@@ -64,9 +71,39 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -64,9 +71,39 @@ app.post('/serverless/deploy', (req, res) => {
let file = req.files.serverless let file = req.files.serverless
let functionHash = file.md5 let functionHash = file.md5
file.mv(file_path + functionHash, function (err) { file.mv(file_path + functionHash, function (err) {
functionHash = libSupport.generateExecutor(file_path, functionHash) functionHash = libSupport.generateExecutor(file_path, functionHash)
/**
* Adding meta caching via couchdb
* This will create / update function related metadata like resource limits etc
* on a database named "serverless".
*/
fetch(metadataDB + functionHash).then(res => res.json())
.then(json => {
if (json.error === "not_found") {
logger.warn("New function, creating metadata")
fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: req.body.memory
}),
headers: { 'Content-Type': 'application/json' },
}).then(res => res.json())
.then(json => console.log(json));
} else {
logger.warn('Repeat deployment, updating metadata')
fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: req.body.memory,
_rev: json._rev
}),
headers: { 'Content-Type': 'application/json' },
}).then(res => res.json())
.then(json => console.log(json));
}
});
if (err) { if (err) {
logger.error(err) logger.error(err)
res.send("error").status(400) res.send("error").status(400)
...@@ -163,21 +200,19 @@ function deployContainer(path, imageName) { ...@@ -163,21 +200,19 @@ function deployContainer(path, imageName) {
*/ */
app.post('/serverless/execute/:id', (req, res) => { app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime let runtime = req.body.runtime
if (functionToResource.has(req.params.id + runtime)) { let id = req.params.id + runtime
/** if (functionToResource.has(id)) {
* Bypass deployment pipeline if resource available libSupport.reverseProxy(req, res, functionToResource, resourceMap)
*/
let forwardTo = functionToResource.get(req.params.id + runtime)
let resource = resourceMap.get(forwardTo.resource_id)
logger.info("resource found " + JSON.stringify(forwardTo) +
" forwarding via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`)
} else { } else {
/** /**
* FIXME: Here, every request even for the same function will be queued up potentially launching multiple * Requests are queued up before being dispatched. To prevent requests coming in for the
* resource of the same type * same function from starting too many workers, they are grouped together
* and one worker is started per group.
*/ */
if (db.has(req.params.id + runtime)) {
db.get(req.params.id + runtime).push({ req, res })
return;
}
requestQueue.push({ req, res }) requestQueue.push({ req, res })
/** /**
* We store functions for function placement heuristics purposes. This lets us look into the function * We store functions for function placement heuristics purposes. This lets us look into the function
...@@ -189,47 +224,53 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -189,47 +224,53 @@ app.post('/serverless/execute/:id', (req, res) => {
}) })
/** /**
* Send dispatch signal and deploy resources after consultation with the RM * Send dispatch signal to Worker nodes and deploy resources after consultation with the RM
*/ */
function dispatch() { function dispatch() {
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length) let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) { for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift() let {req, res} = requestQueue.shift()
// logger.info(req.body) // logger.info(req.body)
let runtime = req.body.runtime let runtime = req.body.runtime
let functionHash = req.params.id let functionHash = req.params.id
if (!db.has(functionHash + runtime)) {
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID db.set(functionHash + runtime, [])
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`); db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
let node_id = getAddress() // Requests the RM for address and other metadata for function placement logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM
let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM
let payload = [{ let payload = [{
topic: node_id, topic: node_id,
messages: JSON.stringify({ messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker "type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id,
runtime, functionHash,
port
}),
partition: 0
}]
logger.info("Requesting RM " + JSON.stringify({
resource_id, resource_id,
runtime, functionHash, "memory": 332,
port }))
}),
partition: 0
}]
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
/** uncomment when RM is unavailable */ /** uncomment when RM is unavailable */
resourceMap.set(resource_id, { resourceMap.set(resource_id, {
runtime, functionHash, port, node_id runtime, functionHash, port, node_id
}) })
logger.info(resourceMap); logger.info(resourceMap);
producer.send(payload, () => { producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`) logger.info(`Resource Deployment request sent to Dispatch Agent`)
}) })
db.set(resource_id, { req, res }) } else {
logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res })
}
...@@ -255,6 +296,78 @@ function getAddress() { ...@@ -255,6 +296,78 @@ function getAddress() {
return workerNodes[Math.floor(Math.random() * workerNodes.length)]; return workerNodes[Math.floor(Math.random() * workerNodes.length)];
} }
function postDeploy(message) {
logger.info("Deployed Resource: " + JSON.stringify(message));
if (message.status == false) {
let sendQueue = db.get(message.functionHash + message.runtime)
// TODO: handle failure
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
res.status(400).json({ reason: message.reason })
}
db.delete(message.functionHash + message.runtime)
return;
}
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
logger.warn("Horizontally scaling up: " +
JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
} else {
/**
* function to resource map - holds a min heap of resources associated with a function
* the min heap is sorted based on a metric [TBD] like CPU usage, request count, mem usage etc
* TODO: decide on metric to use for sorting.
*/
let resourceHeap = []
heap.push(resourceHeap, {
resource_id: message.resource_id,
metric: 0
}, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap)
logger.warn("Creating new resource pool"
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
}
let resource = resourceMap.get(message.resource_id)
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
"reason": "deployment",
"status": true,
"timestamp": date.toISOString()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap)
.then(() => {
})
}
db.delete(message.functionHash + message.runtime)
}
}
consumer.on('message', function (message) { consumer.on('message', function (message) {
let topic = message.topic let topic = message.topic
...@@ -276,50 +389,65 @@ consumer.on('message', function (message) { ...@@ -276,50 +389,65 @@ consumer.on('message', function (message) {
} catch (e) { } catch (e) {
// process.exit(0) // process.exit(0)
} }
postDeploy(message)
logger.info("Deployed Resource: " + JSON.stringify(message));
if (db.has(message.resource_id)) {
let { req, res } = db.get(message.resource_id)
functionToResource.set(message.functionHash + message.runtime, {
resource_id: message.resource_id
})
let resource = resourceMap.get(message.resource_id)
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
"reason": "deployment",
"status": true,
"timestamp": date.toISOString()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
libSupport.reverseProxy(req, res,
`http://${resource.node_id}:${resource.port}/serverless/function/execute`)
.then(() => {
db.delete(message.resource_id)
})
}
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
logger.info("Worker blown: Removing Metadata " + message); logger.warn("Worker blown: Removing Metadata " + message);
try { try {
message = JSON.parse(message) message = JSON.parse(message)
} catch(e) { } catch(e) {
// process.exit(0) // process.exit(0)
} }
usedPort.delete(message.port) usedPort.delete(message.port)
let resource = functionToResource.get(message.functionHash + message.runtime) if (functionToResource.has(message.functionHash + message.runtime)) {
functionToResource.delete(message.functionHash + message.runtime) let resourceArray = functionToResource.get(message.functionHash + message.runtime)
if (resource != null) for (let i = 0; i < resourceArray.length; i++)
resourceMap.delete(resource.resource_id) if (resourceArray[i].resource_id === message.resource_id) {
resourceArray.splice(i, 1);
break;
}
heap.heapify(resourceArray, libSupport.compare)
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(message.functionHash + message.runtime)
}
} else if (topic == "hscale") {
message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID
node_id = getAddress(), // Requests the RM for address and other metadata for function placement
port = libSupport.getPort(usedPort), // TODO: will be provided by the RM
runtime = message.runtime,
functionHash = message.functionHash
let payload = [{
topic: node_id,
messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id,
runtime, functionHash,
port
}),
partition: 0
}]
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
})
logger.info(resourceMap);
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
} else if (topic == "RESPONSE_RM_2_DM") { } else if (topic == "RESPONSE_RM_2_DM") {
logger.info("Response from RM: " + message); logger.info("Response from RM: " + message);
...@@ -344,5 +472,24 @@ consumer.on('message', function (message) { ...@@ -344,5 +472,24 @@ consumer.on('message', function (message) {
} }
}); });
setInterval(dispatch, 2000); function autoscalar() {
functionToResource.forEach((resourceList, functionKey, map) => {
console.log(resourceList);
if (resourceList.length > 0 && resourceList[resourceList.length - 1].metric > 100) {
let resource = resourceMap.get(resourceList[resourceList.length - 1].resource_id)
logger.warn(`resource ${resourceList[resourceList.length - 1]} exceeded autoscalar threshold. Scaling up!`)
let payload = [{
topic: "hscale",
messages: JSON.stringify({ "runtime": resource.runtime, "functionHash": resource.functionHash })
}]
producer.send(payload, function () { })
}
});
}
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`)) app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
const crypto = require('crypto'); const crypto = require('crypto');
const fs = require('fs') const fs = require('fs')
const rp = require('request-promise'); const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston') const winston = require('winston')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap')
/** /**
* Generates unique IDs of arbitrary length * Generates unique IDs of arbitrary length
...@@ -20,13 +21,13 @@ function makeid(length) { ...@@ -20,13 +21,13 @@ function makeid(length) {
} }
/** /**
* generates the runtime executor after inserting the received function * generates the runtime executor after inserting the received function
* TODO: make this asynchronous * TODO: make this asynchronous
* @param {string Path from where to extract the function} functionPath * @param {string Path from where to extract the function} functionPath
* @param {string Function Hash value} functionHash * @param {string Function Hash value} functionHash
*/ */
function generateExecutor(functionPath, functionHash) { function generateExecutor(functionPath, functionHash) {
input = fs.readFileSync('./repository/worker_env/env.js') input = fs.readFileSync('./repository/worker_env/env.js')
functionFile = fs.readFileSync(functionPath + functionHash) functionFile = fs.readFileSync(functionPath + functionHash)
searchSize = "(resolve, reject) => {".length searchSize = "(resolve, reject) => {".length
...@@ -42,36 +43,58 @@ function makeid(length) { ...@@ -42,36 +43,58 @@ function makeid(length) {
return hash return hash
} }
function reverseProxy(req, res, url, tryout) { function reverseProxy(req, res, functionToResource, resourceMap) {
return new Promise((resolve, reject) => {
logger.info("Request received at reverseproxy. Forwarding to: " + url);
var options = { return new Promise((resolve, reject) => {
method: 'POST', let runtime = req.body.runtime
uri: url, let id = req.params.id + runtime
body: req.body, /**
json: true // Automatically stringifies the body to JSON * Bypass deployment pipeline if resource available
}; */
let functionHeap = functionToResource.get(id)
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
"\n forwarding via reverse proxy to: " + JSON.stringify(resource));
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.metric += 1
heap.heapify(functionHeap, compare)
logger.info(functionHeap);
var options = {
method: 'POST',
uri: url,
body: req.body,
json: true // Automatically stringifies the body to JSON
};
rp(options) // console.log(options);
.then(function (parsedBody) {
// console.log("parsed body:", parsedBody);
res.json(parsedBody) rp(options)
resolve() .then(function (parsedBody) {
})
.catch(function (err) { res.json(parsedBody)
if (err.error.errno === "ECONNREFUSED") { forwardTo.metric -= 1
reverseProxy(req, res, url, (tryout != null) ? tryout + 1 : 1) heap.heapify(functionHeap, compare)
} else { console.log(functionHeap);
logger.error("error", err.error.errno); resolve()
res.json(err.message).status(err.statusCode) })
resolve() .catch(function (err) {
} forwardTo.metric -= 1
}); heap.heapify(functionHeap, compare)
}) console.log(functionHeap);
} logger.error("error" + err.error.errno);
res.json(err.message).status(err.statusCode)
resolve()
});
})
}
function getPort(usedPort) { function getPort(usedPort) {
let port = -1, ctr = 0 let port = -1, ctr = 0
do { do {
min = Math.ceil(30000); min = Math.ceil(30000);
...@@ -112,6 +135,10 @@ const logger = winston.createLogger({ ...@@ -112,6 +135,10 @@ const logger = winston.createLogger({
] ]
}); });
function compare(a, b) {
return a.metric - b.metric
}
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger makeid, generateExecutor, reverseProxy, getPort, logger, compare
} }
\ No newline at end of file
...@@ -13,11 +13,12 @@ ...@@ -13,11 +13,12 @@
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"express": "^4.17.1", "express": "^4.17.1",
"express-fileupload": "^1.1.6", "express-fileupload": "^1.1.6",
"heap": "^0.2.6",
"isolated-vm": "^3.0.0", "isolated-vm": "^3.0.0",
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"nano": "^8.1.0", "node-fetch": "^2.6.0",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.0", "request": "^2.88.0",
"request-promise": "^4.2.5", "request-promise": "^4.2.5",
......
...@@ -2,21 +2,12 @@ ...@@ -2,21 +2,12 @@
const express = require('express') const express = require('express')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, portExternal, runtime let port = 5000, resource_id, functionHash, runtime
let config = null;
try { resource_id = process.argv[2]
config = require('./config.json') functionHash = process.argv[3]
port = config.port port = process.argv[4]
resource_id = config.resource_id runtime = process.argv[5]
functionHash = config.functionHash
runtime = config.runtime
} catch (e) {
port = 5000
resource_id = process.argv[2]
functionHash = process.argv[3]
portExternal = process.argv[4]
runtime = process.argv[5]
}
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -47,18 +38,19 @@ app.listen(port, () => { ...@@ -47,18 +38,19 @@ app.listen(port, () => {
producer.send( producer.send(
[{ [{
topic: "deployed", topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id }) messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id }),
"status": true
}], () => { }) }], () => { })
}) })
function shouldDie() { function shouldDie() {
if (Date.now() - lastRequest > 5 * 1000) { if (Date.now() - lastRequest > 30 * 1000) {
console.log("Idle for too long. Exiting"); console.log("Idle for too long. Exiting");
producer.send( producer.send(
[{ [{
topic: "removeWorker", topic: "removeWorker",
messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id }) messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id })
}], () => { }], () => {
console.log("Ending worker for function", functionHash, "resource_id", resource_id); console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0) process.exit(0)
......
File added
...@@ -23,7 +23,7 @@ The Dispatch Manager (DM) sends a request to the Resource Manager (RM), detailin ...@@ -23,7 +23,7 @@ The Dispatch Manager (DM) sends a request to the Resource Manager (RM), detailin
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"memory": 1024, // in MiB "memory": 1024, // in MiB
... // Any other resources ... // Any other resources
} }
...@@ -34,7 +34,7 @@ Format: ...@@ -34,7 +34,7 @@ Format:
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"grunts": [ "grunts": [
{ node_id: some unique ID, port: port address}, ... { node_id: some unique ID, port: port address}, ...
] // List of machine IDs ] // List of machine IDs
...@@ -44,10 +44,13 @@ Format: ...@@ -44,10 +44,13 @@ Format:
Once the runtime entity has been launched (or the launch has failed), the Executor sends back a status message on the `LOG_COMMON` topic. Once the runtime entity has been launched (or the launch has failed), the Executor sends back a status message on the `LOG_COMMON` topic.
```javascript ```javascript
{ {
"node_id" : "uique-machine-id", "message_type" : "deployment_launch",
"node_id" : "unique-machine-id",
"entity_id" : "handle for the actual container/VM/etc.",
"entity_type" : "docker/libvirt/etc.",
"resource_id": "logical-entity-id", "resource_id": "logical-entity-id",
"function_id": "unique-function-id", "function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"reason": "deployment"/"termination" "reason": "deployment"/"termination"
"status": true/false // Only valid if reason==deployment "status": true/false // Only valid if reason==deployment
} }
...@@ -57,28 +60,31 @@ Instrumentation data is also sent on the `LOG_COMMON` topic. This data is sent f ...@@ -57,28 +60,31 @@ Instrumentation data is also sent on the `LOG_COMMON` topic. This data is sent f
and whoever needs the data is allowed to read it. Each message is required to have atleast three fields: `node_id`, `resource_id` and `function_id`. and whoever needs the data is allowed to read it. Each message is required to have atleast three fields: `node_id`, `resource_id` and `function_id`.
```javascript ```javascript
{ // Example message from Executor { // Example message from Executor
"node_id" : "uique-machine-id", "message_type" : "instrumentation",
"node_id" : "unique-machine-id",
"resource_id": "logical-entity-id", "resource_id": "logical-entity-id",
"function_id": "unique-function-id", "function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"cpu" : 343, // in MHz "cpu" : 343, // in MHz
"memory": 534, // in MiB "memory": 534, // in MiB
"network": 234 // in KBps "network": 234 // in KBps
} }
{ // Example message from reverse proxy { // Example message from reverse proxy
"node_id" : "uique-machine-id", "message_type" : "instrumentation",
"node_id" : "unique-machine-id",
"resource_id": "logical-entity-id", "resource_id": "logical-entity-id",
"function_id": "unique-function-id", "function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"average_fn_time" : 23 // in ms "average_fn_time" : 23 // in ms
} }
{ // Example message from dispatch manager { // Example message from dispatch manager
"node_id" : "uique-machine-id", "message_type" : "instrumentation",
"node_id" : "unique-machine-id",
"resource_id": "logical-entity-id", "resource_id": "logical-entity-id",
"function_id": "unique-function-id", "function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"coldstart_time" "coldstart_time"
} }
``` ```
...@@ -223,7 +229,7 @@ resources being tracked by RDs on each machine. This data is cached by the RM. ...@@ -223,7 +229,7 @@ resources being tracked by RDs on each machine. This data is cached by the RM.
```javascript ```javascript
{ {
"node_id": "unique-machine-id", "node_id": "unique-machine-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"memory": 1024, // in MiB "memory": 1024, // in MiB
... // Any other resources ... // Any other resources
} }
...@@ -246,7 +252,7 @@ DM on topic `RESPONSE_RM_2_DM`. ...@@ -246,7 +252,7 @@ DM on topic `RESPONSE_RM_2_DM`.
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
// "port": 2343 --- NOT IMPLEMENTED YET // "port": 2343 --- NOT IMPLEMENTED YET
"nodes": ["a", "b", ...] // List of unique machine IDs "nodes": ["a", "b", ...] // List of unique machine IDs
} }
...@@ -258,7 +264,7 @@ Format: ...@@ -258,7 +264,7 @@ Format:
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"memory": 1024, // in MiB "memory": 1024, // in MiB
... // Any other resources ... // Any other resources
} }
...@@ -269,7 +275,7 @@ The RDs recieve this message and send back whether on not they satisfy the const ...@@ -269,7 +275,7 @@ The RDs recieve this message and send back whether on not they satisfy the const
{ {
"node_id": "unique-machine-id", "node_id": "unique-machine-id",
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp", "timestamp" : "time(2) compatible timestamp",
"success" : 0/1 // 0 = fail, 1 = success "success" : 0/1 // 0 = fail, 1 = success
} }
``` ```
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment