Commit eb2f577c authored by nilanjandaw's avatar nilanjandaw

decoupled dispatcher and dispatch daemon

adding setup for multi-host deployment. Added comms via MQTT. Container deployment broken
parent 9f91906a
const isolateBackend = require('./isolate')
const fs = require('fs')
const { spawn } = require('child_process');
function runIsolate(filename) {
return new Promise((resolve, reject) => {
let timeStart = Date.now()
let { isolate, context } = isolateBackend.createIsolate();
fs.readFile(filename, 'utf-8', (err, data) => {
if (err)
reject(err);
isolate.compileScript(data).then(script => {
script.run(context)
.then(result => {
console.log(result);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("isolate time taken: ", timeDifference);
resolve();
});
}).catch(err => { reject(err) })
});
});
}
function runProcess(filename) {
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('node', [filename]);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference);
});
process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
console.log(`child process exited with code ${code}`);
resolve();
});
})
}
function runContainer(imageName) {
console.log(imageName);
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('docker', ["run", "--name", imageName, imageName]);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference);
});
process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
resolve(code);
})
})
}
module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate;
\ No newline at end of file
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')
const libSupport = require('../dispatcher/lib')
const execute = require('./execute')
const node_id = "20sez54hq8"
client.on('connect', function () {
client.subscribe(node_id, function (err) {
if (!err) {
console.log("node listening to id", node_id);
}
})
})
client.on('message', function (topic, message) {
// message is Buffer
message = JSON.parse(message)
if (message.type !== 'heartbeat')
console.log(message.toString())
let runtime = message.runtime
let functionHash = message.functionHash
if (message.type === "execute") {
if (runtime === "isolate")
execute.runIsolate('../dispatcher/test/' + functionHash).then(() => client.publish(node_id, JSON.stringify({ status: "success" })))
else if (runtime === "process")
execute.runProcess('../dispatcher/test/' + functionHash).then(() => client.publish(node_id, JSON.stringify({ status: "success" })))
else if (runtime === "container")
execute.runContainer(functionHash).then(() => client.publish(node_id, JSON.stringify({ status: "success" })))
else {
client.publish(node_id, JSON.stringify({ status: "unknown runtime" }))
return
}
}
})
function heartbeat() {
client.publish(node_id, JSON.stringify({"type": "heartbeat"}))
}
setInterval(heartbeat, 10000);
\ No newline at end of file
{
"name": "hybrid",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node index.js"
},
"author": "",
"license": "ISC",
"dependencies": {
"body-parser": "^1.19.0",
"express": "^4.17.1",
"express-fileupload": "^1.1.6",
"isolated-vm": "^3.0.0",
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
"redis": "^2.8.0"
}
}
const fs = require('fs')
"use strict";
const isolateBackend = require('./isolate')
const { spawn } = require('child_process');
const libSupport = require('./lib')
const fs = require('fs')
const express = require('express')
const bodyParser = require('body-parser')
const fileUpload = require('express-fileupload');
const redis = require('redis')
const morgan = require('morgan')
let client = redis.createClient();
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')
const app = express()
app.use(morgan('combined'))
app.use(bodyParser.urlencoded({ extended: false }))
app.use(bodyParser.json())
app.use(fileUpload())
const node_id = "20sez54hq8"
const port = 8080
......@@ -27,7 +23,6 @@ app.post('/serverless/deploy', (req, res) => {
let file = req.files.serverless
let functionHash = file.md5
console.log("sa");
file.mv(__dirname + "/test/" + functionHash, function (err) {
if (err) {
......@@ -59,75 +54,11 @@ app.post('/serverless/deploy', (req, res) => {
})
app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
let functionHash = req.params.id
if (runtime === "isolate")
runIsolate('./test/' + functionHash).then(() => res.json({status: "success"}))
else if (runtime === "process")
runProcess('./test/' + functionHash).then(() => res.json({ status: "success" }))
else if (runtime === "container")
runContainer(functionHash).then(() => res.json({ status: "success" }))
else {
res.send("unknown")
return
}
})
function runIsolate(filename) {
return new Promise( (resolve, reject) => {
let timeStart = Date.now()
let {isolate, context} = isolateBackend.createIsolate();
fs.readFile(filename, 'utf-8',(err, data) => {
if (err)
reject(err);
isolate.compileScript(data).then(script => {
script.run(context)
.then(result => {
console.log(result);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("isolate time taken: ", timeDifference);
resolve();
});
}).catch(err => {reject(err)})
});
});
}
function runProcess(filename) {
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('node', [filename]);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference);
});
process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
console.log(`child process exited with code ${code}`);
resolve();
});
})
}
function deployContainer(path, imageName) {
return new Promise((resolve, reject) => {
let buildStart = Date.now()
fs.writeFile(__dirname + '/test/Dockerfile',
fs.writeFile('./test/Dockerfile',
`FROM node:latest
WORKDIR /app
COPY package.json /app
......@@ -136,7 +67,11 @@ function deployContainer(path, imageName) {
CMD node ${imageName}`
, function (err) {
if (err) reject(err);
if (err) {
console.log("failed", err);
reject(err);
}
else {
console.log('Dockerfile created');
const process = spawn('docker', ["build", "-t", imageName, path, "-q"]);
......@@ -157,33 +92,23 @@ function deployContainer(path, imageName) {
resolve();
});
}
});
});
})
}
function runContainer(imageName) {
console.log(imageName);
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('docker', ["run", "--name", imageName, imageName]);
app.post('/serverless/execute/:id', (req, res) => {
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference);
});
let runtime = req.body.runtime
let functionHash = req.params.id
client.publish(node_id, JSON.stringify({
"type": "execute",
runtime, functionHash
}))
res.json({
"status": "success"
})
})
process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
resolve(code);
})
})
}
app.listen(port, () => console.log(`Server listening on port ${port}!`))
\ No newline at end of file
// Create a new isolate limited to 128MB
const ivm = require('isolated-vm');
function createIsolate() {
let context;
const isolate = new ivm.Isolate({ memoryLimit: 128 });
// Create a new context within this isolate. Each context has its own copy of all the builtin
// Objects. So for instance if one context does Object.prototype.foo = 1 this would not affect any
// other contexts.
context = isolate.createContextSync();
// Get a Reference{} to the global object within the context.
const jail = context.global;
// This make the global object available in the context as `global`. We use `derefInto()` here
// because otherwise `global` would actually be a Reference{} object in the new isolate.
jail.setSync('global', jail.derefInto());
// We will create a basic `log` function for the new isolate to use.
const logCallback = function(...args) {
console.log(...args);
};
context.evalClosureSync(`global.console.log = function(...args) {
$0.applyIgnored(undefined, args, { arguments: { copy: true } });
}`, [ logCallback ], { arguments: { reference: true } });
// And let's test it out:
// context.evalSync('logging sync test.');
return {isolate, context};
}
module.exports.createIsolate = createIsolate;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment