Commit e728a26d authored by Nilanjan Daw's avatar Nilanjan Daw

Streamlining function deployment and execution

Started streamlining function execution. Infra added for bypassing of function deployment for every function call and reusing of long running containers
parent 91956eb4
{"id":"192.168.1.103","master_node":"10.129.6.5"} {"id":"10.196.11.241","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
'use strict';
const isolateBackend = require('./isolate') const isolateBackend = require('./isolate')
const fs = require('fs') const fs = require('fs')
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const registry_url = "10.129.6.5:5000/" const registry_url = "10.129.6.5:5000/"
const events = require('events');
const workerEvent = new events.EventEmitter();
function runIsolate(filename) { function runIsolate(filename) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
...@@ -25,8 +28,8 @@ function runIsolate(filename) { ...@@ -25,8 +28,8 @@ function runIsolate(filename) {
} }
function runProcess(filename) { function runProcess(local_repository, functionHash) {
let filename = local_repository + functionHash
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeStart = Date.now() let timeStart = Date.now()
const process = spawn('node', [filename]); const process = spawn('node', [filename]);
...@@ -52,7 +55,7 @@ function runProcess(filename) { ...@@ -52,7 +55,7 @@ function runProcess(filename) {
} }
function runContainer(imageName) { function runContainer(imageName, port) {
console.log(imageName); console.log(imageName);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
...@@ -81,6 +84,8 @@ function runContainer(imageName) { ...@@ -81,6 +84,8 @@ function runContainer(imageName) {
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference); console.log("container run time taken: ", timeDifference);
result += data; result += data;
workerEvent.emit('start', imageName, port)
resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
...@@ -89,7 +94,7 @@ function runContainer(imageName) { ...@@ -89,7 +94,7 @@ function runContainer(imageName) {
}); });
process.on('close', (code) => { process.on('close', (code) => {
resolve(result); workerEvent.emit('end', port);
}) })
} }
...@@ -98,14 +103,16 @@ function runContainer(imageName) { ...@@ -98,14 +103,16 @@ function runContainer(imageName) {
const process_checkContainer = spawn('docker', ['container', 'inspect', imageName]); const process_checkContainer = spawn('docker', ['container', 'inspect', imageName]);
process_checkContainer.on('close', (code) => { process_checkContainer.on('close', (code) => {
if (code != 0) { if (code != 0) {
const process = spawn('docker', ["run", "--name", imageName, registry_url + imageName]); const process = spawn('docker', ["run", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName]);
let result = ""; let result = "";
timeStart = Date.now() timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference); console.log("container run time taken: ", timeDifference);
result += data; // result += data;
workerEvent.emit('start', imageName, port)
resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
...@@ -114,7 +121,7 @@ function runContainer(imageName) { ...@@ -114,7 +121,7 @@ function runContainer(imageName) {
}); });
process.on('close', (code) => { process.on('close', (code) => {
resolve(result); workerEvent.emit('end', port);
}) })
} else { } else {
const process = spawn('docker', ["start", "-a", imageName]); const process = spawn('docker', ["start", "-a", imageName]);
...@@ -125,6 +132,8 @@ function runContainer(imageName) { ...@@ -125,6 +132,8 @@ function runContainer(imageName) {
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference); console.log("container run time taken: ", timeDifference);
result += data; result += data;
workerEvent.emit('start', imageName, port)
resolve(result);
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
...@@ -133,7 +142,7 @@ function runContainer(imageName) { ...@@ -133,7 +142,7 @@ function runContainer(imageName) {
}); });
process.on('close', (code) => { process.on('close', (code) => {
resolve(result); workerEvent.emit('end', port);
}) })
} }
}) })
...@@ -150,3 +159,4 @@ function runContainer(imageName) { ...@@ -150,3 +159,4 @@ function runContainer(imageName) {
module.exports.runContainer = runContainer; module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess; module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate; module.exports.runIsolate = runIsolate;
module.exports.workerEvent = workerEvent;
\ No newline at end of file
...@@ -10,6 +10,8 @@ const kafka = require('kafka-node') ...@@ -10,6 +10,8 @@ const kafka = require('kafka-node')
const local_repository = __dirname + "/local_repository/" const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port const host_url = "http://" + constants.master_address + ":" + constants.master_port
let functionToPort = new Map(), usedPort = new Map()
let Producer = kafka.Producer, let Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage, KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -52,6 +54,9 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -52,6 +54,9 @@ libSupport.makeTopic(node_id).then(() => {
function startWorker(local_repository, functionHash,function_id, producer, runtime) { function startWorker(local_repository, functionHash,function_id, producer, runtime) {
let port = libSupport.getPort(usedPort)
usedPort.set(port, functionHash)
functionToPort.set(functionHash, port)
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => { execute.runIsolate(local_repository + functionHash).then(result => {
producer.send([{ producer.send([{
...@@ -64,7 +69,7 @@ function startWorker(local_repository, functionHash,function_id, producer, runti ...@@ -64,7 +69,7 @@ function startWorker(local_repository, functionHash,function_id, producer, runti
}], () => { }) }], () => { })
}) })
else if (runtime === "process") else if (runtime === "process")
execute.runProcess(local_repository + functionHash).then(result => { execute.runProcess(local_repository, functionHash, port).then(result => {
producer.send( producer.send(
[{ [{
topic: "response", topic: "response",
...@@ -76,7 +81,7 @@ function startWorker(local_repository, functionHash,function_id, producer, runti ...@@ -76,7 +81,7 @@ function startWorker(local_repository, functionHash,function_id, producer, runti
}], () => { }) }], () => { })
}) })
else if (runtime === "container") else if (runtime === "container")
execute.runContainer(functionHash).then(result => { execute.runContainer(functionHash, port).then(result => {
producer.send( producer.send(
[{ [{
topic: "response", topic: "response",
...@@ -108,4 +113,18 @@ function heartbeat() { ...@@ -108,4 +113,18 @@ function heartbeat() {
}) })
} }
execute.workerEvent.on("start", (port, functionHash) => {
console.log("started function Port: ", port, functionHash);
})
execute.workerEvent.on('end', (port) => {
let functionHash = usedPort.get(port)
usedPort.delete(port)
functionToPort.delete(functionHash)
console.log("Ending worker for function", functionHash, usedPort, functionToPort);
})
setInterval(heartbeat, 1000); setInterval(heartbeat, 1000);
\ No newline at end of file
...@@ -76,6 +76,25 @@ function makeid(length) { ...@@ -76,6 +76,25 @@ function makeid(length) {
return result; return result;
} }
function getPort(usedPort) {
let port = -1, ctr = 0
do {
min = Math.ceil(30000);
max = Math.floor(60000);
port = Math.floor(Math.random() * (max - min + 1)) + min;
ctr += 1;
if (ctr > 30000) {
port = -1
break
}
} while (usedPort.has(port))
return port
}
function returnPort(port, usedPort) {
usedPort.delete((port))
}
module.exports = { module.exports = {
download, makeid, updateConfig, makeTopic download, makeid, updateConfig, makeTopic, getPort, returnPort
} }
\ No newline at end of file
...@@ -53,6 +53,7 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -53,6 +53,7 @@ app.post('/serverless/deploy', (req, res) => {
let functionHash = file.md5 let functionHash = file.md5
file.mv(file_path + functionHash, function (err) { file.mv(file_path + functionHash, function (err) {
functionHash = libSupport.generateExecutor(file_path, functionHash)
if (err) { if (err) {
console.log(err); console.log(err);
res.send("error").status(400) res.send("error").status(400)
......
const crypto = require('crypto');
const fs = require('fs')
function makeid(length) { function makeid(length) {
var result = ''; var result = '';
var characters = 'abcdefghijklmnopqrstuvwxyz0123456789'; var characters = 'abcdefghijklmnopqrstuvwxyz0123456789';
...@@ -8,4 +10,21 @@ function makeid(length) { ...@@ -8,4 +10,21 @@ function makeid(length) {
return result; return result;
} }
function generateExecutor(functionPath, functionHash) {
input = fs.readFileSync('./repository/worker_env/env.js')
functionFile = fs.readFileSync(functionPath + functionHash)
searchSize = "(resolve, reject) => {".length
insertIndex = input.indexOf("(resolve, reject) => {") + searchSize
output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex)
let hash = crypto.createHash('md5').update(output).digest("hex");
console.log(hash);
fs.writeFileSync(functionPath + hash, output)
return hash
}
module.exports.makeid = makeid; module.exports.makeid = makeid;
module.exports.generateExecutor = generateExecutor;
\ No newline at end of file
* *
!.gitignore !.gitignore
!worker_env
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment