Commit ca61f8b9 authored by Nilanjan Daw's avatar Nilanjan Daw

Added out-of-band request tracker

Added an out-of-band request tracker and request probability measurement module
parent 0029b21b
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.2",
"winston": "^3.2.1" "winston": "^3.2.1"
} }
} }
...@@ -26,8 +26,9 @@ let usedPort = new Map(), // TODO: remove after integration with RM ...@@ -26,8 +26,9 @@ let usedPort = new Map(), // TODO: remove after integration with RM
rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation
db = new Map(), // queue holding request to be dispatched db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map() // a function to resource map. Each map contains a minheap of functionToResource = new Map(), // a function to resource map. Each map contains a minheap of
// resources associated with the function // resources associated with the function
workerNodes = new Map()
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -61,7 +62,6 @@ app.use(fileUpload()) ...@@ -61,7 +62,6 @@ app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); app.use(swStats.getMiddleware({ swaggerSpec: apiSpec }));
let requestQueue = [] let requestQueue = []
let workerNodes = []
const WINDOW_SIZE = 10 const WINDOW_SIZE = 10
const port = constants.master_port const port = constants.master_port
...@@ -151,9 +151,9 @@ function deployContainer(path, imageName) { ...@@ -151,9 +151,9 @@ function deployContainer(path, imageName) {
fs.writeFile('./repository/Dockerfile', fs.writeFile('./repository/Dockerfile',
`FROM node:latest `FROM node:latest
WORKDIR /app WORKDIR /app
COPY package.json /app COPY ./worker_env/package.json /app
RUN npm install ADD ./worker_env/node_modules /app/node_modules
COPY . /app COPY ${imageName}.js /app
ENTRYPOINT ["node", "${imageName}.js"]` ENTRYPOINT ["node", "${imageName}.js"]`
, function (err) { , function (err) {
if (err) { if (err) {
...@@ -205,6 +205,7 @@ function deployContainer(path, imageName) { ...@@ -205,6 +205,7 @@ function deployContainer(path, imageName) {
* REST API to receive execute requests * REST API to receive execute requests
*/ */
app.post('/serverless/execute/:id', (req, res) => { app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime let runtime = req.body.runtime
let id = req.params.id + runtime let id = req.params.id + runtime
if (functionToResource.has(id)) { if (functionToResource.has(id)) {
...@@ -280,9 +281,6 @@ function dispatch() { ...@@ -280,9 +281,6 @@ function dispatch() {
} }
} }
function getAddress() {
return workerNodes[Math.floor(Math.random() * workerNodes.length)];
}
function postDeploy(message) { function postDeploy(message) {
logger.info("Deployed Resource: " + JSON.stringify(message)); logger.info("Deployed Resource: " + JSON.stringify(message));
...@@ -371,9 +369,10 @@ consumer.on('message', function (message) { ...@@ -371,9 +369,10 @@ consumer.on('message', function (message) {
} else if (topic === "heartbeat") { } else if (topic === "heartbeat") {
message = JSON.parse(message) message = JSON.parse(message)
if (Date.now() - message.timestamp < 1000) if (Date.now() - message.timestamp < 1000)
if (workerNodes.indexOf(message.address) === -1) { if (!workerNodes.has(message.address)) {
workerNodes.push(message.address) workerNodes.set(message.address, message.timestamp)
logger.warn("New worker discovered. Worker List: " + workerNodes) logger.warn("New worker discovered. Worker List: ")
logger.warn(workerNodes)
} }
} else if (topic == "deployed") { } else if (topic == "deployed") {
try { try {
......
...@@ -7,6 +7,7 @@ const constants = require('.././constants.json') ...@@ -7,6 +7,7 @@ const constants = require('.././constants.json')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap') const heap = require('heap')
functionBranchTree = new Map() // a tree to store function branch predictions
/** /**
* Generates unique IDs of arbitrary length * Generates unique IDs of arbitrary length
...@@ -46,6 +47,9 @@ function generateExecutor(functionPath, functionHash) { ...@@ -46,6 +47,9 @@ function generateExecutor(functionPath, functionHash) {
} }
function reverseProxy(req, res, functionToResource, resourceMap) { function reverseProxy(req, res, functionToResource, resourceMap) {
if (req.headers["x-resource-id"]) {
branchChainPredictor(req, resourceMap)
}
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let runtime = req.body.runtime let runtime = req.body.runtime
...@@ -139,6 +143,37 @@ function compare(a, b) { ...@@ -139,6 +143,37 @@ function compare(a, b) {
return a.open_request_count - b.open_request_count return a.open_request_count - b.open_request_count
} }
function branchChainPredictor(req, resourceMap) {
let resource_id = req.headers['x-resource-id']
let resource = resourceMap.get(resource_id)
let forwardBranch = req.params.id
if (!functionBranchTree.has(resource.functionHash)) {
let data = {
req_count: 1,
branches: new Map()
}
data.branches.set(forwardBranch, 1)
functionBranchTree.set(resource.functionHash, data)
} else {
let branchInfo = functionBranchTree.get(resource.functionHash)
branchInfo.req_count++
if (branchInfo.branches.has(forwardBranch)) {
let branchProb = branchInfo.branches.get(forwardBranch)
branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0)
branchInfo.branches.set(forwardBranch, branchProb)
} else {
branchInfo.branches.set(forwardBranch, 1.0)
}
for (let [branch, prob] of branchInfo.branches.entries()) {
if (branch !== forwardBranch)
prob *= (branchInfo.req_count - 1)
prob /= branchInfo.req_count
branchInfo.branches.set(branch, prob)
}
}
console.log(functionBranchTree);
}
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger, compare makeid, generateExecutor, reverseProxy, getPort, logger, compare
} }
\ No newline at end of file
'use strict'; 'use strict';
const express = require('express') const express = require('express')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
let request = require('request')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime let port = 5000, resource_id, functionHash, runtime
...@@ -8,6 +11,9 @@ resource_id = process.argv[2] ...@@ -8,6 +11,9 @@ resource_id = process.argv[2]
functionHash = process.argv[3] functionHash = process.argv[3]
port = process.argv[4] port = process.argv[4]
runtime = process.argv[5] runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -30,9 +36,16 @@ app.post('/serverless/function/execute/', (req, res) => { ...@@ -30,9 +36,16 @@ app.post('/serverless/function/execute/', (req, res) => {
function executor(payload) { function executor(payload) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
}) })
} }
app.post('/serverless/function/execute/2', (req, res) => {
console.log("2", JSON.stringify(req.headers))
res.send("done")
})
app.listen(port, () => { app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`) console.log(`Resource ${resource_id} Server listening on port ${port}!`)
producer.send( producer.send(
......
{ {
"_from": "express", "_from": "express@^4.17.1",
"_id": "express@4.17.1", "_id": "express@4.17.1",
"_inBundle": false, "_inBundle": false,
"_integrity": "sha512-mHJ9O79RqluphRrcw2X/GTh3k9tVv8YcoyY4Kkh4WDMUYKRZUq0h1o0w2rrrxBqM7VoeUVqgb27xlEMXTnYt4g==", "_integrity": "sha512-mHJ9O79RqluphRrcw2X/GTh3k9tVv8YcoyY4Kkh4WDMUYKRZUq0h1o0w2rrrxBqM7VoeUVqgb27xlEMXTnYt4g==",
"_location": "/express", "_location": "/express",
"_phantomChildren": {}, "_phantomChildren": {},
"_requested": { "_requested": {
"type": "tag", "type": "range",
"registry": true, "registry": true,
"raw": "express", "raw": "express@^4.17.1",
"name": "express", "name": "express",
"escapedName": "express", "escapedName": "express",
"rawSpec": "", "rawSpec": "^4.17.1",
"saveSpec": null, "saveSpec": null,
"fetchSpec": "latest" "fetchSpec": "^4.17.1"
}, },
"_requiredBy": [ "_requiredBy": [
"#USER", "#USER",
...@@ -21,8 +21,8 @@ ...@@ -21,8 +21,8 @@
], ],
"_resolved": "https://registry.npmjs.org/express/-/express-4.17.1.tgz", "_resolved": "https://registry.npmjs.org/express/-/express-4.17.1.tgz",
"_shasum": "4491fc38605cf51f8629d39c2b5d026f98a4c134", "_shasum": "4491fc38605cf51f8629d39c2b5d026f98a4c134",
"_spec": "express", "_spec": "express@^4.17.1",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env", "_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env",
"author": { "author": {
"name": "TJ Holowaychuk", "name": "TJ Holowaychuk",
"email": "tj@vision-media.ca" "email": "tj@vision-media.ca"
......
...@@ -173,7 +173,7 @@ addr.octets // => [192, 168, 1, 1] ...@@ -173,7 +173,7 @@ addr.octets // => [192, 168, 1, 1]
``` ```
`prefixLengthFromSubnetMask()` will return a CIDR prefix length for a valid IPv4 netmask or `prefixLengthFromSubnetMask()` will return a CIDR prefix length for a valid IPv4 netmask or
false if the netmask is not valid. null if the netmask is not valid.
```js ```js
ipaddr.IPv4.parse('255.255.255.240').prefixLengthFromSubnetMask() == 28 ipaddr.IPv4.parse('255.255.255.240').prefixLengthFromSubnetMask() == 28
......
declare module "ipaddr.js" { declare module "ipaddr.js" {
type IPv4Range = 'unicast' | 'unspecified' | 'broadcast' | 'multicast' | 'linkLocal' | 'loopback' | 'carrierGradeNat' | 'private' | 'reserved'; type IPv4Range = 'unicast' | 'unspecified' | 'broadcast' | 'multicast' | 'linkLocal' | 'loopback' | 'carrierGradeNat' | 'private' | 'reserved';
type IPv6Range = 'unicast' | 'unspecified' | 'linkLocal' | 'multicast' | 'loopback' | 'uniqueLocal' | 'ipv4Mapped' | 'rfc6145' | 'rfc6052' | '6to4' | 'teredo' | 'reserved'; type IPv6Range = 'unicast' | 'unspecified' | 'linkLocal' | 'multicast' | 'loopback' | 'uniqueLocal' | 'ipv4Mapped' | 'rfc6145' | 'rfc6052' | '6to4' | 'teredo' | 'reserved';
...@@ -9,23 +6,20 @@ declare module "ipaddr.js" { ...@@ -9,23 +6,20 @@ declare module "ipaddr.js" {
[name: string]: [T, number] | [T, number][]; [name: string]: [T, number] | [T, number][];
} }
// Common methods/properties for IPv4 and IPv6 classes. // Common methods/properties for IPv4 and IPv6 classes.
class IP { class IP {
prefixLengthFromSubnetMask(): number | null;
prefixLengthFromSubnetMask(): number | false;
toByteArray(): number[]; toByteArray(): number[];
toNormalizedString(): string; toNormalizedString(): string;
toString(): string; toString(): string;
} }
namespace Address { namespace Address {
export function isValid(addr: string): boolean; export function isValid(addr: string): boolean;
export function fromByteArray(bytes: number[]): IPv4 | IPv6; export function fromByteArray(bytes: number[]): IPv4 | IPv6;
export function parse(addr: string): IPv4 | IPv6; export function parse(addr: string): IPv4 | IPv6;
export function parseCIDR(mask: string): [IPv4 | IPv6, number]; export function parseCIDR(mask: string): [IPv4 | IPv6, number];
export function process(address: string): IPv4 | IPv6; export function process(addr: string): IPv4 | IPv6;
export function subnetMatch(addr: IPv4, rangeList: RangeList<IPv4>, defaultName?: string): string; export function subnetMatch(addr: IPv4, rangeList: RangeList<IPv4>, defaultName?: string): string;
export function subnetMatch(addr: IPv6, rangeList: RangeList<IPv6>, defaultName?: string): string; export function subnetMatch(addr: IPv6, rangeList: RangeList<IPv6>, defaultName?: string): string;
...@@ -39,6 +33,7 @@ declare module "ipaddr.js" { ...@@ -39,6 +33,7 @@ declare module "ipaddr.js" {
static parseCIDR(addr: string): [IPv4, number]; static parseCIDR(addr: string): [IPv4, number];
static subnetMaskFromPrefixLength(prefix: number): IPv4; static subnetMaskFromPrefixLength(prefix: number): IPv4;
constructor(octets: number[]); constructor(octets: number[]);
octets: number[]
kind(): 'ipv4'; kind(): 'ipv4';
match(addr: IPv4, bits: number): boolean; match(addr: IPv4, bits: number): boolean;
...@@ -55,7 +50,9 @@ declare module "ipaddr.js" { ...@@ -55,7 +50,9 @@ declare module "ipaddr.js" {
static parse(addr: string): IPv6; static parse(addr: string): IPv6;
static parseCIDR(addr: string): [IPv6, number]; static parseCIDR(addr: string): [IPv6, number];
static subnetMaskFromPrefixLength(prefix: number): IPv6; static subnetMaskFromPrefixLength(prefix: number): IPv6;
constructor(octets: number[]); constructor(parts: number[]);
parts: number[]
zoneId?: string
isIPv4MappedAddress(): boolean; isIPv4MappedAddress(): boolean;
kind(): 'ipv6'; kind(): 'ipv6';
......
{ {
"_from": "ipaddr.js@1.9.0", "_from": "ipaddr.js@1.9.1",
"_id": "ipaddr.js@1.9.0", "_id": "ipaddr.js@1.9.1",
"_inBundle": false, "_inBundle": false,
"_integrity": "sha512-M4Sjn6N/+O6/IXSJseKqHoFc+5FdGJ22sXqnjTpdZweHK64MzEPAyQZyEU3R/KRv2GLoa7nNtg/C2Ev6m7z+eA==", "_integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==",
"_location": "/ipaddr.js", "_location": "/ipaddr.js",
"_phantomChildren": {}, "_phantomChildren": {},
"_requested": { "_requested": {
"type": "version", "type": "version",
"registry": true, "registry": true,
"raw": "ipaddr.js@1.9.0", "raw": "ipaddr.js@1.9.1",
"name": "ipaddr.js", "name": "ipaddr.js",
"escapedName": "ipaddr.js", "escapedName": "ipaddr.js",
"rawSpec": "1.9.0", "rawSpec": "1.9.1",
"saveSpec": null, "saveSpec": null,
"fetchSpec": "1.9.0" "fetchSpec": "1.9.1"
}, },
"_requiredBy": [ "_requiredBy": [
"/proxy-addr" "/proxy-addr"
], ],
"_resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.0.tgz", "_resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz",
"_shasum": "37df74e430a0e47550fe54a2defe30d8acd95f65", "_shasum": "bff38543eeb8984825079ff3a2a8e6cbd46781b3",
"_spec": "ipaddr.js@1.9.0", "_spec": "ipaddr.js@1.9.1",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env/node_modules/proxy-addr", "_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env/node_modules/proxy-addr",
"author": { "author": {
"name": "whitequark", "name": "whitequark",
"email": "whitequark@whitequark.org" "email": "whitequark@whitequark.org"
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
"description": "A library for manipulating IPv4 and IPv6 addresses in JavaScript.", "description": "A library for manipulating IPv4 and IPv6 addresses in JavaScript.",
"devDependencies": { "devDependencies": {
"coffee-script": "~1.12.6", "coffee-script": "~1.12.6",
"nodeunit": ">=0.8.2 <0.8.7", "nodeunit": "^0.11.3",
"uglify-js": "~3.0.19" "uglify-js": "~3.0.19"
}, },
"directories": { "directories": {
...@@ -46,6 +46,7 @@ ...@@ -46,6 +46,7 @@
}, },
"files": [ "files": [
"lib/", "lib/",
"LICENSE",
"ipaddr.min.js" "ipaddr.min.js"
], ],
"homepage": "https://github.com/whitequark/ipaddr.js#readme", "homepage": "https://github.com/whitequark/ipaddr.js#readme",
...@@ -65,5 +66,5 @@ ...@@ -65,5 +66,5 @@
"test": "cake build test" "test": "cake build test"
}, },
"types": "./lib/ipaddr.js.d.ts", "types": "./lib/ipaddr.js.d.ts",
"version": "1.9.0" "version": "1.9.1"
} }
2.0.6 / 2020-02-24
==================
* deps: ipaddr.js@1.9.1
2.0.5 / 2019-04-16 2.0.5 / 2019-04-16
================== ==================
......
...@@ -99,7 +99,7 @@ function compile (val) { ...@@ -99,7 +99,7 @@ function compile (val) {
for (var i = 0; i < trust.length; i++) { for (var i = 0; i < trust.length; i++) {
val = trust[i] val = trust[i]
if (!IP_RANGES.hasOwnProperty(val)) { if (!Object.prototype.hasOwnProperty.call(IP_RANGES, val)) {
continue continue
} }
......
{ {
"_from": "proxy-addr@~2.0.5", "_from": "proxy-addr@~2.0.5",
"_id": "proxy-addr@2.0.5", "_id": "proxy-addr@2.0.6",
"_inBundle": false, "_inBundle": false,
"_integrity": "sha512-t/7RxHXPH6cJtP0pRG6smSr9QJidhB+3kXu0KgXnbGYMgzEnUxRQ4/LDdfOwZEMyIh3/xHb8PX3t+lfL9z+YVQ==", "_integrity": "sha512-dh/frvCBVmSsDYzw6n926jv974gddhkFPfiN8hPOi30Wax25QZyZEGveluCgliBnqmuM+UJmBErbAUFIoDbjOw==",
"_location": "/proxy-addr", "_location": "/proxy-addr",
"_phantomChildren": {}, "_phantomChildren": {},
"_requested": { "_requested": {
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
"_requiredBy": [ "_requiredBy": [
"/express" "/express"
], ],
"_resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.5.tgz", "_resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.6.tgz",
"_shasum": "34cbd64a2d81f4b1fd21e76f9f06c8a45299ee34", "_shasum": "fdc2336505447d3f2f2c638ed272caf614bbb2bf",
"_spec": "proxy-addr@~2.0.5", "_spec": "proxy-addr@~2.0.5",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env/node_modules/express", "_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env/node_modules/express",
"author": { "author": {
"name": "Douglas Christopher Wilson", "name": "Douglas Christopher Wilson",
"email": "doug@somethingdoug.com" "email": "doug@somethingdoug.com"
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
"bundleDependencies": false, "bundleDependencies": false,
"dependencies": { "dependencies": {
"forwarded": "~0.1.2", "forwarded": "~0.1.2",
"ipaddr.js": "1.9.0" "ipaddr.js": "1.9.1"
}, },
"deprecated": false, "deprecated": false,
"description": "Determine address of proxied request", "description": "Determine address of proxied request",
...@@ -40,15 +40,15 @@ ...@@ -40,15 +40,15 @@
"beautify-benchmark": "0.2.4", "beautify-benchmark": "0.2.4",
"benchmark": "2.1.4", "benchmark": "2.1.4",
"deep-equal": "1.0.1", "deep-equal": "1.0.1",
"eslint": "5.16.0", "eslint": "6.8.0",
"eslint-config-standard": "12.0.0", "eslint-config-standard": "14.1.0",
"eslint-plugin-import": "2.17.1", "eslint-plugin-import": "2.20.1",
"eslint-plugin-markdown": "1.0.0", "eslint-plugin-markdown": "1.0.1",
"eslint-plugin-node": "8.0.1", "eslint-plugin-node": "11.0.0",
"eslint-plugin-promise": "4.1.1", "eslint-plugin-promise": "4.2.1",
"eslint-plugin-standard": "4.0.0", "eslint-plugin-standard": "4.0.1",
"mocha": "6.1.3", "mocha": "7.0.1",
"nyc": "13.3.0" "nyc": "15.0.0"
}, },
"engines": { "engines": {
"node": ">= 0.10" "node": ">= 0.10"
...@@ -78,5 +78,5 @@ ...@@ -78,5 +78,5 @@
"test-cov": "nyc --reporter=text npm test", "test-cov": "nyc --reporter=text npm test",
"test-travis": "nyc --reporter=html --reporter=text npm test" "test-travis": "nyc --reporter=html --reporter=text npm test"
}, },
"version": "2.0.5" "version": "2.0.6"
} }
...@@ -11,6 +11,8 @@ ...@@ -11,6 +11,8 @@
"dependencies": { "dependencies": {
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"express": "^4.17.1", "express": "^4.17.1",
"morgan": "^1.9.1" "kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"request": "^2.88.2"
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment