Commit 6438b7aa authored by Nilanjan Daw's avatar Nilanjan Daw

container deployment working

Reverse proxy based container worker working
parent 26565f8c
{"id":"10.130.150.246","master_node":"10.129.6.5"} {"id":"192.168.31.51","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
...@@ -102,6 +102,8 @@ function runContainer(imageName, port) { ...@@ -102,6 +102,8 @@ function runContainer(imageName, port) {
} else { } else {
const process_checkContainer = spawn('docker', ['container', 'inspect', imageName]); const process_checkContainer = spawn('docker', ['container', 'inspect', imageName]);
process_checkContainer.on('close', (code) => { process_checkContainer.on('close', (code) => {
console.log("container starting at port", port);
if (code != 0) { if (code != 0) {
const process = spawn('docker', ["run", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName]); const process = spawn('docker', ["run", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName]);
let result = ""; let result = "";
...@@ -124,14 +126,16 @@ function runContainer(imageName, port) { ...@@ -124,14 +126,16 @@ function runContainer(imageName, port) {
workerEvent.emit('end', port, "container"); workerEvent.emit('end', port, "container");
}) })
} else { } else {
const process = spawn('docker', ["start", "-a", imageName]); const clean_container = spawn('docker', ['rm', imageName])
clean_container.on('close', code => {
const process = spawn('docker', ["run", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName]);
let result = ""; let result = "";
timeStart = Date.now() timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference); console.log("container run time taken: ", timeDifference);
result += data; // result += data;
workerEvent.emit('start', imageName, port, "container") workerEvent.emit('start', imageName, port, "container")
resolve(result); resolve(result);
}); });
...@@ -144,6 +148,8 @@ function runContainer(imageName, port) { ...@@ -144,6 +148,8 @@ function runContainer(imageName, port) {
process.on('close', (code) => { process.on('close', (code) => {
workerEvent.emit('end', port, "container"); workerEvent.emit('end', port, "container");
}) })
})
} }
}) })
......
...@@ -55,6 +55,8 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -55,6 +55,8 @@ libSupport.makeTopic(node_id).then(() => {
function startWorker(local_repository, functionHash,function_id, producer, runtime) { function startWorker(local_repository, functionHash,function_id, producer, runtime) {
let port = libSupport.getPort(usedPort) let port = libSupport.getPort(usedPort)
console.log("Using port", port, "for functionHash", functionHash);
usedPort.set(port, functionHash) usedPort.set(port, functionHash)
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => { execute.runIsolate(local_repository + functionHash).then(result => {
...@@ -123,17 +125,16 @@ execute.workerEvent.on("start", (functionHash, port, runtime) => { ...@@ -123,17 +125,16 @@ execute.workerEvent.on("start", (functionHash, port, runtime) => {
}) })
execute.workerEvent.on('end', (port) => { execute.workerEvent.on('end', (port, runtime) => {
let functionHash = usedPort.get(port) let functionHash = usedPort.get(port)
usedPort.delete(port) usedPort.delete(port)
producer.send( producer.send(
[{ [{
topic: "removeWorker", topic: "removeWorker",
messages: JSON.stringify({ functionHash, port, runtime, node_id }) messages: JSON.stringify({ functionHash, port, runtime, node_id })
}], () => { }) }], () => {
console.log("Ending worker for function", functionHash, usedPort, functionToPort); console.log("Ending worker for function", functionHash, usedPort);
})
}) })
setInterval(heartbeat, 1000); setInterval(heartbeat, 1000);
\ No newline at end of file
...@@ -25,7 +25,10 @@ let kafka = require('kafka-node'), ...@@ -25,7 +25,10 @@ let kafka = require('kafka-node'),
Consumer = kafka.Consumer, Consumer = kafka.Consumer,
consumer = new Consumer(client, consumer = new Consumer(client,
[ [
{ topic: 'response', partition: 0, offset: 0}, { topic: 'heartbeat' }, {topic: "deployed"} { topic: 'response', partition: 0, offset: 0},
{ topic: 'heartbeat' },
{topic: "deployed"},
{ topic: "removeWorker"}
], ],
[ [
{ autoCommit: true } { autoCommit: true }
...@@ -148,8 +151,7 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -148,8 +151,7 @@ app.post('/serverless/execute/:id', (req, res) => {
let forwardTo = functionToPort.get(req.params.id + runtime) let forwardTo = functionToPort.get(req.params.id + runtime)
console.log("resource found", forwardTo); console.log("resource found", forwardTo);
res.redirect(307, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`); libSupport.reverseProxy(req, res, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`)
console.log("sending request to:", `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`);
} else { } else {
requestQueue.push({ req, res }) requestQueue.push({ req, res })
...@@ -180,7 +182,7 @@ function dispatch() { ...@@ -180,7 +182,7 @@ function dispatch() {
partition: 0 partition: 0
}] }]
producer.send(payload, () => {}) producer.send(payload, () => {})
db.set(function_id, res) db.set(functionHash + runtime, {req, res})
} }
} }
...@@ -189,48 +191,22 @@ function getAddress() { ...@@ -189,48 +191,22 @@ function getAddress() {
return workerNodes[Math.floor(Math.random() * workerNodes.length)]; return workerNodes[Math.floor(Math.random() * workerNodes.length)];
} }
// client.on('message', function (topic, message) {
// if (topic === "response") {
// message = JSON.parse(message)
// console.log(message);
// let res = db.get(message.function_id)
// res.json({
// "status": "success",
// "reply": message.result
// })
// db.delete(message.function_id)
// } else if (topic === "heartbeat") {
// message = JSON.parse(message)
// if (workerNodes.indexOf(message.address) === -1) {
// workerNodes.push(message.address)
// console.log(workerNodes);
// }
// }
// })
app.listen(port, () => console.log(`Server listening on port ${port}!`)) app.listen(port, () => console.log(`Server listening on port ${port}!`))
// client.on('connect', function () {
// client.subscribe("response")
// client.subscribe("heartbeat")
// })
consumer.on('message', function (message) { consumer.on('message', function (message) {
// console.log(message); // console.log(message);
let topic = message.topic let topic = message.topic
message = message.value message = message.value
if (topic === "response") { if (topic === "response") {
message = JSON.parse(message) // message = JSON.parse(message)
console.log(message); // console.log(message);
let res = db.get(message.function_id) // let {req, res} = db.get(message.function_id)
if (res != null) // if (res != null)
res.json({ // res.json({
"status": "success", // "status": "success",
"reply": message.result // "reply": message.result
}) // })
db.delete(message.function_id) // db.delete(message.function_id)
} else if (topic === "heartbeat") { } else if (topic === "heartbeat") {
message = JSON.parse(message) message = JSON.parse(message)
if (Date.now() - message.timestamp < 300) if (Date.now() - message.timestamp < 300)
...@@ -239,14 +215,25 @@ consumer.on('message', function (message) { ...@@ -239,14 +215,25 @@ consumer.on('message', function (message) {
console.log(workerNodes); console.log(workerNodes);
} }
} else if (topic == "deployed") { } else if (topic == "deployed") {
let res = db.get(message.function_id)
message = JSON.parse(message) message = JSON.parse(message)
console.log(message);
if (db.has(message.functionHash + message.runtime)) {
let { req, res } = db.get(message.functionHash + message.runtime)
// res.redirect(307, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`); // res.redirect(307, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`);
functionToPort.set(message.functionHash + message.runtime, {port: parseInt(message.port), if (parseInt(message.port) != -1)
node_id: message.node_id}) functionToPort.set(message.functionHash + message.runtime, {
console.log(functionToPort); port: parseInt(message.port),
node_id: message.node_id
})
libSupport.reverseProxy(req, res, `http://${message.node_id}:${message.port}/serverless/function/execute`)
}
} else if (topic == "removeWorker") {
console.log("removing metadata", message);
message = JSON.parse(message)
functionToPort.delete(message.functionHash + message.runtime)
} }
}); });
......
const crypto = require('crypto'); const crypto = require('crypto');
const fs = require('fs') const fs = require('fs')
var rp = require('request-promise');
function makeid(length) { function makeid(length) {
var result = ''; var result = '';
var characters = 'abcdefghijklmnopqrstuvwxyz0123456789'; var characters = 'abcdefghijklmnopqrstuvwxyz0123456789';
...@@ -26,5 +27,27 @@ function makeid(length) { ...@@ -26,5 +27,27 @@ function makeid(length) {
return hash return hash
} }
function reverseProxy(req, res, url) {
console.log("requesting reverseproxy");
var options = {
method: 'POST',
uri: url,
body: req.body,
json: true // Automatically stringifies the body to JSON
};
rp(options)
.then(function (parsedBody) {
console.log("parsed body:", parsedBody);
res.json(parsedBody)
})
.catch(function (err) {
console.log("error", err.message);
res.json(err.message).status(err.statusCode)
});
}
module.exports.makeid = makeid; module.exports.makeid = makeid;
module.exports.generateExecutor = generateExecutor; module.exports.generateExecutor = generateExecutor;
module.exports.reverseProxy = reverseProxy;
\ No newline at end of file
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"nano": "^8.1.0", "nano": "^8.1.0",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.0",
"request-promise": "^4.2.5",
"save": "^2.4.0" "save": "^2.4.0"
} }
} }
...@@ -5,7 +5,7 @@ const app = express() ...@@ -5,7 +5,7 @@ const app = express()
app.use(bodyParser.urlencoded({ extended: true })) app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json()) app.use(bodyParser.json())
let port = 5000, lastRequest = Date.now() let port = 5000, lastRequest = Date.now()
app.post('/serverless/function/execute/:id', (req, res) => { app.post('/serverless/function/execute/', (req, res) => {
let payload = req.body let payload = req.body
lastRequest = Date.now() lastRequest = Date.now()
executor(payload).then((result) => { executor(payload).then((result) => {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment