lib.js 10.9 KB
Newer Older
1 2
const crypto = require('crypto');
const fs = require('fs')
Nilanjan Daw's avatar
Nilanjan Daw committed
3
const rp = require('request-promise');
4
const fetch = require('node-fetch');
Nilanjan Daw's avatar
Nilanjan Daw committed
5
const winston = require('winston')
Nilanjan Daw's avatar
Nilanjan Daw committed
6
const constants = require('.././constants.json')
7
const secrets = require('./secrets.json')
Nilanjan Daw's avatar
Nilanjan Daw committed
8
const metrics = require('./metrics')
Nilanjan Daw's avatar
Nilanjan Daw committed
9
const { createLogger, format, transports } = winston;
10
const heap = require('heap')
Nilanjan Daw's avatar
Nilanjan Daw committed
11

12

13 14 15
// Kafka producer shared by this module: logBroadcast() publishes through it and
// it is re-exported at the bottom of the file. The client connects to the
// broker address configured at constants.network.external.kafka_host.
const kafka = require('kafka-node')
const Producer = kafka.Producer
const client = new kafka.KafkaClient({
   kafkaHost: constants.network.external.kafka_host,
   autoConnect: true
})
const producer = new Producer(client)

21 22 23
// Base URL (credentials embedded, trailing slash included) of the CouchDB
// database named by constants.implicit_chain_db_names; viterbi() appends a
// document id to it when it PUTs a computed path.
let implicitChainDB =
   `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` +
   `/${constants.implicit_chain_db_names}/`

Nilanjan Daw's avatar
Nilanjan Daw committed
24 25 26 27
/**
 * Generates a random ID of the requested length from the alphabet [a-z0-9].
 * Characters are drawn with Math.random(), so the result must not be used as
 * a secret or security token.
 * @param {number} length Number of characters to generate
 * @returns {string} The generated ID; empty when length is not positive
 */
function makeid(length) {
   const characters = 'abcdefghijklmnopqrstuvwxyz0123456789'
   let result = ''
   for (let i = 0; i < length; i++) {
      result += characters.charAt(Math.floor(Math.random() * characters.length))
   }
   return result
}
37

Nilanjan Daw's avatar
Nilanjan Daw committed
38

39 40 41 42 43 44 45
/**
 * Generates the runtime executor by splicing the received function into the
 * worker environment template (./repository/worker_env/env.js, resolved
 * against the current working directory).
 *
 * The function source is inserted immediately after the first occurrence of
 * "(resolve, reject) => {" in the template. The result is written to
 * functionPath + <md5 of the result> + ".js".
 *
 * TODO: make this asynchronous
 * @param {string} functionPath Path prefix from where to read the function and
 *    where to write the executor (concatenated as-is, so include the trailing separator)
 * @param {string} functionHash File name of the function under functionPath
 * @returns {string} MD5 hex digest of the generated executor source
 * @throws {Error} If the template does not contain the insertion marker, or on
 *    any file system error raised by the synchronous reads/writes
 */
function generateExecutor(functionPath, functionHash) {
   const marker = "(resolve, reject) => {"
   const template = fs.readFileSync('./repository/worker_env/env.js', 'utf8')
   const functionSource = fs.readFileSync(functionPath + functionHash, 'utf8')

   const markerIndex = template.indexOf(marker)
   if (markerIndex === -1) {
      // Without the marker the function would be spliced at an arbitrary
      // offset and produce a corrupt executor, so fail loudly instead.
      throw new Error(`worker template is missing the "${marker}" insertion marker`)
   }
   const insertIndex = markerIndex + marker.length
   const output = template.slice(0, insertIndex) + functionSource + template.slice(insertIndex)

   const hash = crypto.createHash('md5').update(output).digest("hex")
   console.log(hash)

   fs.writeFileSync(functionPath + hash + ".js", output)
   return hash
}
60

