Commit c945c9a1 authored by Nilanjan Daw's avatar Nilanjan Daw

Merge branch 'function_chain_predict'

parents e349248d e9cfe3d5
{"id":"192.168.31.51","master_node":"10.129.6.5"}
\ No newline at end of file
{"id":"10.196.6.51","master_node":"10.129.6.5"}
\ No newline at end of file
......@@ -57,7 +57,7 @@ function runProcess(local_repository, metadata) {
});
process.on('close', (code) => {
resolve(code);
resolve(process.pid);
logger.info(`Process Environment with resource_id ${resource_id} blown`);
});
})
......@@ -100,7 +100,7 @@ function runContainer(metadata) {
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
result += data;
resolve(result);
resolve(resource_id);
});
process.stderr.on('data', (data) => {
......@@ -126,7 +126,7 @@ function runContainer(metadata) {
logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
resolve(result);
resolve(resource_id);
});
process.stderr.on('data', (data) => {
......
......@@ -55,7 +55,7 @@ var download = function (url, dest, cb) {
console.log(url);
if (!fs.existsSync(dest)) {
var file = fs.createWriteStream(dest);
var request = http.get(url, function (response) {
var request = https.get(url, function (response) {
response.pipe(file);
file.on('finish', function () {
file.close(cb); // close() is async, call cb after close completes.
......
......@@ -17,6 +17,7 @@
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
"redis": "^2.8.0",
"request": "^2.88.2",
"winston": "^3.2.1"
}
}
......@@ -25,8 +25,9 @@ let log_channel = constants.log_channel
let usedPort = new Map(), // TODO: remove after integration with RM
db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map() // a function to resource map. Each map contains a minheap of
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes = new Map()
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -41,7 +42,7 @@ let kafka = require('kafka-node'),
{ topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: "deployed" }, // receives deployment confirmation from workers
{ topic: "removeWorker" }, // received when a executor environment is blown at the worker
{ topic: "RESPONSE_RM_2_DM_DUMMY" }, // receives deployment details from RM
{ topic: "RESPONSE_RM_2_DM" }, // receives deployment details from RM
{ topic: "hscale" } // receives signals for horizontal scaling
],
[
......@@ -60,7 +61,6 @@ app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec }));
let requestQueue = []
let workerNodes = []
const WINDOW_SIZE = 10
const port = constants.master_port
......@@ -150,9 +150,9 @@ function deployContainer(path, imageName) {
fs.writeFile('./repository/Dockerfile',
`FROM node:latest
WORKDIR /app
COPY package.json /app
RUN npm install
COPY . /app
COPY ./worker_env/package.json /app
ADD ./worker_env/node_modules /app/node_modules
COPY ${imageName}.js /app
ENTRYPOINT ["node", "${imageName}.js"]`
, function (err) {
if (err) {
......@@ -204,6 +204,7 @@ function deployContainer(path, imageName) {
* REST API to receive execute requests
*/
app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
if (functionToResource.has(id)) {
......@@ -258,10 +259,11 @@ function dispatch() {
let payloadToRM = [{
topic: "request", // changing from REQUEST_DM_2_RM
topic: "REQUEST_DM_2_RM", // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
timestamp: Date.now()
}),
partition: 0
}]
......@@ -279,9 +281,6 @@ function dispatch() {
}
}
function getAddress() {
return workerNodes[Math.floor(Math.random() * workerNodes.length)];
}
function postDeploy(message) {
logger.info("Deployed Resource: " + JSON.stringify(message));
......@@ -328,12 +327,14 @@ function postDeploy(message) {
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
// type: "deployment_launch",
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
entity_id: message.entity_id,
"reason": "deployment",
"status": true,
"timestamp": date.toISOString()
"timestamp": Date.now()
}),
partition: 0
}]
......@@ -363,6 +364,7 @@ consumer.on('message', function (message) {
let topic = message.topic
message = message.value
// console.log(topic, message)
if (topic === "response") {
logger.info("response " + message);
......@@ -370,9 +372,10 @@ consumer.on('message', function (message) {
} else if (topic === "heartbeat") {
message = JSON.parse(message)
if (Date.now() - message.timestamp < 1000)
if (workerNodes.indexOf(message.address) === -1) {
workerNodes.push(message.address)
logger.warn("New worker discovered. Worker List: " + workerNodes)
if (!workerNodes.has(message.address)) {
workerNodes.set(message.address, message.timestamp)
logger.warn("New worker discovered. Worker List: ")
logger.warn(workerNodes)
}
} else if (topic == "deployed") {
try {
......@@ -439,15 +442,21 @@ consumer.on('message', function (message) {
console.log("sent rm");
})
} else if (topic == "RESPONSE_RM_2_DM_DUMMY") {
} else if (topic == "RESPONSE_RM_2_DM") {
logger.info("Response from RM: " + message);
message = JSON.parse(message)
let resourceChoice = message.grunts[0]
let resourceChoice = message.nodes[0]
if (resourceMap.has(message.resource_id)) {
let resource = resourceMap.get(message.resource_id)
if (typeof resourceChoice === 'string') {
resource.port = libSupport.getPort(usedPort)
resource.node_id = resourceChoice
} else {
resource.port = (resourceChoice.port) ? resourceChoice.port : libSupport.getPort(usedPort)
resource.node_id = resourceChoice.node_id
}
let payload = [{
topic: resource.node_id,
messages: JSON.stringify({
......
......@@ -7,6 +7,7 @@ const constants = require('.././constants.json')
const { createLogger, format, transports } = winston;
const heap = require('heap')
functionBranchTree = new Map() // a tree to store function branch predictions
/**
* Generates unique IDs of arbitrary length
......@@ -46,7 +47,7 @@ function generateExecutor(functionPath, functionHash) {
}
function reverseProxy(req, res, functionToResource, resourceMap) {
branchChainPredictor(req, resourceMap)
return new Promise((resolve, reject) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
......@@ -139,6 +140,127 @@ function compare(a, b) {
return a.open_request_count - b.open_request_count
}
function branchChainPredictor(req, resourceMap) {
// console.log(req.headers['x-resource-id']);
if (req.headers['x-resource-id'] === undefined) {
let functionHash = req.params.id
if (functionBranchTree.has(functionHash)) {
let branchInfo = functionBranchTree.get(functionHash)
branchInfo.req_count++
} else {
let data = {
req_count: 1,
parent: true,
branches: new Map()
}
functionBranchTree.set(functionHash, data)
}
} else {
let resource_id = req.headers['x-resource-id']
let resource = resourceMap.get(resource_id)
let forwardBranch = req.params.id
if (!functionBranchTree.has(resource.functionHash)) {
let data = {
req_count: 1,
parent: false,
branches: new Map()
}
data.branches.set(forwardBranch, 1)
functionBranchTree.set(resource.functionHash, data)
} else {
let branchInfo = functionBranchTree.get(resource.functionHash)
if (!branchInfo.parent)
branchInfo.req_count++
if (branchInfo.branches.has(forwardBranch)) {
let branchProb = branchInfo.branches.get(forwardBranch)
branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0)
branchInfo.branches.set(forwardBranch, branchProb)
} else {
branchInfo.branches.set(forwardBranch, 1.0)
}
for (let [branch, prob] of branchInfo.branches.entries()) {
if (branch !== forwardBranch)
prob *= (branchInfo.req_count - 1)
prob /= branchInfo.req_count
branchInfo.branches.set(branch, prob)
}
}
}
console.log("branch tree", functionBranchTree);
}
function viterbi() {
let path = []
functionBranchTree.forEach((metadata, node) => {
if (metadata.parent) {
let parents = [[node, {
prob: 1,
metadata
}]]
path.push({node, probability: 1})
let siblings = new Map()
while(parents.length > 0) {
// console.log("parent_group", parents);
for (const parent of parents) {
// console.log("=========begin==========\n",parent, "\n=============end============");
// console.log(parent[1].metadata);
if (parent[1].metadata === undefined)
continue
let forwardBranches = parent[1].metadata.branches
// console.log(forwardBranches);
let parentProbability = parent[1].prob
forwardBranches.forEach((branchProb, subNode) => {
let probability = 0
if (siblings.has(subNode))
probability = siblings.get(subNode)
probability += branchProb * parentProbability
// console.log("prob", probability);
siblings.set(subNode, probability)
})
// console.log("siblings", siblings);
}
parents = []
let maxSibling, maxProb = 0
siblings.forEach((prob, sibling) => {
if (prob > maxProb) {
maxSibling = sibling
maxProb = prob
}
})
parentIDs = Array.from( siblings.keys() );
for (const id of parentIDs) {
let metadata = functionBranchTree.get(id)
parents.push([
id, {
prob: siblings.get(id),
metadata
}
])
}
if (maxSibling !== undefined)
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
}
}
});
if (path.length > 0)
console.log("path", path);
}
setInterval(viterbi, 5000)
module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger, compare
}
\ No newline at end of file
'use strict';
const express = require('express')
const bodyParser = require('body-parser')
let request = require('request')
const process = require('process')
const app = express()
let port = 5000, resource_id, functionHash, runtime
......@@ -8,6 +11,9 @@ resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -30,15 +36,22 @@ app.post('/serverless/function/execute/', (req, res) => {
function executor(payload) {
return new Promise((resolve, reject) => {
})
}
app.post('/serverless/function/execute/2', (req, res) => {
console.log("2", JSON.stringify(req.headers))
res.send("done")
})
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id }),
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
})
......@@ -50,7 +63,7 @@ function shouldDie() {
producer.send(
[{
topic: "removeWorker",
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id })
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id, entity_id: process.pid})
}], () => {
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
......
{
"_from": "express",
"_from": "express@^4.17.1",
"_id": "express@4.17.1",
"_inBundle": false,
"_integrity": "sha512-mHJ9O79RqluphRrcw2X/GTh3k9tVv8YcoyY4Kkh4WDMUYKRZUq0h1o0w2rrrxBqM7VoeUVqgb27xlEMXTnYt4g==",
"_location": "/express",
"_phantomChildren": {},
"_requested": {
"type": "tag",
"type": "range",
"registry": true,
"raw": "express",
"raw": "express@^4.17.1",
"name": "express",
"escapedName": "express",
"rawSpec": "",
"rawSpec": "^4.17.1",
"saveSpec": null,
"fetchSpec": "latest"
"fetchSpec": "^4.17.1"
},
"_requiredBy": [
"#USER",
......@@ -21,8 +21,8 @@
],
"_resolved": "https://registry.npmjs.org/express/-/express-4.17.1.tgz",
"_shasum": "4491fc38605cf51f8629d39c2b5d026f98a4c134",
"_spec": "express",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env",
"_spec": "express@^4.17.1",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env",
"author": {
"name": "TJ Holowaychuk",
"email": "tj@vision-media.ca"
......
......@@ -173,7 +173,7 @@ addr.octets // => [192, 168, 1, 1]
```
`prefixLengthFromSubnetMask()` will return a CIDR prefix length for a valid IPv4 netmask or
false if the netmask is not valid.
null if the netmask is not valid.
```js
ipaddr.IPv4.parse('255.255.255.240').prefixLengthFromSubnetMask() == 28
......
declare module "ipaddr.js" {
type IPv4Range = 'unicast' | 'unspecified' | 'broadcast' | 'multicast' | 'linkLocal' | 'loopback' | 'carrierGradeNat' | 'private' | 'reserved';
type IPv6Range = 'unicast' | 'unspecified' | 'linkLocal' | 'multicast' | 'loopback' | 'uniqueLocal' | 'ipv4Mapped' | 'rfc6145' | 'rfc6052' | '6to4' | 'teredo' | 'reserved';
......@@ -9,23 +6,20 @@ declare module "ipaddr.js" {
[name: string]: [T, number] | [T, number][];
}
// Common methods/properties for IPv4 and IPv6 classes.
class IP {
prefixLengthFromSubnetMask(): number | false;
prefixLengthFromSubnetMask(): number | null;
toByteArray(): number[];
toNormalizedString(): string;
toString(): string;
}
namespace Address {
export function isValid(addr: string): boolean;
export function fromByteArray(bytes: number[]): IPv4 | IPv6;
export function parse(addr: string): IPv4 | IPv6;
export function parseCIDR(mask: string): [IPv4 | IPv6, number];
export function process(address: string): IPv4 | IPv6;
export function process(addr: string): IPv4 | IPv6;
export function subnetMatch(addr: IPv4, rangeList: RangeList<IPv4>, defaultName?: string): string;
export function subnetMatch(addr: IPv6, rangeList: RangeList<IPv6>, defaultName?: string): string;
......@@ -39,6 +33,7 @@ declare module "ipaddr.js" {
static parseCIDR(addr: string): [IPv4, number];
static subnetMaskFromPrefixLength(prefix: number): IPv4;
constructor(octets: number[]);
octets: number[]
kind(): 'ipv4';
match(addr: IPv4, bits: number): boolean;
......@@ -55,7 +50,9 @@ declare module "ipaddr.js" {
static parse(addr: string): IPv6;
static parseCIDR(addr: string): [IPv6, number];
static subnetMaskFromPrefixLength(prefix: number): IPv6;
constructor(octets: number[]);
constructor(parts: number[]);
parts: number[]
zoneId?: string
isIPv4MappedAddress(): boolean;
kind(): 'ipv6';
......
{
"_from": "ipaddr.js@1.9.0",
"_id": "ipaddr.js@1.9.0",
"_from": "ipaddr.js@1.9.1",
"_id": "ipaddr.js@1.9.1",
"_inBundle": false,
"_integrity": "sha512-M4Sjn6N/+O6/IXSJseKqHoFc+5FdGJ22sXqnjTpdZweHK64MzEPAyQZyEU3R/KRv2GLoa7nNtg/C2Ev6m7z+eA==",
"_integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==",
"_location": "/ipaddr.js",
"_phantomChildren": {},
"_requested": {
"type": "version",
"registry": true,
"raw": "ipaddr.js@1.9.0",
"raw": "ipaddr.js@1.9.1",
"name": "ipaddr.js",
"escapedName": "ipaddr.js",
"rawSpec": "1.9.0",
"rawSpec": "1.9.1",
"saveSpec": null,
"fetchSpec": "1.9.0"
"fetchSpec": "1.9.1"
},
"_requiredBy": [
"/proxy-addr"
],
"_resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.0.tgz",
"_shasum": "37df74e430a0e47550fe54a2defe30d8acd95f65",
"_spec": "ipaddr.js@1.9.0",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env/node_modules/proxy-addr",
"_resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz",
"_shasum": "bff38543eeb8984825079ff3a2a8e6cbd46781b3",
"_spec": "ipaddr.js@1.9.1",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env/node_modules/proxy-addr",
"author": {
"name": "whitequark",
"email": "whitequark@whitequark.org"
......@@ -35,7 +35,7 @@
"description": "A library for manipulating IPv4 and IPv6 addresses in JavaScript.",
"devDependencies": {
"coffee-script": "~1.12.6",
"nodeunit": ">=0.8.2 <0.8.7",
"nodeunit": "^0.11.3",
"uglify-js": "~3.0.19"
},
"directories": {
......@@ -46,6 +46,7 @@
},
"files": [
"lib/",
"LICENSE",
"ipaddr.min.js"
],
"homepage": "https://github.com/whitequark/ipaddr.js#readme",
......@@ -65,5 +66,5 @@
"test": "cake build test"
},
"types": "./lib/ipaddr.js.d.ts",
"version": "1.9.0"
"version": "1.9.1"
}
2.0.6 / 2020-02-24
==================
* deps: ipaddr.js@1.9.1
2.0.5 / 2019-04-16
==================
......
......@@ -99,7 +99,7 @@ function compile (val) {
for (var i = 0; i < trust.length; i++) {
val = trust[i]
if (!IP_RANGES.hasOwnProperty(val)) {
if (!Object.prototype.hasOwnProperty.call(IP_RANGES, val)) {
continue
}
......
{
"_from": "proxy-addr@~2.0.5",
"_id": "proxy-addr@2.0.5",
"_id": "proxy-addr@2.0.6",
"_inBundle": false,
"_integrity": "sha512-t/7RxHXPH6cJtP0pRG6smSr9QJidhB+3kXu0KgXnbGYMgzEnUxRQ4/LDdfOwZEMyIh3/xHb8PX3t+lfL9z+YVQ==",
"_integrity": "sha512-dh/frvCBVmSsDYzw6n926jv974gddhkFPfiN8hPOi30Wax25QZyZEGveluCgliBnqmuM+UJmBErbAUFIoDbjOw==",
"_location": "/proxy-addr",
"_phantomChildren": {},
"_requested": {
......@@ -18,10 +18,10 @@
"_requiredBy": [
"/express"
],
"_resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.5.tgz",
"_shasum": "34cbd64a2d81f4b1fd21e76f9f06c8a45299ee34",
"_resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.6.tgz",
"_shasum": "fdc2336505447d3f2f2c638ed272caf614bbb2bf",
"_spec": "proxy-addr@~2.0.5",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_module/dispatcher/repository/worker_env/node_modules/express",
"_where": "/home/nilanjan/Desktop/serverless/hybrid/dispatch_system/dispatch_manager/repository/worker_env/node_modules/express",
"author": {
"name": "Douglas Christopher Wilson",
"email": "doug@somethingdoug.com"
......@@ -32,7 +32,7 @@
"bundleDependencies": false,
"dependencies": {
"forwarded": "~0.1.2",
"ipaddr.js": "1.9.0"
"ipaddr.js": "1.9.1"
},
"deprecated": false,
"description": "Determine address of proxied request",
......@@ -40,15 +40,15 @@
"beautify-benchmark": "0.2.4",
"benchmark": "2.1.4",
"deep-equal": "1.0.1",
"eslint": "5.16.0",
"eslint-config-standard": "12.0.0",
"eslint-plugin-import": "2.17.1",
"eslint-plugin-markdown": "1.0.0",
"eslint-plugin-node": "8.0.1",
"eslint-plugin-promise": "4.1.1",
"eslint-plugin-standard": "4.0.0",
"mocha": "6.1.3",
"nyc": "13.3.0"
"eslint": "6.8.0",
"eslint-config-standard": "14.1.0",
"eslint-plugin-import": "2.20.1",
"eslint-plugin-markdown": "1.0.1",
"eslint-plugin-node": "11.0.0",
"eslint-plugin-promise": "4.2.1",
"eslint-plugin-standard": "4.0.1",
"mocha": "7.0.1",
"nyc": "15.0.0"
},
"engines": {
"node": ">= 0.10"
......@@ -78,5 +78,5 @@
"test-cov": "nyc --reporter=text npm test",
"test-travis": "nyc --reporter=html --reporter=text npm test"
},
"version": "2.0.5"
"version": "2.0.6"
}
......@@ -11,6 +11,8 @@
"dependencies": {
"body-parser": "^1.19.0",
"express": "^4.17.1",
"morgan": "^1.9.1"
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"request": "^2.88.2"
}
}
cJSON @ f790e17b
Subproject commit f790e17b6cecef030c4eda811149d238c2085fcf
nlib @ 75bc1a11
Subproject commit 75bc1a11e2a10cf249f566b40c85d6526c16f123
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment