Commit 551148c1 authored by Nilanjan Daw's avatar Nilanjan Daw

Partial documentation added

parent d5fdbfd1
...@@ -6,6 +6,7 @@ const { Worker, isMainThread, workerData } = require('worker_threads'); ...@@ -6,6 +6,7 @@ const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = "10.129.6.5:5000/" const registry_url = "10.129.6.5:5000/"
const events = require('events'); const events = require('events');
const workerEvent = new events.EventEmitter(); const workerEvent = new events.EventEmitter();
const parentProcess = require('process');
function runIsolate(local_repository, functionHash, port) { function runIsolate(local_repository, functionHash, port) {
let filename = local_repository + functionHash + ".js" let filename = local_repository + functionHash + ".js"
...@@ -20,24 +21,11 @@ function runIsolate(local_repository, functionHash, port) { ...@@ -20,24 +21,11 @@ function runIsolate(local_repository, functionHash, port) {
console.log("worker exited"); console.log("worker exited");
workerEvent.emit('end', port, "isolate"); workerEvent.emit('end', port, "isolate");
}) })
workerEvent.emit('start', functionHash, port, "isolate") worker.on('online', () => {
resolve() workerEvent.emit('start', functionHash, port, "isolate")
// let timeStart = Date.now() resolve()
// let { isolate, context } = isolateBackend.createIsolate(); })
// fs.readFile(filename, 'utf-8', (err, data) => {
// if (err)
// reject(err);
// context.evalClosure(data).then(result => {
// let timeDifference = Math.ceil((Date.now() - timeStart))
// console.log("isolate time taken: ", timeDifference);
// resolve(result.result);
// }).catch(err => { reject(err) })
// });
}); });
} }
...@@ -178,6 +166,11 @@ function runContainer(imageName, port) { ...@@ -178,6 +166,11 @@ function runContainer(imageName, port) {
} }
parentProcess.stdout.on('data', data => {
console.log("handler", data);
})
module.exports.runContainer = runContainer; module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess; module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate; module.exports.runIsolate = runIsolate;
......
...@@ -7,6 +7,7 @@ const node_id = config.id ...@@ -7,6 +7,7 @@ const node_id = config.id
const execute = require('./execute') const execute = require('./execute')
const fs = require('fs') const fs = require('fs')
const kafka = require('kafka-node') const kafka = require('kafka-node')
const local_repository = __dirname + "/local_repository/" const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port const host_url = "http://" + constants.master_address + ":" + constants.master_port
...@@ -40,6 +41,9 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -40,6 +41,9 @@ libSupport.makeTopic(node_id).then(() => {
let functionHash = message.functionHash let functionHash = message.functionHash
let function_id = message.function_id let function_id = message.function_id
let port = message.port let port = message.port
/**
* Download necessary files (function file) and Start resource deployment
*/
if (message.type === "execute") { if (message.type === "execute") {
console.log("function_id", function_id); console.log("function_id", function_id);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => { libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
...@@ -52,50 +56,26 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -52,50 +56,26 @@ libSupport.makeTopic(node_id).then(() => {
}) })
}) })
/**
function startWorker(local_repository, functionHash,function_id, producer, runtime, port) { * Start a worker executor of the runtime type
* @param {String} local_repository
* @param {String} functionHash
* @param {String} function_id
* @param {String} producer
* @param {String} runtime
* @param {Number} port
*/
function startWorker(local_repository, functionHash, function_id, producer, runtime, port) {
console.log("Using port", port, "for functionHash", functionHash); console.log("Using port", port, "for functionHash", functionHash);
usedPort.set(port, functionHash) usedPort.set(port, functionHash)
fs.writeFileSync('./local_repository/config.json', JSON.stringify({port})); fs.writeFileSync('./local_repository/config.json', JSON.stringify({port}));
if (runtime === "isolate") if (runtime === "isolate")
execute.runIsolate(local_repository, functionHash, port) execute.runIsolate(local_repository, functionHash, port)
// .then(result => {
// producer.send([{
// topic: "response",
// messages: JSON.stringify({
// status: "success",
// result,
// function_id
// })
// }], () => { })
// })
else if (runtime === "process") else if (runtime === "process")
execute.runProcess(local_repository, functionHash, port) execute.runProcess(local_repository, functionHash, port)
// .then(result => {
// producer.send(
// [{
// topic: "response",
// messages: JSON.stringify({
// status: "success",
// result,
// function_id
// })
// }], () => { })
// })
else if (runtime === "container") else if (runtime === "container")
execute.runContainer(functionHash, port) execute.runContainer(functionHash, port)
// .then(result => {
// producer.send(
// [{
// topic: "response",
// messages: JSON.stringify({
// status: "success",
// result,
// function_id
// })
// }], () => { })
// })
else { else {
producer.send( producer.send(
[{ [{
......
const { Worker, isMainThread, workerData } = require('worker_threads');
function runService() {
return new Promise((resolve, reject) => {
if (isMainThread) {
const worker = new Worker('./local_repository/5a6bd79bcd13b95441c733827cd6b016.js');
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
console.log("worker exited");
})
}
})
}
runService().catch(err => console.error(err))
\ No newline at end of file
...@@ -12,7 +12,12 @@ const mqtt = require('mqtt') ...@@ -12,7 +12,12 @@ const mqtt = require('mqtt')
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
let functionToPort = new Map(), usedPort = new Map() /**
* functionToPort maps the function and its respective port mapping
* TODO: change this to hold a list of mappings of horizontal scaling
*/
let functionToPort = new Map(),
usedPort = new Map() // TODO: remove after integration with RM
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -29,6 +34,7 @@ let kafka = require('kafka-node'), ...@@ -29,6 +34,7 @@ let kafka = require('kafka-node'),
{ topic: 'heartbeat' }, { topic: 'heartbeat' },
{ topic: "deployed" }, { topic: "deployed" },
{ topic: "removeWorker" } { topic: "removeWorker" }
// { topic: "RESPONSE_ARBITER_2_DISPATCHER"}
], ],
[ [
{ autoCommit: true } { autoCommit: true }
...@@ -50,6 +56,9 @@ const WINDOW_SIZE = 10 ...@@ -50,6 +56,9 @@ const WINDOW_SIZE = 10
const port = 8080 const port = 8080
const registry_url = constants.registry_url const registry_url = constants.registry_url
/**
* REST API to receive deployment requests
*/
app.post('/serverless/deploy', (req, res) => { app.post('/serverless/deploy', (req, res) => {
let runtime = req.body.runtime let runtime = req.body.runtime
...@@ -88,6 +97,11 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -88,6 +97,11 @@ app.post('/serverless/deploy', (req, res) => {
}) })
/**
* Create the docker file, build and push image to remote repository
* @param {String: Path from where to extract function executor} path
* @param {String: Name of the image} imageName
*/
function deployContainer(path, imageName) { function deployContainer(path, imageName) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let buildStart = Date.now() let buildStart = Date.now()
...@@ -145,22 +159,33 @@ function deployContainer(path, imageName) { ...@@ -145,22 +159,33 @@ function deployContainer(path, imageName) {
}) })
} }
/**
* REST API to receive execute requests
*/
app.post('/serverless/execute/:id', (req, res) => { app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime let runtime = req.body.runtime
if (functionToPort.has(req.params.id + runtime)) { if (functionToPort.has(req.params.id + runtime)) {
/**
let forwardTo = functionToPort.get(req.params.id + runtime) * Bypass deployment pipeline if resource available
*/
let forwardTo = functionToPort.get(req.params.id + runtime)
console.log("resource found", forwardTo); console.log("resource found", forwardTo);
libSupport.reverseProxy(req, res, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`) libSupport.reverseProxy(req, res, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`)
} else { } else {
requestQueue.push({ req, res }) requestQueue.push({ req, res })
/**
* We store functions for function placement heuristics purposes. This lets us look into the function
* patterns being received and make intelligent deployment decisions based on it.
*/
if (requestQueue.length >= WINDOW_SIZE) if (requestQueue.length >= WINDOW_SIZE)
dispatch() dispatch()
} }
}) })
/**
* Send dispatch signal and deploy resources after consultation with the RM
*/
function dispatch() { function dispatch() {
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length) let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) { for (let i = 0; i < lookbackWindow; i++) {
...@@ -169,16 +194,16 @@ function dispatch() { ...@@ -169,16 +194,16 @@ function dispatch() {
let runtime = req.body.runtime let runtime = req.body.runtime
let functionHash = req.params.id let functionHash = req.params.id
let function_id = libSupport.makeid(20) let function_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
console.log("Dispatching function with Id", function_id, runtime); console.log("Dispatching function with Id", function_id, runtime);
let node_id = getAddress() let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let payload = [{ let payload = [{
topic: node_id, topic: node_id,
messages: JSON.stringify({ messages: JSON.stringify({
"type": "execute", "type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
function_id, function_id,
runtime, functionHash, runtime, functionHash,
port: libSupport.getPort(usedPort) port: libSupport.getPort(usedPort) // TODO: will be provided by the RM
}), }),
partition: 0 partition: 0
}] }]
...@@ -199,6 +224,8 @@ consumer.on('message', function (message) { ...@@ -199,6 +224,8 @@ consumer.on('message', function (message) {
let topic = message.topic let topic = message.topic
message = message.value message = message.value
if (topic === "response") { if (topic === "response") {
console.log(message);
// message = JSON.parse(message) // message = JSON.parse(message)
// console.log(message); // console.log(message);
// let {req, res} = db.get(message.function_id) // let {req, res} = db.get(message.function_id)
...@@ -239,7 +266,21 @@ consumer.on('message', function (message) { ...@@ -239,7 +266,21 @@ consumer.on('message', function (message) {
message = JSON.parse(message) message = JSON.parse(message)
usedPort.delete(message.port) usedPort.delete(message.port)
functionToPort.delete(message.functionHash + message.runtime) functionToPort.delete(message.functionHash + message.runtime)
} else if (topic == "RESPONSE_ARBITER_2_DISPATCHER") {
console.log(message);
} }
}); });
setInterval(dispatch, 2000); setInterval(dispatch, 2000);
// {
// id: "!!!!!",
// "grunts": [{
// id: "a",
// port: 12121
// },{
// id: "b",
// port: 123445
// }]
// }
\ No newline at end of file
const crypto = require('crypto'); const crypto = require('crypto');
const fs = require('fs') const fs = require('fs')
var rp = require('request-promise'); var rp = require('request-promise');
/**
* Generates unique IDs of arbitrary length
* @param {Length of the ID} length
*/
function makeid(length) { function makeid(length) {
var result = ''; var result = '';
var characters = 'abcdefghijklmnopqrstuvwxyz0123456789'; var characters = 'abcdefghijklmnopqrstuvwxyz0123456789';
...@@ -11,6 +16,13 @@ function makeid(length) { ...@@ -11,6 +16,13 @@ function makeid(length) {
return result; return result;
} }
/**
* generates the runtime executor after inserting the received function
* TODO: make this asynchronous
* @param {string Path from where to extract the function} functionPath
* @param {string Function Hash value} functionHash
*/
function generateExecutor(functionPath, functionHash) { function generateExecutor(functionPath, functionHash) {
input = fs.readFileSync('./repository/worker_env/env.js') input = fs.readFileSync('./repository/worker_env/env.js')
functionFile = fs.readFileSync(functionPath + functionHash) functionFile = fs.readFileSync(functionPath + functionHash)
...@@ -27,7 +39,7 @@ function makeid(length) { ...@@ -27,7 +39,7 @@ function makeid(length) {
return hash return hash
} }
function reverseProxy(req, res, url) { function reverseProxy(req, res, url, tryout) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
console.log("requesting reverseproxy"); console.log("requesting reverseproxy");
...@@ -45,9 +57,13 @@ function makeid(length) { ...@@ -45,9 +57,13 @@ function makeid(length) {
resolve() resolve()
}) })
.catch(function (err) { .catch(function (err) {
console.log("error", err.message); if (err.error.errno === "ECONNREFUSED") {
res.json(err.message).status(err.statusCode) reverseProxy(req, res, url, (tryout != null) ? tryout + 1 : 1)
resolve() } else {
console.log("error", err.error.errno);
res.json(err.message).status(err.statusCode)
resolve()
}
}); });
}) })
} }
......
...@@ -3,12 +3,22 @@ const express = require('express') ...@@ -3,12 +3,22 @@ const express = require('express')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
const app = express() const app = express()
let port = 5000 let port = 5000
let config = null;
try { try {
const config = require('./config.json') config = require('./config.json')
port = config.port port = config.port
} catch (e) { } catch (e) {
port = 5000 port = 5000
} }
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: '10.129.6.5:9092',
autoConnect: true
}),
producer = new Producer(client)
app.use(bodyParser.urlencoded({ extended: true })) app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json()) app.use(bodyParser.json())
let lastRequest = Date.now() let lastRequest = Date.now()
...@@ -25,7 +35,14 @@ function executor(payload) { ...@@ -25,7 +35,14 @@ function executor(payload) {
}) })
} }
app.listen(port, () => console.log(`Server listening on port ${port}!`)) app.listen(port, () => {
console.log(`Server listening on port ${port}!`)
producer.send(
[{
topic: "response",
messages: "ready"
}], () => { })
})
function shouldDie() { function shouldDie() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment