Commit f51e5697 authored by Nilanjan Daw's avatar Nilanjan Daw

Added memory limits to isolate and processes. Ref issue #9

Tested process termination on memory limit exceeded. Need to test it for isolates
parent 3d8147e1
......@@ -8,38 +8,46 @@ const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = constants.registry_url
const logger = libSupport.logger
function runIsolate(local_repository, functionHash, port, resource_id) {
function runIsolate(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
const worker = new Worker(filename);
const worker = new Worker(filename, {
resourceLimits: {
maxOldGenerationSizeMb: memory
}
});
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
logger.info(`Isolate Worker with resource_id ${resource_id} blown`);
})
worker.on('online', () => {
resolve()
})
});
}
function runProcess(local_repository, functionHash, port, resource_id) {
function runProcess(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('node', [filename, port]);
let result = "";
const process = spawn('node', [`--max-old-space-size=${memory}`, filename, port]);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
result += data;
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference);
resolve(result);
});
process.stderr.on('data', (data) => {
......@@ -48,6 +56,7 @@ function runProcess(local_repository, functionHash, port, resource_id) {
});
process.on('close', (code) => {
resolve(code);
logger.info(`Process Environment with resource_id ${resource_id} blown`);
});
})
......@@ -55,7 +64,12 @@ function runProcess(local_repository, functionHash, port, resource_id) {
}
function runContainer(imageName, port, resource_id) {
function runContainer(metadata) {
let imageName = metadata.functionHash,
port = metadata.port,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
logger.info(imageName);
return new Promise((resolve, reject) => {
......
......@@ -55,7 +55,14 @@ libSupport.makeTopic(node_id).then(() => {
console.log("metadata", json);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
startWorker(local_repository, functionHash, resource_id, producer, runtime, port)
let metadata = {
resource_id, functionHash,
runtime, port,
resources: {
memory: json.memory
}
}
startWorker(local_repository, producer, metadata)
})
}).catch(err => {
logger.error("something went wrong" + err.toString())
......@@ -102,16 +109,44 @@ libSupport.download(constants.grunt_host, "grunt").then(() => {
* @param {String} runtime
* @param {Number} port
*/
function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) {
logger.info(`Using port ${port} for functionHash ${functionHash}`);
function startWorker(local_repository, producer, metadata) {
let runtime = metadata.runtime
console.log(metadata);
fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => {
logger.info(`Using port ${metadata.port} for functionHash ${metadata.functionHash}`);
fs.writeFile('./local_repository/config.json', JSON.stringify({
port: metadata.port,
functionHash: metadata.functionHash,
resource_id: metadata.resource_id,
runtime: metadata.runtime,
memory: metadata.resources.memory
}), () => {
if (runtime === "isolate")
execute.runIsolate(local_repository, functionHash, port, resource_id)
execute.runIsolate(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
producer.send([{
topic: "deployed",
messages: JSON.stringify({
"status": false,
resource_id: metadata.resource_id,
"reason": "isolate exit"
})
}], () => { })
})
else if (runtime === "process")
execute.runProcess(local_repository, functionHash, port, resource_id)
execute.runProcess(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
producer.send([{ topic: "deployed",
messages: JSON.stringify({
"status": false,
resource_id: metadata.resource_id,
"reason": "process exit"
}) }], () => { })
})
else if (runtime === "container")
execute.runContainer(functionHash, port, resource_id)
execute.runContainer(metadata)
else {
producer.send(
[{
......
......@@ -317,11 +317,14 @@ consumer.on('message', function (message) {
} catch (e) {
// process.exit(0)
}
logger.info("Deployed Resource: " + JSON.stringify(message));
if (db.has(message.resource_id)) {
let { req, res } = db.get(message.resource_id)
if (message.status == false) {
res.status(400).json({reason: message.reason})
return;
}
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
heap.push(resourceHeap, {
......
......@@ -47,7 +47,8 @@ app.listen(port, () => {
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id })
messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id }),
"status": true
}], () => { })
})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment