Commit ca680d11 authored by Nilanjan Daw's avatar Nilanjan Daw

Started Improved logging

parent 3020d9a5
...@@ -18,7 +18,6 @@ function runIsolate(local_repository, functionHash, port, resource_id) { ...@@ -18,7 +18,6 @@ function runIsolate(local_repository, functionHash, port, resource_id) {
console.log("worker exited"); console.log("worker exited");
}) })
worker.on('online', () => { worker.on('online', () => {
workerEvent.emit('start', functionHash, port, "isolate")
resolve() resolve()
}) })
...@@ -132,5 +131,4 @@ function runContainer(imageName, port, resource_id) { ...@@ -132,5 +131,4 @@ function runContainer(imageName, port, resource_id) {
module.exports.runContainer = runContainer; module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess; module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate; module.exports.runIsolate = runIsolate;
module.exports.workerEvent = workerEvent; \ No newline at end of file
\ No newline at end of file
...@@ -65,7 +65,6 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -65,7 +65,6 @@ libSupport.makeTopic(node_id).then(() => {
function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) { function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) {
console.log("Using port", port, "for functionHash", functionHash); console.log("Using port", port, "for functionHash", functionHash);
usedPort.set(port, functionHash)
fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => { fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => {
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository, functionHash, port, resource_id) execute.runIsolate(local_repository, functionHash, port, resource_id)
......
*
!.gitignore
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"redis": "^2.8.0" "redis": "^2.8.0",
"winston": "^3.2.1"
} }
} }
...@@ -7,11 +7,10 @@ const constants = require('.././constants.json') ...@@ -7,11 +7,10 @@ const constants = require('.././constants.json')
const fs = require('fs') const fs = require('fs')
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const morgan = require('morgan') const morgan = require('morgan')
const mqtt = require('mqtt')
// const client = mqtt.connect('mqtt://' + constants.mqtt_url)
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
const logger = libSupport.logger
/** /**
* functionToPort maps the function and its respective port mapping * functionToPort maps the function and its respective port mapping
* TODO: change this to hold a list of mappings of horizontal scaling * TODO: change this to hold a list of mappings of horizontal scaling
...@@ -71,7 +70,7 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -71,7 +70,7 @@ app.post('/serverless/deploy', (req, res) => {
file.mv(file_path + functionHash, function (err) { file.mv(file_path + functionHash, function (err) {
functionHash = libSupport.generateExecutor(file_path, functionHash) functionHash = libSupport.generateExecutor(file_path, functionHash)
if (err) { if (err) {
console.log(err); logger.error(err)
res.send("error").status(400) res.send("error").status(400)
} }
else { else {
...@@ -117,27 +116,28 @@ function deployContainer(path, imageName) { ...@@ -117,27 +116,28 @@ function deployContainer(path, imageName) {
ENTRYPOINT ["node", "${imageName}.js"]` ENTRYPOINT ["node", "${imageName}.js"]`
, function (err) { , function (err) {
if (err) { if (err) {
console.log("failed", err); logger.error("failed", err);
reject(err); reject(err);
} }
else { else {
console.log('Dockerfile created'); logger.info('Dockerfile created');
const process = spawn('docker', ["build", "-t", registry_url + imageName, path]); const process = spawn('docker', ["build", "-t", registry_url + imageName, path]);
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); logger.info(`stdout: ${data}`);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`); logger.error(`stderr: ${data}`);
}); });
process.on('close', (code) => { process.on('close', (code) => {
console.log(`child process exited with code ${code}`); logger.warn(`child process exited with code ${code}`);
let timeDifference = Math.ceil((Date.now() - buildStart)) let timeDifference = Math.ceil((Date.now() - buildStart))
console.log("image build time taken: ", timeDifference); logger.info("image build time taken: ", timeDifference);
const process_push = spawn('docker', ["push", registry_url + imageName]); const process_push = spawn('docker', ["push", registry_url + imageName]);
process_push.stdout.on('data', (data) => { process_push.stdout.on('data', (data) => {
...@@ -146,11 +146,11 @@ function deployContainer(path, imageName) { ...@@ -146,11 +146,11 @@ function deployContainer(path, imageName) {
}); });
process_push.stderr.on('data', (data) => { process_push.stderr.on('data', (data) => {
console.error(`stderr: ${data}`); logger.error(`stderr: ${data}`);
}); });
process_push.on('close', (code) => { process_push.on('close', (code) => {
console.log("image pushed to repository"); logger.info("image pushed to repository");
resolve(); resolve();
}) })
...@@ -171,7 +171,7 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -171,7 +171,7 @@ app.post('/serverless/execute/:id', (req, res) => {
*/ */
let forwardTo = functionToResource.get(req.params.id + runtime) let forwardTo = functionToResource.get(req.params.id + runtime)
let resource = resourceMap.get(forwardTo.resource_id) let resource = resourceMap.get(forwardTo.resource_id)
console.log("resource found", forwardTo, resource); logger.info("resource found", forwardTo, resource);
libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`) libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`)
} else { } else {
...@@ -196,18 +196,18 @@ function dispatch() { ...@@ -196,18 +196,18 @@ function dispatch() {
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length) let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) { for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift() let {req, res} = requestQueue.shift()
console.log(req.body) logger.info(req.body)
let runtime = req.body.runtime let runtime = req.body.runtime
let functionHash = req.params.id let functionHash = req.params.id
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
console.log("Dispatching function with Id", resource_id, runtime); logger.info("Dispatching function with Id", resource_id, runtime);
let node_id = getAddress() // Requests the RM for address and other metadata for function placement let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM
resourceMap.set(resource_id, { resourceMap.set(resource_id, {
runtime, functionHash, port, node_id runtime, functionHash, port, node_id
}) })
console.log(resourceMap); logger.info(resourceMap);
let payload = [{ let payload = [{
topic: node_id, topic: node_id,
...@@ -245,17 +245,15 @@ function getAddress() { ...@@ -245,17 +245,15 @@ function getAddress() {
return workerNodes[Math.floor(Math.random() * workerNodes.length)]; return workerNodes[Math.floor(Math.random() * workerNodes.length)];
} }
app.listen(port, () => console.log(`Server listening on port ${port}!`))
consumer.on('message', function (message) { consumer.on('message', function (message) {
let topic = message.topic let topic = message.topic
message = message.value message = message.value
if (topic === "response") { if (topic === "response") {
console.log("response", message); logger.info("response", message);
// message = JSON.parse(message) // message = JSON.parse(message)
// console.log(message); // logger.info(message);
// let {req, res} = db.get(message.function_id) // let {req, res} = db.get(message.function_id)
// if (res != null) // if (res != null)
// res.json({ // res.json({
...@@ -268,7 +266,7 @@ consumer.on('message', function (message) { ...@@ -268,7 +266,7 @@ consumer.on('message', function (message) {
if (Date.now() - message.timestamp < 300) if (Date.now() - message.timestamp < 300)
if (workerNodes.indexOf(message.address) === -1) { if (workerNodes.indexOf(message.address) === -1) {
workerNodes.push(message.address) workerNodes.push(message.address)
console.log(workerNodes); logger.warn(workerNodes);
} }
} else if (topic == "deployed") { } else if (topic == "deployed") {
try { try {
...@@ -277,7 +275,7 @@ consumer.on('message', function (message) { ...@@ -277,7 +275,7 @@ consumer.on('message', function (message) {
// process.exit(0) // process.exit(0)
} }
console.log("deployed", message); logger.info("deployed", message);
if (db.has(message.functionHash + message.runtime)) { if (db.has(message.functionHash + message.runtime)) {
let { req, res } = db.get(message.functionHash + message.runtime) let { req, res } = db.get(message.functionHash + message.runtime)
...@@ -292,7 +290,7 @@ consumer.on('message', function (message) { ...@@ -292,7 +290,7 @@ consumer.on('message', function (message) {
}) })
} }
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
console.log("removing metadata", message); logger.info("removing metadata", message);
try { try {
message = JSON.parse(message) message = JSON.parse(message)
} catch(e) { } catch(e) {
...@@ -305,15 +303,15 @@ consumer.on('message', function (message) { ...@@ -305,15 +303,15 @@ consumer.on('message', function (message) {
} else if (topic == "RESPONSE_ARBITER_2_DISPATCHER") { } else if (topic == "RESPONSE_ARBITER_2_DISPATCHER") {
message = JSON.parse(message) message = JSON.parse(message)
console.log(message); logger.info(message);
let payload = rmQueue.get(message.id) let payload = rmQueue.get(message.id)
if (payload != null) { if (payload != null) {
payload[0].topic = getAddress() payload[0].topic = getAddress()
console.log(payload); logger.info(payload);
producer.send(payload, () => { }) producer.send(payload, () => { })
} else { } else {
console.log("something went wrong"); logger.error("something went wrong");
} }
...@@ -321,6 +319,7 @@ consumer.on('message', function (message) { ...@@ -321,6 +319,7 @@ consumer.on('message', function (message) {
}); });
setInterval(dispatch, 2000); setInterval(dispatch, 2000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
// { // {
// id: "!!!!!", // id: "!!!!!",
......
const crypto = require('crypto'); const crypto = require('crypto');
const fs = require('fs') const fs = require('fs')
var rp = require('request-promise'); const rp = require('request-promise');
const winston = require('winston')
const { createLogger, format, transports } = winston;
/** /**
* Generates unique IDs of arbitrary length * Generates unique IDs of arbitrary length
...@@ -83,6 +86,31 @@ function makeid(length) { ...@@ -83,6 +86,31 @@ function makeid(length) {
return port return port
} }
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
format.timestamp(),
format.json()
),
defaultMeta: { module: 'Dispatch Manager' },
transports: [
//
// - Write to all logs with level `info` and below to `combined.log`
// - Write all logs error (and below) to `error.log`.
//
new winston.transports.File({ filename: 'log/error.log', level: 'error' }),
new winston.transports.File({ filename: 'log/combined.log' }),
new winston.transports.Console({
format: winston.format.combine(
format.colorize({ all: true }),
format.timestamp(),
format.simple()
)
})
]
});
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, getPort makeid, generateExecutor, reverseProxy, getPort, logger
} }
\ No newline at end of file
*
!.gitignore
\ No newline at end of file
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.0", "request": "^2.88.0",
"request-promise": "^4.2.5", "request-promise": "^4.2.5",
"save": "^2.4.0" "save": "^2.4.0",
"winston": "^3.2.1"
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment