Commit 72a72d4b authored by Nilanjan Daw's avatar Nilanjan Daw

Added better logging at Dispatch Daemon

parent 5e40ac96
...@@ -2,8 +2,11 @@ ...@@ -2,8 +2,11 @@
// const isolateBackend = require('./isolate') // const isolateBackend = require('./isolate')
const fs = require('fs') const fs = require('fs')
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const constants = require("../constants.json")
const libSupport = require('./lib')
const { Worker, isMainThread, workerData } = require('worker_threads'); const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = "10.129.6.5:5000/" const registry_url = constants.registry_url
const logger = libSupport.logger
function runIsolate(local_repository, functionHash, port, resource_id) { function runIsolate(local_repository, functionHash, port, resource_id) {
let filename = local_repository + functionHash + ".js" let filename = local_repository + functionHash + ".js"
...@@ -15,7 +18,7 @@ function runIsolate(local_repository, functionHash, port, resource_id) { ...@@ -15,7 +18,7 @@ function runIsolate(local_repository, functionHash, port, resource_id) {
worker.on('exit', (code) => { worker.on('exit', (code) => {
if (code !== 0) if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`)); reject(new Error(`Worker stopped with exit code ${code}`));
console.log("worker exited"); logger.info(`Isolate Worker with resource_id ${resource_id} blown`);
}) })
worker.on('online', () => { worker.on('online', () => {
resolve() resolve()
...@@ -40,12 +43,12 @@ function runProcess(local_repository, functionHash, port, resource_id) { ...@@ -40,12 +43,12 @@ function runProcess(local_repository, functionHash, port, resource_id) {
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`); logger.error(`stderr: ${data}`);
reject(data); reject(data);
}); });
process.on('close', (code) => { process.on('close', (code) => {
console.log(`child process exited with code ${code}`); logger.info(`Process Environment with resource_id ${resource_id} blown`);
}); });
}) })
...@@ -53,7 +56,7 @@ function runProcess(local_repository, functionHash, port, resource_id) { ...@@ -53,7 +56,7 @@ function runProcess(local_repository, functionHash, port, resource_id) {
function runContainer(imageName, port, resource_id) { function runContainer(imageName, port, resource_id) {
console.log(imageName); logger.info(imageName);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeStart = Date.now() let timeStart = Date.now()
...@@ -73,51 +76,51 @@ function runContainer(imageName, port, resource_id) { ...@@ -73,51 +76,51 @@ function runContainer(imageName, port, resource_id) {
if (code != 0) if (code != 0)
reject("error") reject("error")
else { else {
const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName, const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container"]); resource_id, imageName, port, "container"]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference); logger.info("container run time taken: ", timeDifference);
result += data; result += data;
resolve(result); resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`); logger.error(`stderr: ${data}`);
reject(data); reject(data);
}); });
process.on('close', (code) => { process.on('close', (code) => {
console.log("Exiting container"); logger.info("Exiting container");
}) })
} }
}) })
} else { } else {
console.log("container starting at port", port); logger.info("container starting at port", port);
const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", imageName, const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container"]); registry_url + imageName, resource_id, imageName, port, "container"]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference); logger.info("container run time taken: ", timeDifference);
resolve(result); resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`); logger.error(`stderr: ${data}`);
reject(data); reject(data);
}); });
process.on('close', (code) => { process.on('close', (code) => {
console.log("Exiting container"); logger.info("Exiting container");
}) })
} }
......
...@@ -9,6 +9,7 @@ let grunt = spawn('./grunt', [node_id]) ...@@ -9,6 +9,7 @@ let grunt = spawn('./grunt', [node_id])
const execute = require('./execute') const execute = require('./execute')
const fs = require('fs') const fs = require('fs')
const kafka = require('kafka-node') const kafka = require('kafka-node')
const logger = libSupport.logger
const local_repository = __dirname + "/local_repository/" const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port const host_url = "http://" + constants.master_address + ":" + constants.master_port
...@@ -22,7 +23,7 @@ let Producer = kafka.Producer, ...@@ -22,7 +23,7 @@ let Producer = kafka.Producer,
Consumer = kafka.Consumer Consumer = kafka.Consumer
libSupport.makeTopic(node_id).then(() => { libSupport.makeTopic(node_id).then(() => {
console.log("node topic created") logger.info("node topic created")
let consumer = new Consumer(client, let consumer = new Consumer(client,
[ [
{ topic: node_id, partition: 0, offset: 0 } { topic: node_id, partition: 0, offset: 0 }
...@@ -31,7 +32,7 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -31,7 +32,7 @@ libSupport.makeTopic(node_id).then(() => {
{ autoCommit: true } { autoCommit: true }
]) ])
consumer.on('message', function (message) { consumer.on('message', function (message) {
console.log(message); logger.info(message);
let topic = message.topic let topic = message.topic
message = message.value message = message.value
message = JSON.parse(message) message = JSON.parse(message)
...@@ -43,7 +44,7 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -43,7 +44,7 @@ libSupport.makeTopic(node_id).then(() => {
* Download necessary files (function file) and Start resource deployment * Download necessary files (function file) and Start resource deployment
*/ */
if (message.type === "execute") { if (message.type === "execute") {
console.log("function_id", resource_id); logger.info("Received Deployment request for resource_id: " + resource_id);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => { libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
startWorker(local_repository, functionHash, resource_id, producer, runtime, port) startWorker(local_repository, functionHash, resource_id, producer, runtime, port)
}) })
...@@ -63,7 +64,7 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -63,7 +64,7 @@ libSupport.makeTopic(node_id).then(() => {
* @param {Number} port * @param {Number} port
*/ */
function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) { function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) {
console.log("Using port", port, "for functionHash", functionHash); logger.info(`Using port ${port} for functionHash ${functionHash}`);
fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => { fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => {
if (runtime === "isolate") if (runtime === "isolate")
...@@ -95,56 +96,20 @@ function heartbeat() { ...@@ -95,56 +96,20 @@ function heartbeat() {
grunt.stdout.on('data', data => { grunt.stdout.on('data', data => {
console.log(data.toString()); logger.info(data.toString());
}) })
grunt.stderr.on('data', data => { grunt.stderr.on('data', data => {
console.log(data.toString()); logger.info(data.toString());
}) })
grunt.on('close', (code) => { grunt.on('close', (code) => {
console.log("Grunt exited with exit code", code); logger.info("Grunt exited with exit code", code);
}) })
setInterval(heartbeat, 1000); setInterval(heartbeat, 1000);
/**
* Channel LOG_COMMON
Source: Executor
{
"node_id"
"resource_id"
"function_id"
"status": true/false
"reason": "deployed / exd"
}
Source: Executor
{
"node_id"
"resource_id"
"function_id"
"usage": {
"cpu"
"memory"
"network"
}
}
Source: ReverseProxy
{
"node_id"
"resource_id"
"function_id"
"average_fn_time"
}
Source: Dispatch Manager
{
"node_id"
"resource_id"
"function_id"
"coldstart_time"
}
*/
...@@ -4,6 +4,10 @@ const process = require('process') ...@@ -4,6 +4,10 @@ const process = require('process')
const { spawnSync } = require('child_process'); const { spawnSync } = require('child_process');
const constants = require(".././constants.json") const constants = require(".././constants.json")
const kafka = require('kafka-node') const kafka = require('kafka-node')
const winston = require('winston')
const { createLogger, format, transports } = winston;
function updateConfig() { function updateConfig() {
console.log("Retrieving primary IP"); console.log("Retrieving primary IP");
let file = JSON.parse(fs.readFileSync('./config.json', { encoding: 'utf-8' })) let file = JSON.parse(fs.readFileSync('./config.json', { encoding: 'utf-8' }))
...@@ -83,6 +87,32 @@ function returnPort(port, usedPort) { ...@@ -83,6 +87,32 @@ function returnPort(port, usedPort) {
usedPort.delete((port)) usedPort.delete((port))
} }
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
format.timestamp(),
format.json()
),
defaultMeta: { module: 'Dispatch Agent' },
transports: [
//
// - Write to all logs with level `info` and below to `combined.log`
// - Write all logs error (and below) to `error.log`.
//
new winston.transports.File({ filename: 'log/error.log', level: 'error' }),
new winston.transports.File({ filename: 'log/combined.log' }),
new winston.transports.Console({
format: winston.format.combine(
format.colorize({ all: true }),
format.timestamp(),
format.simple()
)
})
]
});
module.exports = { module.exports = {
download, makeid, updateConfig, makeTopic, returnPort download, makeid, updateConfig, makeTopic, returnPort, logger
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment