Commit df51f208 authored by nilanjandaw's avatar nilanjandaw

multi-host setup wotking with minor bugs

parent 826b3465
......@@ -3,3 +3,4 @@ bitnami*
node_modules
package-lock.json
firecracker*
secrets.json
\ No newline at end of file
......@@ -17,7 +17,7 @@ function runIsolate(filename) {
console.log(result);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("isolate time taken: ", timeDifference);
resolve();
resolve(result);
});
}).catch(err => { reject(err) })
});
......@@ -29,9 +29,10 @@ function runProcess(filename) {
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('node', [filename]);
let result = "";
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
result += data;
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference);
});
......@@ -43,7 +44,7 @@ function runProcess(filename) {
process.on('close', (code) => {
console.log(`child process exited with code ${code}`);
resolve();
resolve(result);
});
})
......@@ -56,11 +57,12 @@ function runContainer(imageName) {
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('docker', ["run", "--name", imageName, imageName]);
let result = "";
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference);
result += data;
});
process.stderr.on('data', (data) => {
......@@ -69,7 +71,7 @@ function runContainer(imageName) {
});
process.on('close', (code) => {
resolve(code);
resolve(result);
})
})
......
......@@ -17,23 +17,43 @@ client.on('message', function (topic, message) {
// message is Buffer
message = JSON.parse(message)
if (message.type !== 'heartbeat')
console.log(message.toString())
if (message.type !== 'heartbeat') {
let runtime = message.runtime
let functionHash = message.functionHash
let function_id = message.function_id
console.log("function_id", function_id);
if (message.type === "execute") {
if (runtime === "isolate")
execute.runIsolate('../dispatcher/test/' + functionHash).then(() => client.publish(node_id, JSON.stringify({ status: "success" })))
execute.runIsolate('../dispatcher/test/' + functionHash).then( result => {
client.publish("response", JSON.stringify({
status: "success",
result,
function_id
}))
})
else if (runtime === "process")
execute.runProcess('../dispatcher/test/' + functionHash).then(() => client.publish(node_id, JSON.stringify({ status: "success" })))
execute.runProcess('../dispatcher/test/' + functionHash).then( result => {
client.publish("response", JSON.stringify({
status: "success",
result,
function_id
}))
})
else if (runtime === "container")
execute.runContainer(functionHash).then(() => client.publish(node_id, JSON.stringify({ status: "success" })))
execute.runContainer(functionHash).then(result => {
client.publish("response", JSON.stringify({
status: "success",
result,
function_id
}))
})
else {
client.publish(node_id, JSON.stringify({ status: "unknown runtime" }))
client.publish("response", JSON.stringify({ status: "unknown runtime" }))
return
}
}
}
})
......
......@@ -3,23 +3,29 @@
const express = require('express')
const bodyParser = require('body-parser')
const fileUpload = require('express-fileupload');
const fs = require('fs')
const { spawn } = require('child_process');
const redis = require('redis')
const morgan = require('morgan')
const mqtt = require('mqtt')
const client = mqtt.connect('mqtt://localhost')
const fs = require('fs')
const app = express()
const libSupport = require('./lib')
// const secret = require('../secrets.json')
// const db = require('nano')(`http://${secret.dispatched_db_username}:${secret.dispatched_db_password}@localhost:5984/dispatched`)
let db = new Map()
app.use(morgan('combined'))
app.use(bodyParser.urlencoded({ extended: false }))
app.use(bodyParser.json())
app.use(fileUpload())
const node_id = "20sez54hq8"
const WINDOW_SIZE = 1
const port = 8080
let requestQueue = []
app.post('/serverless/deploy', (req, res) => {
let runtime = req.body.runtime
......@@ -101,17 +107,49 @@ function deployContainer(path, imageName) {
app.post('/serverless/execute/:id', (req, res) => {
requestQueue.push({
req, res
})
if (requestQueue.length >= WINDOW_SIZE)
dispatch()
})
function dispatch() {
for (let i = 0; i < WINDOW_SIZE; i++) {
let {req, res} = requestQueue.shift()
let runtime = req.body.runtime
let functionHash = req.params.id
let function_id = libSupport.makeid(20)
console.log("Dispatching function with Id", function_id);
client.publish(node_id, JSON.stringify({
"type": "execute",
function_id,
runtime, functionHash
}))
db.set(function_id, res)
}
}
client.on('message', function (topic, message) {
if (topic === "response") {
message = JSON.parse(message)
console.log(message);
let res = db.get(message.function_id)
res.json({
"status": "success"
"status": "success",
"reply": message.result
})
}
})
app.listen(port, () => console.log(`Server listening on port ${port}!`))
client.on('connect', function () {
client.subscribe("response")
})
\ No newline at end of file
......@@ -15,6 +15,8 @@
"express-fileupload": "^1.1.6",
"isolated-vm": "^3.0.0",
"morgan": "^1.9.1",
"redis": "^2.8.0"
"nano": "^8.1.0",
"redis": "^2.8.0",
"save": "^2.4.0"
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment