Commit 0ad5b02d authored by Nilanjan Daw's avatar Nilanjan Daw

process deployment working

Added long running processes and deployment pipeline bypass support
parent 6438b7aa
...@@ -28,17 +28,19 @@ function runIsolate(filename) { ...@@ -28,17 +28,19 @@ function runIsolate(filename) {
} }
function runProcess(local_repository, functionHash) { function runProcess(local_repository, functionHash, port) {
let filename = local_repository + functionHash let filename = local_repository + functionHash
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeStart = Date.now() let timeStart = Date.now()
const process = spawn('node', [filename]); const process = spawn('node', [filename, port]);
let result = ""; let result = "";
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); console.log(`stdout: ${data}`);
result += data; result += data;
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference); console.log("process time taken: ", timeDifference);
workerEvent.emit('start', functionHash, port, "process")
resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
...@@ -48,7 +50,7 @@ function runProcess(local_repository, functionHash) { ...@@ -48,7 +50,7 @@ function runProcess(local_repository, functionHash) {
process.on('close', (code) => { process.on('close', (code) => {
console.log(`child process exited with code ${code}`); console.log(`child process exited with code ${code}`);
resolve(result); workerEvent.emit('end', port, "process");
}); });
}) })
......
...@@ -70,17 +70,18 @@ function startWorker(local_repository, functionHash,function_id, producer, runti ...@@ -70,17 +70,18 @@ function startWorker(local_repository, functionHash,function_id, producer, runti
}], () => { }) }], () => { })
}) })
else if (runtime === "process") else if (runtime === "process")
execute.runProcess(local_repository, functionHash, port).then(result => { execute.runProcess(local_repository, functionHash, port)
producer.send( // .then(result => {
[{ // producer.send(
topic: "response", // [{
messages: JSON.stringify({ // topic: "response",
status: "success", // messages: JSON.stringify({
result, // status: "success",
function_id // result,
}) // function_id
}], () => { }) // })
}) // }], () => { })
// })
else if (runtime === "container") else if (runtime === "container")
execute.runContainer(functionHash, port) execute.runContainer(functionHash, port)
// .then(result => { // .then(result => {
......
...@@ -217,6 +217,7 @@ consumer.on('message', function (message) { ...@@ -217,6 +217,7 @@ consumer.on('message', function (message) {
} else if (topic == "deployed") { } else if (topic == "deployed") {
message = JSON.parse(message) message = JSON.parse(message)
console.log("deployed", message);
if (db.has(message.functionHash + message.runtime)) { if (db.has(message.functionHash + message.runtime)) {
let { req, res } = db.get(message.functionHash + message.runtime) let { req, res } = db.get(message.functionHash + message.runtime)
...@@ -227,7 +228,11 @@ consumer.on('message', function (message) { ...@@ -227,7 +228,11 @@ consumer.on('message', function (message) {
node_id: message.node_id node_id: message.node_id
}) })
libSupport.reverseProxy(req, res, `http://${message.node_id}:${message.port}/serverless/function/execute`) libSupport.reverseProxy(req, res,
`http://${message.node_id}:${message.port}/serverless/function/execute`)
.then(() => {
db.delete(message.functionHash + message.runtime)
})
} }
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
console.log("removing metadata", message); console.log("removing metadata", message);
......
...@@ -28,24 +28,28 @@ function makeid(length) { ...@@ -28,24 +28,28 @@ function makeid(length) {
} }
function reverseProxy(req, res, url) { function reverseProxy(req, res, url) {
console.log("requesting reverseproxy"); return new Promise((resolve, reject) => {
console.log("requesting reverseproxy");
var options = {
method: 'POST', var options = {
uri: url, method: 'POST',
body: req.body, uri: url,
json: true // Automatically stringifies the body to JSON body: req.body,
}; json: true // Automatically stringifies the body to JSON
};
rp(options) rp(options)
.then(function (parsedBody) { .then(function (parsedBody) {
console.log("parsed body:", parsedBody); console.log("parsed body:", parsedBody);
res.json(parsedBody) res.json(parsedBody)
}) resolve()
.catch(function (err) { })
console.log("error", err.message); .catch(function (err) {
res.json(err.message).status(err.statusCode) console.log("error", err.message);
}); res.json(err.message).status(err.statusCode)
resolve()
});
})
} }
module.exports.makeid = makeid; module.exports.makeid = makeid;
......
...@@ -4,7 +4,7 @@ const bodyParser = require('body-parser') ...@@ -4,7 +4,7 @@ const bodyParser = require('body-parser')
const app = express() const app = express()
app.use(bodyParser.urlencoded({ extended: true })) app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json()) app.use(bodyParser.json())
let port = 5000, lastRequest = Date.now() let port = (process.argv[2] != null)? parseInt(process.argv[2]): 5000, lastRequest = Date.now()
app.post('/serverless/function/execute/', (req, res) => { app.post('/serverless/function/execute/', (req, res) => {
let payload = req.body let payload = req.body
lastRequest = Date.now() lastRequest = Date.now()
...@@ -22,7 +22,7 @@ app.listen(port, () => console.log(`Server listening on port ${port}!`)) ...@@ -22,7 +22,7 @@ app.listen(port, () => console.log(`Server listening on port ${port}!`))
function shouldDie() { function shouldDie() {
if (Date.now() - lastRequest > 300 * 1000) { if (Date.now() - lastRequest > 5 * 1000) {
console.log("Idle for too long. Exiting"); console.log("Idle for too long. Exiting");
process.exit(0) process.exit(0)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment