Commit 99bb6f85 authored by Naman Dixit's avatar Naman Dixit

Merge branch 'master' of https://git.cse.iitb.ac.in/synerg/xanadu

parents 0ec3406c 901024e7
......@@ -3,4 +3,6 @@ node_modules
package-lock.json
firecracker*
secrets.json
grunt
\ No newline at end of file
grunt
.clinic
rm_dummy.js
\ No newline at end of file
......@@ -6,5 +6,17 @@
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON",
"couchdb_host": "10.129.6.5:5984",
"couchdb_db_name": "serverless"
"couchdb_db_name": "serverless",
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"speculative_deployment": true
}
\ No newline at end of file
......@@ -57,7 +57,7 @@ function runProcess(local_repository, metadata) {
});
process.on('close', (code) => {
resolve(code);
resolve(process.pid);
logger.info(`Process Environment with resource_id ${resource_id} blown`);
});
})
......@@ -100,7 +100,7 @@ function runContainer(metadata) {
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
result += data;
resolve(result);
resolve(resource_id);
});
process.stderr.on('data', (data) => {
......@@ -126,7 +126,7 @@ function runContainer(metadata) {
logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
resolve(result);
resolve(resource_id);
});
process.stderr.on('data', (data) => {
......
......@@ -55,7 +55,7 @@ var download = function (url, dest, cb) {
console.log(url);
if (!fs.existsSync(dest)) {
var file = fs.createWriteStream(dest);
var request = http.get(url, function (response) {
var request = https.get(url, function (response) {
response.pipe(file);
file.on('finish', function () {
file.close(cb); // close() is async, call cb after close completes.
......
......@@ -17,6 +17,7 @@
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
"redis": "^2.8.0",
"request": "^2.88.2",
"winston": "^3.2.1"
}
}
This diff is collapsed.
......@@ -3,6 +3,8 @@ const fs = require('fs')
const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const constants = require('.././constants.json')
const { createLogger, format, transports } = winston;
const heap = require('heap')
......@@ -43,8 +45,8 @@ function generateExecutor(functionPath, functionHash) {
return hash
}
function reverseProxy(req, res, functionToResource, resourceMap) {
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
return new Promise((resolve, reject) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
......@@ -54,14 +56,14 @@ function reverseProxy(req, res, functionToResource, resourceMap) {
let functionHeap = functionToResource.get(id)
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
"\n forwarding via reverse proxy to: " + JSON.stringify(resource));
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
// "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.metric += 1
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare)
logger.info(functionHeap);
// logger.info(functionHeap);
var options = {
method: 'POST',
......@@ -77,15 +79,13 @@ function reverseProxy(req, res, functionToResource, resourceMap) {
.then(function (parsedBody) {
res.json(parsedBody)
forwardTo.metric -= 1
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
console.log(functionHeap);
resolve()
})
.catch(function (err) {
forwardTo.metric -= 1
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
console.log(functionHeap);
logger.error("error" + err.error.errno);
res.json(err.message).status(err.statusCode)
resolve()
......@@ -136,9 +136,133 @@ const logger = winston.createLogger({
});
function compare(a, b) {
return a.metric - b.metric
return a.open_request_count - b.open_request_count
}
function branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) {
// console.log(req.headers['x-resource-id']);
if (req.headers['x-resource-id'] === undefined) {
let functionHash = req.params.id
if (functionBranchTree.has(functionHash)) {
let branchInfo = functionBranchTree.get(functionHash)
branchInfo.req_count++
} else {
let data = {
req_count: 1,
parent: true,
branches: new Map()
}
functionBranchTree.set(functionHash, data)
}
} else {
let resource_id = req.headers['x-resource-id']
let resource = resourceMap.get(resource_id)
let forwardBranch = req.params.id
if (!functionBranchTree.has(resource.functionHash)) {
let data = {
req_count: 1,
parent: false,
branches: new Map()
}
data.branches.set(forwardBranch, 1)
functionBranchTree.set(resource.functionHash, data)
} else {
let branchInfo = functionBranchTree.get(resource.functionHash)
if (!branchInfo.parent)
branchInfo.req_count++
if (branchInfo.branches.has(forwardBranch)) {
let branchProb = branchInfo.branches.get(forwardBranch)
branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0)
branchInfo.branches.set(forwardBranch, branchProb)
} else {
branchInfo.branches.set(forwardBranch, 1.0)
}
for (let [branch, prob] of branchInfo.branches.entries()) {
if (branch !== forwardBranch)
prob *= (branchInfo.req_count - 1)
prob /= branchInfo.req_count
branchInfo.branches.set(branch, prob)
}
}
}
// console.log("branch tree", functionBranchTree);
}
function viterbi(functionBranchTree) {
functionBranchTree.forEach((metadata, node) => {
if (metadata.parent && metadata.req_count % 5 == 0) {
let path = []
let parents = [[node, {
prob: 1,
metadata
}]]
path.push({node, probability: 1})
let siblings = new Map()
while(parents.length > 0) {
// console.log("parent_group", parents);
for (const parent of parents) {
// console.log("=========begin==========\n",parent, "\n=============end============");
// console.log(parent[1].metadata);
if (parent[1].metadata === undefined)
continue
let forwardBranches = parent[1].metadata.branches
// console.log(forwardBranches);
let parentProbability = parent[1].prob
forwardBranches.forEach((branchProb, subNode) => {
let probability = 0
if (siblings.has(subNode))
probability = siblings.get(subNode)
probability += branchProb * parentProbability
// console.log("prob", probability);
siblings.set(subNode, probability)
})
// console.log("siblings", siblings);
}
parents = []
let maxSibling, maxProb = 0
siblings.forEach((prob, sibling) => {
if (prob > maxProb) {
maxSibling = sibling
maxProb = prob
}
})
parentIDs = Array.from( siblings.keys() );
for (const id of parentIDs) {
let metadata = functionBranchTree.get(id)
parents.push([
id, {
prob: siblings.get(id),
metadata
}
])
}
if (maxSibling !== undefined)
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
}
if (path.length > 0)
console.log("path", path);
metadata.mle_path = path
}
});
}
module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger, compare
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi
}
\ No newline at end of file
......@@ -15,6 +15,7 @@
"express-fileupload": "^1.1.6",
"heap": "^0.2.6",
"isolated-vm": "^3.0.0",
"kafka-logger": "^7.1.0",
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
......@@ -23,6 +24,7 @@
"request": "^2.88.0",
"request-promise": "^4.2.5",
"save": "^2.4.0",
"swagger-stats": "^0.95.16",
"winston": "^3.2.1"
}
}
'use strict';
const express = require('express')
const bodyParser = require('body-parser')
let request = require('request')
const process = require('process')
const app = express()
let port = 5000, resource_id, functionHash, runtime
let port = 5000, resource_id, functionHash, runtime, idleTime = 30
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -19,17 +25,26 @@ let kafka = require('kafka-node'),
app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json())
let lastRequest = Date.now()
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/worker/timeout', (req, res) => {
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
})
function executor(payload) {
return new Promise((resolve, reject) => {
})
}
......@@ -38,20 +53,26 @@ app.listen(port, () => {
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id }),
messages: JSON.stringify({ functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
})
function shouldDie() {
if (Date.now() - lastRequest > 30 * 1000) {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest
})
console.log("Idle for too long. Exiting");
producer.send(
[{
topic: "removeWorker",
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id })
}], () => {
[
{topic: "removeWorker", messages: message }
], () => {
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
......
{
"_from": "express",
"_from": "express@^4.17.1",
"_id": "express@4.17.1",
"_inBundle": false,
"_integrity": "sha512-mHJ9O79RqluphRrcw2X/GTh3k9tVv8YcoyY4Kkh4WDMUYKRZUq0h1o0w2rrrxBqM7VoeUVqgb27xlEMXTnYt4g==",
"_location": "/express",
"_phantomChildren": {},
"_requested": {
"type": "tag",
"type": "range",
"registry": true,
"raw": "express",
"raw": "express@^4.17.1",
"name": "express",
"escapedName": "express",
"rawSpec": "",
"rawSpec": "^4.17.1",
"saveSpec": null,
"fetchSpec": "latest"
"fetchSpec": "^4.17.1"
},
"_requiredBy": [
"#USER",
......@@ -21,8 +21,8 @@
],
"_resolved": "https://registry.npmjs.org/express/-/express-4.17.1.tgz",
"_shasum": "4491fc38605cf51f8629d39c2b5d026f98a4c134",
"_spec": "express",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env",
"_spec": "express@^4.17.1",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env",
"author": {
"name": "TJ Holowaychuk",
"email": "tj@vision-media.ca"
......
......@@ -173,7 +173,7 @@ addr.octets // => [192, 168, 1, 1]
```
`prefixLengthFromSubnetMask()` will return a CIDR prefix length for a valid IPv4 netmask or
false if the netmask is not valid.
null if the netmask is not valid.
```js
ipaddr.IPv4.parse('255.255.255.240').prefixLengthFromSubnetMask() == 28
......
declare module "ipaddr.js" {
type IPv4Range = 'unicast' | 'unspecified' | 'broadcast' | 'multicast' | 'linkLocal' | 'loopback' | 'carrierGradeNat' | 'private' | 'reserved';
type IPv6Range = 'unicast' | 'unspecified' | 'linkLocal' | 'multicast' | 'loopback' | 'uniqueLocal' | 'ipv4Mapped' | 'rfc6145' | 'rfc6052' | '6to4' | 'teredo' | 'reserved';
......@@ -9,23 +6,20 @@ declare module "ipaddr.js" {
[name: string]: [T, number] | [T, number][];
}
// Common methods/properties for IPv4 and IPv6 classes.
class IP {
prefixLengthFromSubnetMask(): number | false;
prefixLengthFromSubnetMask(): number | null;
toByteArray(): number[];
toNormalizedString(): string;
toString(): string;
}
namespace Address {
export function isValid(addr: string): boolean;
export function fromByteArray(bytes: number[]): IPv4 | IPv6;
export function parse(addr: string): IPv4 | IPv6;
export function parseCIDR(mask: string): [IPv4 | IPv6, number];
export function process(address: string): IPv4 | IPv6;
export function process(addr: string): IPv4 | IPv6;
export function subnetMatch(addr: IPv4, rangeList: RangeList<IPv4>, defaultName?: string): string;
export function subnetMatch(addr: IPv6, rangeList: RangeList<IPv6>, defaultName?: string): string;
......@@ -39,6 +33,7 @@ declare module "ipaddr.js" {
static parseCIDR(addr: string): [IPv4, number];
static subnetMaskFromPrefixLength(prefix: number): IPv4;
constructor(octets: number[]);
octets: number[]
kind(): 'ipv4';
match(addr: IPv4, bits: number): boolean;
......@@ -55,7 +50,9 @@ declare module "ipaddr.js" {
static parse(addr: string): IPv6;
static parseCIDR(addr: string): [IPv6, number];
static subnetMaskFromPrefixLength(prefix: number): IPv6;
constructor(octets: number[]);
constructor(parts: number[]);
parts: number[]
zoneId?: string
isIPv4MappedAddress(): boolean;
kind(): 'ipv6';
......
{
"_from": "ipaddr.js@1.9.0",
"_id": "ipaddr.js@1.9.0",
"_from": "ipaddr.js@1.9.1",
"_id": "ipaddr.js@1.9.1",
"_inBundle": false,
"_integrity": "sha512-M4Sjn6N/+O6/IXSJseKqHoFc+5FdGJ22sXqnjTpdZweHK64MzEPAyQZyEU3R/KRv2GLoa7nNtg/C2Ev6m7z+eA==",
"_integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==",
"_location": "/ipaddr.js",
"_phantomChildren": {},
"_requested": {
"type": "version",
"registry": true,
"raw": "ipaddr.js@1.9.0",
"raw": "ipaddr.js@1.9.1",
"name": "ipaddr.js",
"escapedName": "ipaddr.js",
"rawSpec": "1.9.0",
"rawSpec": "1.9.1",
"saveSpec": null,
"fetchSpec": "1.9.0"
"fetchSpec": "1.9.1"
},
"_requiredBy": [
"/proxy-addr"
],
"_resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.0.tgz",
"_shasum": "37df74e430a0e47550fe54a2defe30d8acd95f65",
"_spec": "ipaddr.js@1.9.0",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env/node_modules/proxy-addr",
"_resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz",
"_shasum": "bff38543eeb8984825079ff3a2a8e6cbd46781b3",
"_spec": "ipaddr.js@1.9.1",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env/node_modules/proxy-addr",
"author": {
"name": "whitequark",
"email": "whitequark@whitequark.org"
......@@ -35,7 +35,7 @@
"description": "A library for manipulating IPv4 and IPv6 addresses in JavaScript.",
"devDependencies": {
"coffee-script": "~1.12.6",
"nodeunit": ">=0.8.2 <0.8.7",
"nodeunit": "^0.11.3",
"uglify-js": "~3.0.19"
},
"directories": {
......@@ -46,6 +46,7 @@
},
"files": [
"lib/",
"LICENSE",
"ipaddr.min.js"
],
"homepage": "https://github.com/whitequark/ipaddr.js#readme",
......@@ -65,5 +66,5 @@
"test": "cake build test"
},
"types": "./lib/ipaddr.js.d.ts",
"version": "1.9.0"
"version": "1.9.1"
}
2.0.6 / 2020-02-24
==================
* deps: ipaddr.js@1.9.1
2.0.5 / 2019-04-16
==================
......
......@@ -99,7 +99,7 @@ function compile (val) {
for (var i = 0; i < trust.length; i++) {
val = trust[i]
if (!IP_RANGES.hasOwnProperty(val)) {
if (!Object.prototype.hasOwnProperty.call(IP_RANGES, val)) {
continue
}
......
{
"_from": "proxy-addr@~2.0.5",
"_id": "proxy-addr@2.0.5",
"_id": "proxy-addr@2.0.6",
"_inBundle": false,
"_integrity": "sha512-t/7RxHXPH6cJtP0pRG6smSr9QJidhB+3kXu0KgXnbGYMgzEnUxRQ4/LDdfOwZEMyIh3/xHb8PX3t+lfL9z+YVQ==",
"_integrity": "sha512-dh/frvCBVmSsDYzw6n926jv974gddhkFPfiN8hPOi30Wax25QZyZEGveluCgliBnqmuM+UJmBErbAUFIoDbjOw==",
"_location": "/proxy-addr",
"_phantomChildren": {},
"_requested": {
......@@ -18,10 +18,10 @@
"_requiredBy": [
"/express"
],
"_resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.5.tgz",
"_shasum": "34cbd64a2d81f4b1fd21e76f9f06c8a45299ee34",
"_resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.6.tgz",
"_shasum": "fdc2336505447d3f2f2c638ed272caf614bbb2bf",
"_spec": "proxy-addr@~2.0.5",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env/node_modules/express",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env/node_modules/express",
"author": {
"name": "Douglas Christopher Wilson",
"email": "doug@somethingdoug.com"
......@@ -32,7 +32,7 @@
"bundleDependencies": false,
"dependencies": {
"forwarded": "~0.1.2",
"ipaddr.js": "1.9.0"
"ipaddr.js": "1.9.1"
},
"deprecated": false,
"description": "Determine address of proxied request",
......@@ -40,15 +40,15 @@
"beautify-benchmark": "0.2.4",
"benchmark": "2.1.4",
"deep-equal": "1.0.1",
"eslint": "5.16.0",
"eslint-config-standard": "12.0.0",
"eslint-plugin-import": "2.17.1",
"eslint-plugin-markdown": "1.0.0",
"eslint-plugin-node": "8.0.1",
"eslint-plugin-promise": "4.1.1",
"eslint-plugin-standard": "4.0.0",
"mocha": "6.1.3",
"nyc": "13.3.0"
"eslint": "6.8.0",
"eslint-config-standard": "14.1.0",
"eslint-plugin-import": "2.20.1",
"eslint-plugin-markdown": "1.0.1",
"eslint-plugin-node": "11.0.0",
"eslint-plugin-promise": "4.2.1",
"eslint-plugin-standard": "4.0.1",
"mocha": "7.0.1",
"nyc": "15.0.0"
},
"engines": {
"node": ">= 0.10"
......@@ -78,5 +78,5 @@
"test-cov": "nyc --reporter=text npm test",
"test-travis": "nyc --reporter=html --reporter=text npm test"
},
"version": "2.0.5"
"version": "2.0.6"
}
......@@ -11,6 +11,8 @@
"dependencies": {
"body-parser": "^1.19.0",
"express": "^4.17.1",
"morgan": "^1.9.1"
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"request": "^2.88.2"
}
}
{
"name": "xanadu",
"uriPath": "/xanadu"
}
\ No newline at end of file
cJSON @ f790e17b
Subproject commit f790e17b6cecef030c4eda811149d238c2085fcf
nlib @ 75bc1a11
Subproject commit 75bc1a11e2a10cf249f566b40c85d6526c16f123
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment