Commit 79adfd7d authored by Nilanjan Daw's avatar Nilanjan Daw

Added Executor level state emitters

Added Kafka producers to alert DM?DA of executor states. Removed indirect state gathering module from DA
parent 7d5fc388
......@@ -2,4 +2,5 @@ bitnami*
\ No newline at end of file
\ No newline at end of file
\ No newline at end of file
\ No newline at end of file
......@@ -8,7 +8,7 @@ const events = require('events');
const workerEvent = new events.EventEmitter();
const parentProcess = require('process');
function runIsolate(local_repository, functionHash, port) {
function runIsolate(local_repository, functionHash, port, resource_id) {
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
......@@ -30,7 +30,7 @@ function runIsolate(local_repository, functionHash, port) {
function runProcess(local_repository, functionHash, port) {
function runProcess(local_repository, functionHash, port, resource_id) {
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
let timeStart =
......@@ -41,7 +41,7 @@ function runProcess(local_repository, functionHash, port) {
result += data;
let timeDifference = Math.ceil(( - timeStart))
console.log("process time taken: ", timeDifference);
workerEvent.emit('start', functionHash, port, "process")
// workerEvent.emit('start', functionHash, port, "process")
......@@ -52,14 +52,14 @@ function runProcess(local_repository, functionHash, port) {
process.on('close', (code) => {
console.log(`child process exited with code ${code}`);
workerEvent.emit('end', port, "process");
// workerEvent.emit('end', port, "process");
function runContainer(imageName, port) {
function runContainer(imageName, port, resource_id) {
return new Promise((resolve, reject) => {
......@@ -80,9 +80,10 @@ function runContainer(imageName, port) {
if (code != 0)
else {
const process = spawn('docker', ["run", "--name", imageName, registry_url + imageName]);
const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName,
resource_id, imageName, port, "container"]);
let result = "";
timeStart =
// timeStart =
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil(( - timeStart))
......@@ -104,58 +105,29 @@ function runContainer(imageName, port) {
} else {
const process_checkContainer = spawn('docker', ['container', 'inspect', imageName]);
process_checkContainer.on('close', (code) => {
console.log("container starting at port", port);
if (code != 0) {
const process = spawn('docker', ["run", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName]);
let result = "";
timeStart =
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil(( - timeStart))
console.log("container run time taken: ", timeDifference);
// result += data;
workerEvent.emit('start', imageName, port, "container")
process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
process.on('close', (code) => {
workerEvent.emit('end', port, "container");
} else {
const clean_container = spawn('docker', ['rm', imageName])
clean_container.on('close', code => {
const process = spawn('docker', ["run", "-p", `${port}:5000`, "--name", imageName, registry_url + imageName]);
let result = "";
timeStart =
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil(( - timeStart))
console.log("container run time taken: ", timeDifference);
// result += data;
workerEvent.emit('start', imageName, port, "container")
process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
process.on('close', (code) => {
workerEvent.emit('end', port, "container");
const process = spawn('docker', ["run", "--rm", "-p", `${port}:5000`, "--name", imageName,
registry_url + imageName, resource_id, imageName, port, "container"]);
let result = "";
// timeStart =
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil(( - timeStart))
console.log("container run time taken: ", timeDifference);
// result += data;
workerEvent.emit('start', imageName, port, "container")
process.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
process.on('close', (code) => {
workerEvent.emit('end', port, "container");
......@@ -4,6 +4,8 @@ const config = require('./config.json')
const libSupport = require('./lib')
const node_id =
const {spawn } = require('child_process')
let grunt = spawn('./grunt')
const execute = require('./execute')
const fs = require('fs')
const kafka = require('kafka-node')
......@@ -36,21 +38,19 @@ libSupport.makeTopic(node_id).then(() => {
let topic = message.topic
message = message.value
message = JSON.parse(message)
if (topic !== 'heartbeat') {
let runtime = message.runtime
let functionHash = message.functionHash
let function_id = message.function_id
let port = message.port
* Download necessary files (function file) and Start resource deployment
if (message.type === "execute") {
console.log("function_id", function_id); + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
startWorker(local_repository, functionHash, function_id, producer, runtime, port)
let runtime = message.runtime
let functionHash = message.functionHash
let resource_id = message.resource_id
let port = message.port
* Download necessary files (function file) and Start resource deployment
if (message.type === "execute") {
console.log("function_id", resource_id); + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
startWorker(local_repository, functionHash, resource_id, producer, runtime, port)
......@@ -60,31 +60,33 @@ libSupport.makeTopic(node_id).then(() => {
* Start a worker executor of the runtime type
* @param {String} local_repository
* @param {String} functionHash
* @param {String} function_id
* @param {String} resource_id
* @param {String} producer
* @param {String} runtime
* @param {Number} port
function startWorker(local_repository, functionHash, function_id, producer, runtime, port) {
function startWorker(local_repository, functionHash, resource_id, producer, runtime, port) {
console.log("Using port", port, "for functionHash", functionHash);
usedPort.set(port, functionHash)
fs.writeFileSync('./local_repository/config.json', JSON.stringify({port}));
if (runtime === "isolate")
execute.runIsolate(local_repository, functionHash, port)
else if (runtime === "process")
execute.runProcess(local_repository, functionHash, port)
else if (runtime === "container")
execute.runContainer(functionHash, port)
else {
topic: "response",
messages: JSON.stringify({ status: "unknown runtime" })
}], () => { })
fs.writeFile('./local_repository/config.json', JSON.stringify({port, functionHash, resource_id, runtime}), () => {
if (runtime === "isolate")
execute.runIsolate(local_repository, functionHash, port, resource_id)
else if (runtime === "process")
execute.runProcess(local_repository, functionHash, port, resource_id)
else if (runtime === "container")
execute.runContainer(functionHash, port, resource_id)
else {
topic: "response",
messages: JSON.stringify({ status: "unknown runtime" })
}], () => { })
function heartbeat() {
......@@ -92,31 +94,77 @@ function heartbeat() {
topic: "heartbeat",
messages: JSON.stringify({"address": node_id, "timestamp":})
producer.send(payload, function() {
producer.send(payload, function() {})
execute.workerEvent.on("start", (functionHash, port, runtime) => {
console.log("started function Port: ", port, functionHash);
topic: "deployed",
messages: JSON.stringify({ functionHash, port, runtime, node_id })
}], () => { })
// producer.send(
// [{
// topic: "deployed",
// messages: JSON.stringify({ functionHash, port, runtime, node_id })
// }], () => { })
execute.workerEvent.on('end', (port, runtime) => {
let functionHash = usedPort.get(port)
topic: "removeWorker",
messages: JSON.stringify({ functionHash, port, runtime, node_id })
}], () => {
console.log("Ending worker for function", functionHash, usedPort);
// producer.send(
// [{
// topic: "removeWorker",
// messages: JSON.stringify({ functionHash, port, runtime, node_id })
// }], () => {
// console.log("Ending worker for function", functionHash, usedPort);
// })
setInterval(heartbeat, 1000);
\ No newline at end of file
grunt.stdout.on('data', data => {
grunt.stderr.on('data', data => {
setInterval(heartbeat, 1000);
* Channel LOG_COMMON
Source: Executor
"status": true/false
"reason": "deployed / exd"
Source: Executor
"usage": {
Source: ReverseProxy
Source: Dispatch Manager
......@@ -16,9 +16,11 @@ const libSupport = require('./lib')
* functionToPort maps the function and its respective port mapping
* TODO: change this to hold a list of mappings of horizontal scaling
let functionToPort = new Map(),
let functionToResource = new Map(), // TODO: make the resource a list for horizontal scale out
usedPort = new Map(), // TODO: remove after integration with RM
rmQueue = new Map()
rmQueue = new Map(),
db = new Map(),
resourceMap = new Map()
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -41,7 +43,6 @@ let kafka = require('kafka-node'),
{ autoCommit: true }
let db = new Map()
app.use(bodyParser.urlencoded({ extended: true }))
......@@ -113,8 +114,7 @@ function deployContainer(path, imageName) {
COPY package.json /app
RUN npm install
COPY . /app
CMD node ${imageName}.js`
ENTRYPOINT ["node", "${imageName}.js"]`
, function (err) {
if (err) {
console.log("failed", err);
......@@ -165,15 +165,20 @@ function deployContainer(path, imageName) {
*/'/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
if (functionToPort.has( + runtime)) {
if (functionToResource.has( + runtime)) {
* Bypass deployment pipeline if resource available
let forwardTo = functionToPort.get( + runtime)
console.log("resource found", forwardTo);
libSupport.reverseProxy(req, res, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`)
let forwardTo = functionToResource.get( + runtime)
let resource = resourceMap.get(forwardTo.resource_id)
console.log("resource found", forwardTo, resource);
libSupport.reverseProxy(req, res, `http://${resource.node_id}:${resource.port}/serverless/function/execute`)
} else {
* FIXME: Here, every request even for the same function will be queued up potentially launching multiple
* resource of the same type
requestQueue.push({ req, res })
* We store functions for function placement heuristics purposes. This lets us look into the function
......@@ -195,20 +200,29 @@ function dispatch() {
let runtime = req.body.runtime
let functionHash =
let function_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
console.log("Dispatching function with Id", function_id, runtime);
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
console.log("Dispatching function with Id", resource_id, runtime);
let node_id = getAddress() // Requests the RM for address and other metadata for function placement
let port = libSupport.getPort(usedPort) // TODO: will be provided by the RM
resourceMap.set(resource_id, {
runtime, functionHash, port, node_id
let payload = [{
topic: node_id,
messages: JSON.stringify({
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
runtime, functionHash,
port: libSupport.getPort(usedPort) // TODO: will be provided by the RM
partition: 0
producer.send(payload, () => { })
db.set(functionHash + runtime, { req, res })
/** uncomment when RM is available
rmQueue.set(function_id, payload)
let payloadToRM = [{
......@@ -222,7 +236,7 @@ function dispatch() {
db.set(functionHash + runtime, { req, res })
......@@ -238,7 +252,7 @@ consumer.on('message', function (message) {
let topic = message.topic
message = message.value
if (topic === "response") {
console.log("response", message);
// message = JSON.parse(message)
// console.log(message);
......@@ -257,38 +271,52 @@ consumer.on('message', function (message) {
} else if (topic == "deployed") {
try {
message = JSON.parse(message)
} catch (e) {
// process.exit(0)
message = JSON.parse(message)
console.log("deployed", message);
if (db.has(message.functionHash + message.runtime)) {
let { req, res } = db.get(message.functionHash + message.runtime)
if (parseInt(message.port) != -1)
functionToPort.set(message.functionHash + message.runtime, {
port: parseInt(message.port),
node_id: message.node_id
functionToResource.set(message.functionHash + message.runtime, {
resource_id: message.resource_id
let resource = resourceMap.get(message.resource_id)
libSupport.reverseProxy(req, res,
.then(() => {
db.delete(message.functionHash + message.runtime)
} else if (topic == "removeWorker") {
console.log("removing metadata", message);
message = JSON.parse(message)
try {
message = JSON.parse(message)
} catch(e) {
// process.exit(0)
functionToPort.delete(message.functionHash + message.runtime)
let resource = functionToResource.get(message.functionHash + message.runtime)
functionToResource.delete(message.functionHash + message.runtime)
} else if (topic == "RESPONSE_ARBITER_2_DISPATCHER") {
message = JSON.parse(message)
let payload = rmQueue.get(
payload[0].topic = getAddress()
if (payload != null) {
payload[0].topic = getAddress()
producer.send(payload, () => { })
} else {
console.log("something went wrong");
producer.send(payload, () => { })
......@@ -2,13 +2,20 @@
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
let port = 5000
let port = 5000, resource_id, functionHash, portExternal, runtime
let config = null;
try {
config = require('./config.json')
port = config.port
resource_id = config.resource_id
functionHash = config.functionHash
runtime = config.runtime
} catch (e) {
port = 5000
resource_id = process.argv[2]
functionHash = process.argv[3]
portExternal = process.argv[4]
runtime = process.argv[5]
let kafka = require('kafka-node'),
......@@ -36,11 +43,11 @@ function executor(payload) {
app.listen(port, () => {
console.log(`Server listening on port ${port}!`)
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
topic: "response",
messages: "ready"
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id })
}], () => { })
......@@ -48,8 +55,15 @@ function shouldDie() {
if ( - lastRequest > 5 * 1000) {
console.log("Idle for too long. Exiting");
topic: "removeWorker",
messages: JSON.stringify({ functionHash, portExternal, runtime, resource_id })
}], () => {
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment