    To prevent requests coming in for the same function from starting too many workers, they are grouped together and one worker is started per group.
lib.js 4.23 KB
const crypto = require('crypto');
const fs = require('fs')
const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const { createLogger, format, transports } = winston;
const heap = require('heap')

 * Generates unique IDs of arbitrary length
 * @param {Length of the ID} length 
function makeid(length) {
    var result           = '';
    var characters       = 'abcdefghijklmnopqrstuvwxyz0123456789';
    var charactersLength = characters.length;
    for ( var i = 0; i < length; i++ ) {
       result += characters.charAt(Math.floor(Math.random() * charactersLength));
    return result;

 * generates the runtime executor after inserting the received function
 * TODO: make this asynchronous
 * @param {string Path from where to extract the function} functionPath 
 * @param {string Function Hash value} functionHash 
function generateExecutor(functionPath, functionHash) {
   input = fs.readFileSync('./repository/worker_env/env.js')
   functionFile = fs.readFileSync(functionPath + functionHash)
   searchSize = "(resolve, reject) => {".length

   insertIndex = input.indexOf("(resolve, reject) => {") + searchSize

   output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex)
   let hash = crypto.createHash('md5').update(output).digest("hex");
    fs.writeFileSync(functionPath + hash + ".js", output)
    return hash

function reverseProxy(req, res, functionToResource, resourceMap) {

   return new Promise((resolve, reject) => {
      let runtime = req.body.runtime
      let id = req.params.id + runtime
       * Bypass deployment pipeline if resource available
      let functionHeap = functionToResource.get(id)
      let forwardTo = functionHeap[0]
      let resource = resourceMap.get(forwardTo.resource_id)
      logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
         "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
      let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
      logger.info("Request received at reverseproxy. Forwarding to: " + url);
      forwardTo.metric += 1
      heap.heapify(functionHeap, compare)
      var options = {
         method: 'POST',
         uri: url,
         body: req.body,
         json: true // Automatically stringifies the body to JSON

      // console.log(options);
         .then(function (parsedBody) {
            forwardTo.metric -= 1
            heap.heapify(functionHeap, compare)
         .catch(function (err) {
            forwardTo.metric -= 1
            heap.heapify(functionHeap, compare)
            logger.error("error" + err.error.errno);

function getPort(usedPort) {
   let port = -1, ctr = 0
   do {
       min = Math.ceil(30000);
       max = Math.floor(60000);
       port = Math.floor(Math.random() * (max - min + 1)) + min;
       ctr += 1;
       if (ctr > 30000) {
           port = -1
   } while (usedPort.has(port))
   return port

const logger = winston.createLogger({
   level: 'info',
   format: winston.format.combine(
   defaultMeta: { module: 'Dispatch Manager' },
   transports: [
      // - Write to all logs with level `info` and below to `combined.log` 
      // - Write all logs error (and below) to `error.log`.
      new winston.transports.File({ filename: 'log/error.log', level: 'error' }),
      new winston.transports.File({ filename: 'log/combined.log' }),
      new winston.transports.Console({
         format: winston.format.combine(
            format.colorize({ all: true }),

function compare(a, b) {
   return a.metric - b.metric

 module.exports = {
    makeid, generateExecutor, reverseProxy, getPort, logger, compare