Commit 753d6993 authored by nilanjandaw's avatar nilanjandaw

streamlining isolates

adding infra for long running isolates, pushed port allocation to dispatcher. TODO: reduce datastructure redundancy
parent 0ad5b02d
{"id":"192.168.31.51","master_node":"10.129.6.5"} {"id":"10.130.150.246","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
...@@ -29,7 +29,7 @@ function runIsolate(filename) { ...@@ -29,7 +29,7 @@ function runIsolate(filename) {
} }
function runProcess(local_repository, functionHash, port) { function runProcess(local_repository, functionHash, port) {
let filename = local_repository + functionHash let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeStart = Date.now() let timeStart = Date.now()
const process = spawn('node', [filename, port]); const process = spawn('node', [filename, port]);
......
...@@ -39,11 +39,11 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -39,11 +39,11 @@ libSupport.makeTopic(node_id).then(() => {
let runtime = message.runtime let runtime = message.runtime
let functionHash = message.functionHash let functionHash = message.functionHash
let function_id = message.function_id let function_id = message.function_id
let port = message.port
if (message.type === "execute") { if (message.type === "execute") {
console.log("function_id", function_id); console.log("function_id", function_id);
libSupport.download(host_url + "/repository/" + functionHash, local_repository + functionHash).then(() => { libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash).then(() => {
startWorker(local_repository, functionHash, function_id, producer, runtime) startWorker(local_repository, functionHash, function_id, producer, runtime, port)
}) })
} }
...@@ -53,11 +53,11 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -53,11 +53,11 @@ libSupport.makeTopic(node_id).then(() => {
}) })
function startWorker(local_repository, functionHash,function_id, producer, runtime) { function startWorker(local_repository, functionHash,function_id, producer, runtime, port) {
let port = libSupport.getPort(usedPort)
console.log("Using port", port, "for functionHash", functionHash); console.log("Using port", port, "for functionHash", functionHash);
usedPort.set(port, functionHash) usedPort.set(port, functionHash)
fs.writeFileSync('./local_repository/config.json', JSON.stringify({port}));
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => { execute.runIsolate(local_repository + functionHash).then(result => {
producer.send([{ producer.send([{
......
...@@ -76,25 +76,10 @@ function makeid(length) { ...@@ -76,25 +76,10 @@ function makeid(length) {
return result; return result;
} }
function getPort(usedPort) {
let port = -1, ctr = 0
do {
min = Math.ceil(30000);
max = Math.floor(60000);
port = Math.floor(Math.random() * (max - min + 1)) + min;
ctr += 1;
if (ctr > 30000) {
port = -1
break
}
} while (usedPort.has(port))
return port
}
function returnPort(port, usedPort) { function returnPort(port, usedPort) {
usedPort.delete((port)) usedPort.delete((port))
} }
module.exports = { module.exports = {
download, makeid, updateConfig, makeTopic, getPort, returnPort download, makeid, updateConfig, makeTopic, returnPort
} }
\ No newline at end of file
const { Worker, isMainThread, workerData } = require('worker_threads');
function runService() {
return new Promise((resolve, reject) => {
if (isMainThread) {
const worker = new Worker('./local_repository/49ce6b6f383591cf86a667a118ff0d2c.js');
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
console.log("worker exited");
})
}
})
}
async function run() {
const result = await runService(
)
console.log(result);
}
run().catch(err => console.error(err))
\ No newline at end of file
...@@ -12,7 +12,7 @@ const mqtt = require('mqtt') ...@@ -12,7 +12,7 @@ const mqtt = require('mqtt')
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
let functionToPort = new Map() let functionToPort = new Map(), usedPort = new Map()
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -27,8 +27,8 @@ let kafka = require('kafka-node'), ...@@ -27,8 +27,8 @@ let kafka = require('kafka-node'),
[ [
{ topic: 'response', partition: 0, offset: 0}, { topic: 'response', partition: 0, offset: 0},
{ topic: 'heartbeat' }, { topic: 'heartbeat' },
{topic: "deployed"}, { topic: "deployed" },
{ topic: "removeWorker"} { topic: "removeWorker" }
], ],
[ [
{ autoCommit: true } { autoCommit: true }
...@@ -99,7 +99,7 @@ function deployContainer(path, imageName) { ...@@ -99,7 +99,7 @@ function deployContainer(path, imageName) {
RUN npm install RUN npm install
COPY . /app COPY . /app
CMD node ${imageName}` CMD node ${imageName}.js`
, function (err) { , function (err) {
if (err) { if (err) {
console.log("failed", err); console.log("failed", err);
...@@ -177,7 +177,8 @@ function dispatch() { ...@@ -177,7 +177,8 @@ function dispatch() {
messages: JSON.stringify({ messages: JSON.stringify({
"type": "execute", "type": "execute",
function_id, function_id,
runtime, functionHash runtime, functionHash,
port: libSupport.getPort(usedPort)
}), }),
partition: 0 partition: 0
}] }]
...@@ -221,7 +222,6 @@ consumer.on('message', function (message) { ...@@ -221,7 +222,6 @@ consumer.on('message', function (message) {
if (db.has(message.functionHash + message.runtime)) { if (db.has(message.functionHash + message.runtime)) {
let { req, res } = db.get(message.functionHash + message.runtime) let { req, res } = db.get(message.functionHash + message.runtime)
// res.redirect(307, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`);
if (parseInt(message.port) != -1) if (parseInt(message.port) != -1)
functionToPort.set(message.functionHash + message.runtime, { functionToPort.set(message.functionHash + message.runtime, {
port: parseInt(message.port), port: parseInt(message.port),
...@@ -236,8 +236,8 @@ consumer.on('message', function (message) { ...@@ -236,8 +236,8 @@ consumer.on('message', function (message) {
} }
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
console.log("removing metadata", message); console.log("removing metadata", message);
message = JSON.parse(message) message = JSON.parse(message)
usedPort.delete(message.port)
functionToPort.delete(message.functionHash + message.runtime) functionToPort.delete(message.functionHash + message.runtime)
} }
}); });
......
...@@ -23,7 +23,7 @@ function makeid(length) { ...@@ -23,7 +23,7 @@ function makeid(length) {
let hash = crypto.createHash('md5').update(output).digest("hex"); let hash = crypto.createHash('md5').update(output).digest("hex");
console.log(hash); console.log(hash);
fs.writeFileSync(functionPath + hash, output) fs.writeFileSync(functionPath + hash + ".js", output)
return hash return hash
} }
...@@ -52,6 +52,21 @@ function makeid(length) { ...@@ -52,6 +52,21 @@ function makeid(length) {
}) })
} }
module.exports.makeid = makeid; function getPort(usedPort) {
module.exports.generateExecutor = generateExecutor; let port = -1, ctr = 0
module.exports.reverseProxy = reverseProxy; do {
\ No newline at end of file min = Math.ceil(30000);
max = Math.floor(60000);
port = Math.floor(Math.random() * (max - min + 1)) + min;
ctr += 1;
if (ctr > 30000) {
port = -1
break
}
} while (usedPort.has(port))
return port
}
module.exports = {
makeid, generateExecutor, reverseProxy, getPort
}
\ No newline at end of file
...@@ -2,9 +2,16 @@ ...@@ -2,9 +2,16 @@
const express = require('express') const express = require('express')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
const app = express() const app = express()
let port = 5000
try {
const config = require('./config.json')
port = config.port
} catch (e) {
port = 5000
}
app.use(bodyParser.urlencoded({ extended: true })) app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json()) app.use(bodyParser.json())
let port = (process.argv[2] != null)? parseInt(process.argv[2]): 5000, lastRequest = Date.now() let lastRequest = Date.now()
app.post('/serverless/function/execute/', (req, res) => { app.post('/serverless/function/execute/', (req, res) => {
let payload = req.body let payload = req.body
lastRequest = Date.now() lastRequest = Date.now()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment