• Nilanjan Daw's avatar
    Request grouping added. Closes #18 · 7b20abb7
    Nilanjan Daw authored
    To prevent requests coming in for the same function from starting too many workers, they are grouped together and one worker is started per group.
    7b20abb7
lib.js 4.23 KB
const crypto = require('crypto');
const fs = require('fs')
const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const { createLogger, format, transports } = winston;
const heap = require('heap')

/**
 * Builds a random identifier of the requested length.
 * Characters are drawn with Math.random() from the lowercase letters a-z
 * and the digits 0-9.
 * @param {number} length number of characters in the generated ID
 * @returns {string} the generated ID (empty when length is not positive)
 */
function makeid(length) {
    const alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789';
    let id = '';
    // One character is appended per pass, so the ID length doubles as the counter.
    while (id.length < length) {
        const pick = Math.floor(Math.random() * alphabet.length);
        id += alphabet.charAt(pick);
    }
    return id;
}


/**
 * Generates the runtime executor by splicing the received function into the
 * worker environment template.
 *
 * Reads './repository/worker_env/env.js', inserts the contents of the file at
 * functionPath + functionHash right after the first occurrence of
 * "(resolve, reject) => {", and writes the result to
 * functionPath + <md5 of the result> + ".js".
 * TODO: make this asynchronous
 * @param {string} functionPath Path prefix from where to extract the function
 * @param {string} functionHash Function Hash value (appended to functionPath)
 * @returns {string} hex MD5 digest of the generated executor source
 * @throws {Error} when the template does not contain the insertion marker
 */
function generateExecutor(functionPath, functionHash) {
   const marker = "(resolve, reject) => {"
   const input = fs.readFileSync('./repository/worker_env/env.js')
   const functionFile = fs.readFileSync(functionPath + functionHash)

   const markerIndex = input.indexOf(marker)
   if (markerIndex === -1) {
      // Without this guard the function would be spliced at a bogus offset
      // and a corrupted executor would be written to disk.
      throw new Error('generateExecutor: insertion marker "' + marker + '" not found in worker env template')
   }
   const insertIndex = markerIndex + marker.length

   // Buffers are coerced to strings by the concatenation.
   const output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex)

   const hash = crypto.createHash('md5').update(output).digest("hex");
   console.log(hash);

   fs.writeFileSync(functionPath + hash + ".js", output)
   return hash
}

/**
 * Forwards a request to the worker at the top of the function's heap and
 * relays the worker's reply to the client.
 *
 * The chosen entry's `metric` is incremented while the request is in flight
 * and decremented once it settles; the heap is re-heapified with `compare`
 * after every change.
 *
 * @param {object} req incoming request; reads req.params.id, req.body.runtime
 *                     and forwards req.body as the JSON payload
 * @param {object} res response object; replies through res.json()/res.status()
 * @param {object} functionToResource Map-like (only get() is used) from
 *                     (function id + runtime) to an array of
 *                     { resource_id, metric } entries
 * @param {object} resourceMap Map-like (only get() is used) from resource_id
 *                     to an object with node_id and port
 * @returns {Promise<void>} resolves once a reply (worker result or upstream
 *                     error) has been sent; rejects when no worker or
 *                     resource is registered for the function
 */
function reverseProxy(req, res, functionToResource, resourceMap) {

   return new Promise((resolve, reject) => {
      let runtime = req.body.runtime
      let id = req.params.id + runtime
      /**
       * Bypass deployment pipeline if resource available
       */
      let functionHeap = functionToResource.get(id)
      if (!functionHeap || functionHeap.length === 0) {
         reject(new Error(`No worker registered for function ${id}`))
         return
      }
      let forwardTo = functionHeap[0]
      let resource = resourceMap.get(forwardTo.resource_id)
      if (!resource) {
         reject(new Error(`Unknown resource ${JSON.stringify(forwardTo.resource_id)} for function ${id}`))
         return
      }
      logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
         "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
      let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`

      logger.info("Request received at reverseproxy. Forwarding to: " + url);
      // Account for the in-flight request before dispatching it.
      forwardTo.metric += 1
      heap.heapify(functionHeap, compare)
      logger.info(functionHeap);

      var options = {
         method: 'POST',
         uri: url,
         body: req.body,
         json: true // Automatically stringifies the body to JSON
      };

      // Gives the slot back once the forwarded request has settled.
      const release = () => {
         forwardTo.metric -= 1
         heap.heapify(functionHeap, compare)
         logger.info(functionHeap);
      }

      rp(options)
         .then(function (parsedBody) {
            res.json(parsedBody)
            release()
            resolve()
         })
         .catch(function (err) {
            release()
            // err.error is not guaranteed to be an object carrying errno,
            // so guard the access instead of throwing inside the handler.
            const errno = err && err.error ? err.error.errno : undefined
            logger.error("error" + (errno !== undefined ? errno : (err && err.message)));
            // The status must be set before the body is sent; fall back to
            // 502 when the failure carries no HTTP status.
            res.status((err && err.statusCode) || 502).json(err && err.message)
            resolve()
         });

   })
}

/**
 * Picks a random port in the inclusive range [30000, 60000] that is not
 * reported as taken by usedPort.
 * @param {object} usedPort collection of ports already in use; only its
 *                          has(port) method is called
 * @returns {number} a port not present in usedPort, or -1 once more than
 *                   30000 random draws have been made without success
 */
function getPort(usedPort) {
   const min = 30000
   const max = 60000
   const maxAttempts = 30000
   let port = -1
   let ctr = 0
   do {
      port = Math.floor(Math.random() * (max - min + 1)) + min;
      ctr += 1;
      if (ctr > maxAttempts) {
         // Give up rather than spin forever when (almost) every port is taken.
         port = -1
         break
      }
   } while (usedPort.has(port))
   return port
}


// Shared logger for this module: JSON entries with timestamps go to the log
// files, colorized human-readable entries go to the console.
const logger = winston.createLogger({
   level: 'info',
   defaultMeta: { module: 'Dispatch Manager' },
   format: format.combine(format.timestamp(), format.json()),
   transports: [
      // error.log receives logs of level `error` (and below)
      new winston.transports.File({ level: 'error', filename: 'log/error.log' }),
      // combined.log receives all logs of level `info` and below
      new winston.transports.File({ filename: 'log/combined.log' }),
      // console output overrides the JSON format with a colorized simple one
      new winston.transports.Console({
         format: format.combine(
            format.colorize({ all: true }),
            format.timestamp(),
            format.simple()
         )
      })
   ]
});

/**
 * Comparator ordering two entries by their `metric` property, ascending.
 * @param {{metric: number}} a first entry
 * @param {{metric: number}} b second entry
 * @returns {number} negative when a.metric is smaller, positive when it is
 *                   larger, 0 when both are equal
 */
function compare(a, b) {
   const metricOfA = a.metric
   const metricOfB = b.metric
   return metricOfA - metricOfB
}

 module.exports = {
    makeid, generateExecutor, reverseProxy, getPort, logger, compare
 }