lib.js 4.19 KB
Newer Older
1 2
const crypto = require('crypto');
const fs = require('fs')
Nilanjan Daw's avatar
Nilanjan Daw committed
3
const rp = require('request-promise');
4
const fetch = require('node-fetch');
Nilanjan Daw's avatar
Nilanjan Daw committed
5 6
const winston = require('winston')
const { createLogger, format, transports } = winston;
7
const heap = require('heap')
Nilanjan Daw's avatar
Nilanjan Daw committed
8 9 10 11 12

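/**
 * Generates a random ID of arbitrary length, made of lowercase letters and digits.
 * IDs are random, so uniqueness is not guaranteed.
 * @param {number} length Length of the ID
 * @returns {string} The generated ID
 */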
13 14 15 16 17 18 19 20 21
function makeid(length) {
    // Builds a random ID of `length` characters, each one drawn independently
    // (via Math.random) from the lowercase letters followed by the digits.
    const alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789';
    const picked = [];
    for (let count = 0; count < length; count += 1) {
        const position = Math.floor(Math.random() * alphabet.length);
        picked.push(alphabet.charAt(position));
    }
    return picked.join('');
}
22

Nilanjan Daw's avatar
Nilanjan Daw committed
23

24 25 26 27 28 29 30
/**
 * Generates the runtime executor after inserting the received function.
 *
 * Reads the worker template `./repository/worker_env/env.js`, inserts the
 * contents of the file `functionPath + functionHash` right after the first
 * `(resolve, reject) => {` marker in the template, and writes the result to
 * `functionPath + <md5 of the result> + ".js"`.
 *
 * TODO: make this asynchronous
 * @param {string} functionPath Path from where to extract the function. It is
 *    concatenated as-is with the file names, so it must already end with a
 *    path separator.
 * @param {string} functionHash Function hash value, used as the name of the
 *    file holding the function source.
 * @returns {string} Hex MD5 digest of the generated executor source, which is
 *    also the base name of the written `.js` file.
 * @throws {Error} If the template does not contain the insertion marker, or if
 *    any of the synchronous file operations fails.
 */
function generateExecutor(functionPath, functionHash) {
   const marker = "(resolve, reject) => {";
   const template = fs.readFileSync('./repository/worker_env/env.js', 'utf8');
   const functionSource = fs.readFileSync(functionPath + functionHash, 'utf8');

   const markerIndex = template.indexOf(marker);
   if (markerIndex === -1) {
      // Without the marker there is no valid insertion point; splicing at a
      // bogus offset would silently produce a broken executor.
      throw new Error(`Insertion marker "${marker}" not found in worker environment template`);
   }
   const insertIndex = markerIndex + marker.length;

   const output = template.slice(0, insertIndex) + functionSource + template.slice(insertIndex);

   // The executor file is named after the hash of its own content.
   const hash = crypto.createHash('md5').update(output).digest("hex");
   console.log(hash);

   fs.writeFileSync(functionPath + hash + ".js", output);
   return hash;
}
45

46
/**
 * Forwards an incoming request to a worker resource and relays its answer.
 *
 * The target is the entry at the top of the heap registered for
 * `req.params.id + req.body.runtime` in `functionToResource`; its `metric` is
 * incremented while the request is in flight and decremented once the
 * request has completed, re-heapifying with `compare` after each change.
 *
 * @param {object} req Incoming request; `req.body` is forwarded as the JSON
 *    body and `req.body.runtime` / `req.params.id` select the function.
 * @param {object} res Response object; must provide `status()` and `json()`.
 * @param {Map} functionToResource Maps function id + runtime to a heap (array)
 *    of entries carrying `resource_id` and `metric`.
 * @param {Map} resourceMap Maps `resource_id` to a resource carrying
 *    `node_id` and `port`.
 * @returns {Promise<void>} Resolves once a response has been relayed, both
 *    when the worker answered and when the forwarded request failed (the
 *    failure is reported to the client with the upstream status code, or 502
 *    when there is none). Rejects when no heap/resource is registered for
 *    the function.
 */
function reverseProxy(req, res, functionToResource, resourceMap) {
   return new Promise((resolve, reject) => {
      const runtime = req.body.runtime;
      const id = req.params.id + runtime;
      /**
       * Bypass deployment pipeline if resource available
       */
      const functionHeap = functionToResource.get(id);
      if (!functionHeap || functionHeap.length === 0) {
         reject(new Error(`No resource available for function ${id}`));
         return;
      }
      const forwardTo = functionHeap[0];
      const resource = resourceMap.get(forwardTo.resource_id);
      if (!resource) {
         reject(new Error(`Resource ${JSON.stringify(forwardTo.resource_id)} not found for function ${id}`));
         return;
      }
      logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
         "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
      const url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`;

      logger.info("Request received at reverseproxy. Forwarding to: " + url);

      // Account for the in-flight request so the heap ordering reflects it.
      forwardTo.metric += 1;
      heap.heapify(functionHeap, compare);
      console.log(functionHeap);

      // Undo the accounting exactly once, whichever path finishes the request.
      let released = false;
      const release = () => {
         if (released) {
            return;
         }
         released = true;
         forwardTo.metric -= 1;
         heap.heapify(functionHeap, compare);
         console.log(functionHeap);
      };

      const options = {
         method: 'POST',
         uri: url,
         body: req.body,
         json: true // Automatically stringifies the body to JSON
      };

      rp(options)
         .then(function (parsedBody) {
            res.json(parsedBody);
            release();
            resolve();
         })
         .catch(function (err) {
            release();
            // `err.error` is not guaranteed to be set, so guard the lookup.
            const errno = err && err.error ? err.error.errno : undefined;
            logger.error("error", errno);
            if (!res.headersSent) {
               // The status must be set before the body is sent; use 502 when
               // the failure carries no upstream status code.
               res.status((err && err.statusCode) || 502).json(err && err.message);
            }
            resolve();
         })
         // Safety net: never leave the returned promise pending.
         .catch(reject);
   });
}
nilanjandaw's avatar
nilanjandaw committed
93

94
/**
 * Picks a random port in the inclusive range 30000-60000 that is not
 * reported as used.
 *
 * @param {{has: function(number): boolean}} usedPort Collection of ports
 *    already taken (anything exposing `has(port)`, e.g. a Set or Map).
 * @returns {number} A port for which `usedPort.has(port)` is false, or -1 if
 *    none was found within 30000 random attempts.
 */
function getPort(usedPort) {
   const min = 30000;
   const max = 60000;
   const maxAttempts = 30000;

   for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
      const port = Math.floor(Math.random() * (max - min + 1)) + min;
      if (!usedPort.has(port)) {
         return port;
      }
   }
   // Give up rather than loop forever when (almost) every port is taken.
   return -1;
}

Nilanjan Daw's avatar
Nilanjan Daw committed
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128

// Shared winston logger; every entry is tagged with the 'Dispatch Manager' module.
const logger = createLogger({
   level: 'info',
   defaultMeta: { module: 'Dispatch Manager' },
   format: format.combine(format.timestamp(), format.json()),
   transports: [
      // File transport restricted to level `error`.
      new transports.File({ level: 'error', filename: 'log/error.log' }),
      // File transport without its own level, so the logger's level applies.
      new transports.File({ filename: 'log/combined.log' }),
      // Console output: colorized, timestamped, simple one-line format.
      new transports.Console({
         format: format.combine(
            format.colorize({ all: true }),
            format.timestamp(),
            format.simple()
         )
      })
   ]
});

135 136 137 138
/**
 * Comparator ordering two entries by their `metric` property.
 * @param {{metric: number}} a First entry
 * @param {{metric: number}} b Second entry
 * @returns {number} Negative when `a.metric` is smaller, positive when it is
 *    larger, zero when both are equal.
 */
function compare(a, b) {
   const difference = a.metric - b.metric;
   return difference;
}

nilanjandaw's avatar
nilanjandaw committed
139
 module.exports = {
140
    makeid, generateExecutor, reverseProxy, getPort, logger, compare
nilanjandaw's avatar
nilanjandaw committed
141
 }