1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
const crypto = require('crypto');
const fs = require('fs')
const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const { createLogger, format, transports } = winston;
const heap = require('heap')
/**
 * Builds a random identifier of the requested length.
 *
 * Each character is drawn independently with Math.random() from the
 * lowercase letters a-z and the digits 0-9, so identifiers are random
 * rather than guaranteed unique.
 *
 * @param {number} length - Number of characters to generate.
 * @returns {string} The generated identifier; empty when length is not positive.
 */
function makeid(length) {
    const alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789';
    const picked = [];
    let produced = 0;
    while (produced < length) {
        const position = Math.floor(Math.random() * alphabet.length);
        picked.push(alphabet.charAt(position));
        produced += 1;
    }
    return picked.join('');
}
/**
 * Generates the runtime executor by inserting the received function's source
 * into the worker environment template.
 *
 * Reads the template from ./repository/worker_env/env.js, splices the contents
 * of the file at `functionPath + functionHash` immediately after the first
 * occurrence of "(resolve, reject) => {", and writes the result to
 * `functionPath + <md5 of the result> + ".js"`.
 *
 * TODO: make this asynchronous
 *
 * @param {string} functionPath - Directory prefix (including trailing separator)
 *     the function file is read from and the executor is written to.
 * @param {string} functionHash - File name (hash) of the received function.
 * @returns {string} Hex MD5 digest of the generated executor source, which is
 *     also the base name of the written file.
 * @throws {Error} If the template does not contain the insertion marker, or if
 *     any of the synchronous file operations fail.
 */
function generateExecutor(functionPath, functionHash) {
    const marker = "(resolve, reject) => {";
    // All intermediates are local; leaving them undeclared would leak them as
    // globals and throw under strict mode.
    const template = fs.readFileSync('./repository/worker_env/env.js');
    const functionFile = fs.readFileSync(functionPath + functionHash);
    const markerIndex = template.indexOf(marker);
    if (markerIndex === -1) {
        // Without this guard the function body would be spliced at an
        // arbitrary offset and a corrupt executor silently written to disk.
        throw new Error(`generateExecutor: marker "${marker}" not found in ./repository/worker_env/env.js`);
    }
    // The marker is pure ASCII, so its string length equals its byte length.
    const insertIndex = markerIndex + marker.length;
    const output = template.toString('utf8', 0, insertIndex) +
        functionFile.toString('utf8') +
        template.toString('utf8', insertIndex);
    const hash = crypto.createHash('md5').update(output).digest("hex");
    console.log(hash);
    fs.writeFileSync(functionPath + hash + ".js", output);
    return hash;
}
/**
 * Forwards a function-execution request to a worker resource and relays the
 * worker's reply to the client.
 *
 * The target is the entry at index 0 of the heap array stored in
 * `functionToResource` under the key `req.params.id + req.body.runtime`; its
 * `resource_id` is looked up in `resourceMap` to obtain `node_id` and `port`.
 * The entry's `metric` is incremented while the request is in flight and
 * decremented once it settles, re-heapifying with `compare` each time.
 *
 * @param {object} req - Incoming request; reads `req.body` and `req.params.id`.
 * @param {object} res - Response object; uses `res.status()` and `res.json()`.
 * @param {Map} functionToResource - Maps function id + runtime to a heap array
 *     of entries carrying `resource_id` and `metric`.
 * @param {Map} resourceMap - Maps `resource_id` to an object with `node_id`
 *     and `port`.
 * @returns {Promise<void>} Resolves after a response (success or upstream
 *     error) has been sent. Rejects if the lookups fail (e.g. no heap is
 *     registered for the id) or if writing the response throws.
 */
function reverseProxy(req, res, functionToResource, resourceMap) {
    return new Promise((resolve, reject) => {
        const runtime = req.body.runtime;
        const id = req.params.id + runtime;
        /**
         * Bypass deployment pipeline if resource available
         */
        const functionHeap = functionToResource.get(id);
        const forwardTo = functionHeap[0];
        const resource = resourceMap.get(forwardTo.resource_id);
        logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
            "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
        const url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`;
        logger.info("Request received at reverseproxy. Forwarding to: " + url);

        // Count this request against the chosen resource while it is in flight.
        forwardTo.metric += 1;
        heap.heapify(functionHeap, compare);
        logger.info(functionHeap);

        // Undo the in-flight accounting; called exactly once per request.
        const release = () => {
            forwardTo.metric -= 1;
            heap.heapify(functionHeap, compare);
            console.log(functionHeap);
        };

        const options = {
            method: 'POST',
            uri: url,
            body: req.body,
            json: true // Automatically stringifies the body to JSON
        };

        rp(options)
            .then(
                (parsedBody) => {
                    release();
                    res.json(parsedBody);
                },
                // Passed as the second argument to then() (not a trailing
                // catch) so a failure while writing the success response
                // cannot trigger a second release().
                (err) => {
                    release();
                    // err.error is not always present (or an object), so
                    // guard the access and fall back to the message.
                    const detail = (err.error && err.error.errno) || err.message;
                    logger.error("error" + detail);
                    // The status must be set before the body is sent; errors
                    // without an upstream status code are reported as 502.
                    res.status(err.statusCode || 502).json(err.message);
                }
            )
            .then(resolve, reject);
    });
}
/**
 * Picks a random port in the inclusive range 30000-60000 that is not
 * already in use.
 *
 * Candidates are drawn with Math.random(); the search gives up after 30000
 * unsuccessful attempts so a saturated port set cannot loop forever.
 *
 * @param {{has: function(number): boolean}} usedPort - Collection of ports
 *     already taken (e.g. a Set or Map keyed by port number).
 * @returns {number} A port not reported by `usedPort.has`, or -1 when none
 *     was found within the attempt limit.
 */
function getPort(usedPort) {
    // Kept local: assigning undeclared min/max would create implicit globals.
    const minPort = 30000;
    const maxPort = 60000;
    const maxAttempts = 30000;
    for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
        const port = Math.floor(Math.random() * (maxPort - minPort + 1)) + minPort;
        if (!usedPort.has(port)) {
            return port;
        }
    }
    return -1;
}
/**
 * Module-wide winston logger.
 *
 * Default level is 'info'; entries are timestamped JSON tagged with
 * { module: 'Dispatch Manager' }. Three transports are attached, in order:
 * an error-level file, a combined file, and a colorized console.
 */
const logger = createLogger({
    level: 'info',
    defaultMeta: { module: 'Dispatch Manager' },
    format: format.combine(format.timestamp(), format.json()),
    transports: [
        // File transport limited to level 'error' -> log/error.log
        new transports.File({ filename: 'log/error.log', level: 'error' }),
        // File transport with no level override -> log/combined.log
        new transports.File({ filename: 'log/combined.log' }),
        // Console transport with its own colorized, timestamped, simple format
        new transports.Console({
            format: format.combine(
                format.colorize({ all: true }),
                format.timestamp(),
                format.simple()
            )
        })
    ]
});
/**
 * Comparator ordering two heap entries by their `metric` field; this is the
 * function handed to heap.heapify in reverseProxy.
 *
 * @param {{metric: number}} a - First entry.
 * @param {{metric: number}} b - Second entry.
 * @returns {number} Negative when a.metric is smaller, positive when it is
 *     larger, 0 when both are equal.
 */
function compare(a, b) {
    const { metric: first } = a;
    const { metric: second } = b;
    return first - second;
}
// Public API of this module: random ID generation, executor generation, the
// reverse proxy, free-port selection, the shared logger, and the comparator
// used to order heap entries by metric.
module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger, compare
}