61 62 63 64 65 66 67 68
 /**
  * Reverse proxy: records the request in the branch tree, then forwards it to
  * the worker at the top of the function's load-balancing heap and relays the
  * worker's answer to the user.
  * @param {JSON} req the user request to be forwarded to the worker
  * @param {JSON} res Object to use to return the response to the user; this
  *    function reads res.timestamp and res.start, which the caller must set
  * @param {Map} functionToResource Map from (function id + runtime) to the heap of
  *    workers serving it, ordered by open_request_count
  * @param {Map} resourceMap Map from resource ID to resource metadata
  * @param {Map} functionBranchTree Holds the function path's and related probability distribution
  * @returns {Promise<void>} Resolves once a response (worker result or worker
  *    error) has been sent to the user; rejects if no worker or resource
  *    metadata exists for the request
  */
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
   branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
   return new Promise((resolve, reject) => {
      const runtime = req.body.runtime
      const id = req.params.id + runtime

      const functionHeap = functionToResource.get(id)
      if (!functionHeap || functionHeap.length === 0) {
         reject(new Error(`no worker available for function ${id}`))
         return
      }

      // loadbalancing by choosing worker with lowest load (heap root)
      const forwardTo = functionHeap[0]
      const resource = resourceMap.get(forwardTo.resource_id)
      if (resource === undefined) {
         reject(new Error(`no resource metadata for ${forwardTo.resource_id} (function ${id})`))
         return
      }

      const url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`

      forwardTo.open_request_count += 1
      heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map

      // Give the slot back exactly once, even if the success handler throws
      // after releasing and control falls through to the error handler.
      let released = false
      const release = () => {
         if (released) {
            return
         }
         released = true
         forwardTo.open_request_count -= 1
         heap.heapify(functionHeap, compare)
      }

      const options = {
         method: 'POST',
         uri: url,
         body: req.body,
         json: true // Automatically stringifies the body to JSON
      }

      rp(options)
         .then(function (parsedBody) {
            const serviceTime = Date.now() - res.timestamp

            res.json(parsedBody)
            release()

            metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
            resolve()
         })
         .catch(function (err) {
            release()
            logger.error("error" + err)
            // The status has to be set before the body is written; connection
            // level errors carry no statusCode, so fall back to 500.
            if (!res.headersSent) {
               res.status(err.statusCode || 500).json(err.message)
            }
            resolve()
         })
   })
}
nilanjandaw's avatar
nilanjandaw committed
119

120
/**
 * Picks a random port in [min, max] that is not already taken.
 * @param {{has: function(number): boolean}} usedPort Collection of ports in use
 *    (anything with a Set/Map style has() method)
 * @param {number} [min=30000] Lowest port to consider (inclusive)
 * @param {number} [max=60000] Highest port to consider (inclusive)
 * @returns {number} A port not present in usedPort, or -1 when 30000 random
 *    draws all hit used ports
 */
function getPort(usedPort, min = 30000, max = 60000) {
   const maxAttempts = 30000
   const low = Math.ceil(min)
   const high = Math.floor(max)
   for (let attempt = 0; attempt < maxAttempts; attempt++) {
      const port = Math.floor(Math.random() * (high - low + 1)) + low
      if (!usedPort.has(port)) {
         return port
      }
   }
   return -1
}

Nilanjan Daw's avatar
Nilanjan Daw committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
// Module-wide winston logger. Every entry is tagged with
// { module: 'Dispatch Manager' } and timestamped.
const logger = createLogger({
   level: 'info',
   format: format.combine(
      format.timestamp(),
      format.json()
   ),
   defaultMeta: { module: 'Dispatch Manager' },
   transports: [
      //
      // - Entries at level `error` are written to `log/error.log`
      // - Entries at level `info` or more severe are written to `log/combined.log`
      // - The console gets the same entries, colorized and in the simple text format
      //
      new transports.File({ filename: 'log/error.log', level: 'error' }),
      new transports.File({ filename: 'log/combined.log' }),
      new transports.Console({
         format: format.combine(
            format.colorize({ all: true }),
            format.timestamp(),
            format.simple()
         )
      })
   ]
})

160
/**
 * Heap comparator used by the load balancer: orders workers by their number
 * of in-flight requests so the least-loaded worker sits at the heap root.
 * @param {{open_request_count: number}} a
 * @param {{open_request_count: number}} b
 * @returns {number} Negative when a has fewer open requests than b, positive
 *    when it has more, 0 when equal
 */
function compare(a, b) {
   return a.open_request_count - b.open_request_count
}

164
/**
 * Updates the function branch tree with what this request reveals about
 * function chaining.
 *
 * - No `x-resource-id` header: the request is counted against the requested
 *   function itself (created with parent: true on first sight).
 * - With `x-resource-id`: the header is looked up in resourceMap to find the
 *   calling function; the caller's branch probabilities are then updated as a
 *   running average over its req_count, with the requested function counted
 *   as the branch taken this time.
 *
 * @param {JSON} req Incoming request; reads req.headers['x-resource-id'] and req.params.id
 * @param {Map} resourceMap Map from resource ID to resource metadata (functionHash is read)
 * @param {Map} functionToResource Unused here; kept for call-site compatibility
 * @param {Map} functionBranchTree Map from function hash to
 *    { req_count, parent, branches: Map<function hash, probability> }; mutated in place
 */
function branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) {
   const resource_id = req.headers['x-resource-id']
   const requestedFunction = req.params.id

   if (resource_id === undefined) {
      if (functionBranchTree.has(requestedFunction)) {
         functionBranchTree.get(requestedFunction).req_count++
      } else {
         functionBranchTree.set(requestedFunction, {
            req_count: 1,
            parent: true,
            branches: new Map()
         })
      }
      return
   }

   const resource = resourceMap.get(resource_id)
   if (resource === undefined) {
      // The header names a resource that is not (or no longer) registered, so
      // the call cannot be attributed to any function; skip the bookkeeping
      // rather than throwing.
      return
   }

   const callerHash = resource.functionHash
   if (!functionBranchTree.has(callerHash)) {
      functionBranchTree.set(callerHash, {
         req_count: 1,
         parent: false,
         branches: new Map([[requestedFunction, 1]])
      })
      return
   }

   const branchInfo = functionBranchTree.get(callerHash)
   // Parent entries have req_count bumped by their own direct requests above;
   // non-parent entries are only ever seen here, so count the call now.
   if (!branchInfo.parent) {
      branchInfo.req_count++
   }
   const total = branchInfo.req_count
   const branches = branchInfo.branches

   // Running average: every branch not taken keeps prob * (total - 1) hits out
   // of total, the branch taken gains one hit.
   for (const [branch, prob] of branches) {
      if (branch !== requestedFunction) {
         branches.set(branch, prob * (total - 1) / total)
      }
   }
   const priorHits = branches.has(requestedFunction)
      ? branches.get(requestedFunction) * (total - 1)
      : 0
   branches.set(requestedFunction, (priorHits + 1) / total)
}

218 219
/**
 * Computes the most likely execution path for every parent function in the
 * branch tree and stores it.
 *
 * Only entries with parent === true whose req_count is a multiple of 5 are
 * processed. Starting from the parent, the tree is walked level by level: the
 * probability of reaching each next function is summed over all functions of
 * the current level (branch probability * probability of reaching the
 * caller), and the most probable function of the level is appended to the
 * path. The path is saved on the entry as mle_path and, when it contains more
 * than the parent itself, PUT to the implicit chain database under the
 * parent's function hash.
 *
 * @param {Map} functionBranchTree Map from function hash to
 *    { req_count, parent, branches: Map<function hash, probability> }
 */
function viterbi(functionBranchTree) {
   functionBranchTree.forEach((metadata, node) => {
      if (!metadata.parent || metadata.req_count % 5 !== 0) {
         return
      }

      const path = [{node, probability: 1}]
      let parents = [[node, {
         prob: 1,
         metadata
      }]]

      // In an acyclic tree the walk cannot go deeper than one level per known
      // function plus a final level of leaves; the bound only matters for
      // cyclic chains (e.g. a function calling itself), which would otherwise
      // loop forever.
      let levelsLeft = functionBranchTree.size + 1
      while (parents.length > 0 && levelsLeft > 0) {
         levelsLeft -= 1

         // Probability of reaching each function on the next level
         const siblings = new Map()
         for (const parent of parents) {
            const parentInfo = parent[1]
            // Functions without an entry in the tree have no known branches
            if (parentInfo.metadata === undefined) {
               continue
            }
            parentInfo.metadata.branches.forEach((branchProb, subNode) => {
               const reached = siblings.has(subNode) ? siblings.get(subNode) : 0
               siblings.set(subNode, reached + branchProb * parentInfo.prob)
            })
         }

         let maxSibling, maxProb = 0
         siblings.forEach((prob, sibling) => {
            if (prob > maxProb) {
               maxSibling = sibling
               maxProb = prob
            }
         })

         // Every function reached on this level becomes a parent of the next one
         parents = Array.from(siblings.keys()).map((id) => [
            id, {
               prob: siblings.get(id),
               metadata: functionBranchTree.get(id)
            }
         ])

         if (maxSibling !== undefined) {
            path.push({node: maxSibling, probability: maxProb})
         }
      }

      metadata.mle_path = path
      if (path.length > 1) {
         const payload = {
            method: 'put',
            body: JSON.stringify(path),
            headers: { 'Content-Type': 'application/json' }
         }
         // Best effort: a failed upload is logged, it must not take the process down
         fetch(implicitChainDB + node, payload)
            .catch((err) => logger.error("failed to store implicit chain for " + node + ": " + err))
      }
   })
}

/**
 * Publishes a log message on the Kafka log channel (constants.topics.log_channel).
 *
 * The message object is stamped in place with the current time and, when
 * resource_id is known to resourceMap, with the resource's id, node, runtime
 * and function hash before being serialized.
 *
 * @param {Object} message Log payload; mutated (timestamp and resource fields are added)
 * @param {string} [resource_id] Resource the message is about
 * @param {Map} resourceMap Map from resource ID to resource metadata
 * @returns {Promise<void>} Resolves once the producer has reported back. A
 *    delivery error is logged and still resolves, since logging is best
 *    effort. Rejects with the error if the message cannot be built or handed
 *    to the producer.
 */
function logBroadcast(message, resource_id, resourceMap) {
   return new Promise((resolve, reject) => {
      try {
         message.timestamp = Date.now()
         if (resource_id && resourceMap.has(resource_id)) {
            const resource = resourceMap.get(resource_id)
            message.resource_id = resource_id
            message.node_id = resource.node_id
            message.runtime = resource.runtime
            message.function_id = resource.functionHash
         }
         const log = [{
            topic: constants.topics.log_channel,
            messages: JSON.stringify(message),
            partition: 0
         }]
         producer.send(log, (err) => {
            if (err) {
               logger.error("log broadcast failed: " + err)
            }
            resolve()
         })
      } catch (err) {
         logger.error("log broadcast failed: " + err)
         reject(err)
      }
   })
}

320 321 322 323 324 325
/**
 * Fetches a URL and returns the response body parsed as JSON.
 * The HTTP status is not inspected: an error response with a JSON body is
 * returned to the caller like any other body.
 * @param {string} url URL to request
 * @param {Object} [data=null] Options passed to fetch (method, headers, body, ...);
 *    when null or undefined, fetch is called with the URL only
 * @returns {Promise<*>} The parsed JSON body; rejects if the request fails or
 *    the body is not valid JSON
 */
async function fetchData(url, data = null) {
   // `== null` matches both null and undefined
   const res = data == null ? await fetch(url) : await fetch(url, data)
   return res.json()
}

nilanjandaw's avatar
nilanjandaw committed
329
 module.exports = {
330 331
    makeid, generateExecutor, reverseProxy, 
    getPort, logger, compare,
332 333
    viterbi, logBroadcast, fetchData, metrics,
    producer
nilanjandaw's avatar
nilanjandaw committed
334
 }