Commit c082b0fe authored by Nilanjan Daw's avatar Nilanjan Daw

Added memory limits to isolate and processes. Ref issue #9

Tested process crash on memory limit exceeded. Need to test it for isolates
parent 3d8147e1
...@@ -8,38 +8,46 @@ const { Worker, isMainThread, workerData } = require('worker_threads'); ...@@ -8,38 +8,46 @@ const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = constants.registry_url const registry_url = constants.registry_url
const logger = libSupport.logger const logger = libSupport.logger
function runIsolate(local_repository, functionHash, port, resource_id) { function runIsolate(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js" let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const worker = new Worker(filename); const worker = new Worker(filename, {
resourceLimits: {
maxOldGenerationSizeMb: memory
}
});
worker.on('message', resolve); worker.on('message', resolve);
worker.on('error', reject); worker.on('error', reject);
worker.on('exit', (code) => { worker.on('exit', (code) => {
if (code !== 0) if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`)); reject(new Error(`Worker stopped with exit code ${code}`));
logger.info(`Isolate Worker with resource_id ${resource_id} blown`); logger.info(`Isolate Worker with resource_id ${resource_id} blown`);
})
worker.on('online', () => {
resolve() resolve()
}) })
}); });
} }
function runProcess(local_repository, functionHash, port, resource_id) { function runProcess(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js" let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeStart = Date.now() let timeStart = Date.now()
const process = spawn('node', [filename, port]);
let result = ""; const process = spawn('node', [`--max-old-space-size=${memory}`, filename, port]);
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); console.log(`stdout: ${data}`);
result += data;
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference); console.log("process time taken: ", timeDifference);
resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
...@@ -48,6 +56,7 @@ function runProcess(local_repository, functionHash, port, resource_id) { ...@@ -48,6 +56,7 @@ function runProcess(local_repository, functionHash, port, resource_id) {
}); });
process.on('close', (code) => { process.on('close', (code) => {
resolve(code);
logger.info(`Process Environment with resource_id ${resource_id} blown`); logger.info(`Process Environment with resource_id ${resource_id} blown`);
}); });
}) })
...@@ -55,7 +64,12 @@ function runProcess(local_repository, functionHash, port, resource_id) { ...@@ -55,7 +64,12 @@ function runProcess(local_repository, functionHash, port, resource_id) {
} }
function runContainer(imageName, port, resource_id) { function runContainer(metadata) {
let imageName = metadata.functionHash,
port = metadata.port,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
logger.info(imageName); logger.info(imageName);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
......
...@@ -55,7 +55,14 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -55,7 +55,14 @@ libSupport.makeTopic(node_id).then(() => {
console.log("metadata", json); console.log("metadata", json);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => { libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
startWorker(local_repository, functionHash, resource_id, producer, runtime, port) let metadata = {
resource_id, functionHash,
runtime, port,
resources: {
memory: json.memory
}
}
startWorker(local_repository, producer, metadata)
}) })
}).catch(err => { }).catch(err => {
logger.error("something went wrong" + err.toString()) logger.error("something went wrong" + err.toString())
...@@ -102,16 +109,44 @@ libSupport.download(constants.grunt_host, "grunt").then(() => { ...@@ -102,16 +109,44 @@ libSupport.download(constants.grunt_host, "grunt").then(() => {
* @param {String} runtime * @param {String} runtime
* @param {Number} port * @param {Number} port
*/ */
function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) { function startWorker(local_repository, producer, metadata) {
logger.info(`Using port ${port} for functionHash ${functionHash}`); let runtime = metadata.runtime
console.log(metadata);
fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => { logger.info(`Using port ${metadata.port} for functionHash ${metadata.functionHash}`);
fs.writeFile('./local_repository/config.json', JSON.stringify({
port: metadata.port,
functionHash: metadata.functionHash,
resource_id: metadata.resource_id,
runtime: metadata.runtime,
memory: metadata.resources.memory
}), () => {
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository, functionHash, port, resource_id) execute.runIsolate(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
producer.send([{
topic: "deployed",
messages: JSON.stringify({
"status": false,
resource_id: metadata.resource_id,
"reason": "isolate exit"
})
}], () => { })
})
else if (runtime === "process") else if (runtime === "process")
execute.runProcess(local_repository, functionHash, port, resource_id) execute.runProcess(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
producer.send([{ topic: "deployed",
messages: JSON.stringify({
"status": false,
resource_id: metadata.resource_id,
"reason": "process exit"
}) }], () => { })
})
else if (runtime === "container") else if (runtime === "container")
execute.runContainer(functionHash, port, resource_id) execute.runContainer(metadata)
else { else {
producer.send( producer.send(
[{ [{
......
...@@ -317,11 +317,14 @@ consumer.on('message', function (message) { ...@@ -317,11 +317,14 @@ consumer.on('message', function (message) {
} catch (e) { } catch (e) {
// process.exit(0) // process.exit(0)
} }
logger.info("Deployed Resource: " + JSON.stringify(message)); logger.info("Deployed Resource: " + JSON.stringify(message));
if (db.has(message.resource_id)) { if (db.has(message.resource_id)) {
let { req, res } = db.get(message.resource_id) let { req, res } = db.get(message.resource_id)
if (message.status == false) {
res.status(400).json({reason: message.reason})
return;
}
if (functionToResource.has(message.functionHash + message.runtime)) { if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime) let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
heap.push(resourceHeap, { heap.push(resourceHeap, {
......
...@@ -47,7 +47,8 @@ app.listen(port, () => { ...@@ -47,7 +47,8 @@ app.listen(port, () => {
producer.send( producer.send(
[{ [{
topic: "deployed", topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id }) messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id }),
"status": true
}], () => { }) }], () => { })
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment