Commit 30066437 authored by Nilanjan Daw's avatar Nilanjan Daw

Support added for Conditional Statements

Conditional chain support added for Explicit chains. This introduces probabilistic chain support to Xanadu.
parent 911b60de
......@@ -7,6 +7,7 @@ const { spawn } = require('child_process')
const fetch = require('node-fetch')
const constants = require('../constants.json')
const secrets = require('./secrets.json')
const operator = require('./operator')
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.function_db_name + "/"
const logger = libSupport.logger
......@@ -190,87 +191,137 @@ router.post('/execute/:id', (req, res) => {
// else {
if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${req.params.id}.json`)
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
console.log(payload);
orchestrator(res, payload, map, aliases, [])
// getTimeLines(map);
orchestrator(res, payload, map, aliases, {})
})
} else {
readMap(`./repository/map${req.params.id}.json`)
.then(data => {
map = data
readMap(`./repository/aliases${req.params.id}.json`)
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
console.log(payload);
orchestrator(res, payload, map, aliases, [])
// getTimeLines(map);
orchestrator(res, payload, map, aliases, {})
})
})
}
})
function orchestrator(res, payload, map, aliases, result) {
return new Promise((resolve, reject) => {
console.log("result before run", result);
async function orchestrator(res, payload, map, aliases, result) {
if (Object.keys(map).length == 0) {
console.log("time to resolve");
res.json(result)
// return resolve(result)
}
if (Object.keys(map).length == 0) {
console.log("time to resolve", result);
res.json(result)
// return resolve(result)
}
else {
for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
// console.log(metadata);
else {
for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
if (metadata.wait_for.length == 0) {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName]}`
console.log(url);
let data = {
method: 'post',
body: JSON.stringify({
runtime: metadata.runtime,
payload
}),
headers: { 'Content-Type': 'application/json' }
}
delete map[functionName]
fetch(url, data).then(res => res.json())
.then(json => {
console.log(json);
result.push(json)
for (const [_key, metadata] of Object.entries(map)) {
let index = metadata.wait_for.indexOf(functionName)
if (metadata.type === "function" && metadata.wait_for.length == 0) {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}`
console.log(url);
let data = {
method: 'post',
body: JSON.stringify({
runtime: metadata.runtime,
payload
}),
headers: { 'Content-Type': 'application/json' }
}
delete map[functionName]
aliases[functionName].status = "running"
fetch(url, data).then(res => res.json())
.then(json => {
// console.log(json);
result[functionName] = json
aliases[functionName].status = "done"
let branchMap = null
for (const [_key, metadata] of Object.entries(map)) {
if (metadata.type === "function" || metadata.type === "conditional") {
let index = metadata.wait_for.indexOf(functionName)
if (index >= 0)
metadata.wait_for.splice(index, 1);
}
console.log(map, "after run");
orchestrator(res, payload, map, aliases, result)
})
}
if (metadata.type === "conditional" && metadata.wait_for.length == 0) {
let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result)
console.log(conditionResult, "aliases", aliases);
let branchToTake = metadata[conditionResult]
branchMap = map[branchToTake]
delete map[_key]
makeBranchRunnable(branchMap, aliases)
}
}
orchestrator(res, payload, (branchMap == null)? map: branchMap, aliases, result)
})
}
// return resolve(result)
}
// await fetch(constants.master_address)
})
}
}
function makeBranchRunnable(branchMap, aliases) {
delete branchMap['type']
for (const [_key, metadata] of Object.entries(branchMap)) {
let wait_for = []
for (const dependent of metadata.wait_for) {
if (aliases[dependent].status !== "done")
wait_for.push(dependent)
metadata.wait_for = wait_for
}
}
}
function checkCondition(op1, op2, op, result) {
op1 = op1.split(".")
let data = result[op1[0]][op1[1]]
return (operator[op](data, op2))? "success": "fail"
}
function getTimeLines(map) {
console.log(map);
for (const [functionName, metadata] of Object.entries(map)) {
}
}
function readMap(filename) {
function readMap(filename, alias = false) {
return new Promise((resolve, reject) => {
fs.readFile(filename, (err, data) => {
fs.readFile(filename, (err, blob) => {
if (err)
reject(err)
else {
const object = JSON.parse(data)
resolve(object)
const data = JSON.parse(blob)
if (alias) {
for (const [key, functionHash] of Object.entries(data)) {
data[key] = {
alias: functionHash,
status: "waiting"
}
// libSupport.fetchData(metricsDB + functionHash, null)
// .then(metrics => {
// data[key]
// })
}
}
resolve(data)
}
})
})
......
......@@ -525,7 +525,7 @@ consumer.on('message', function (message) {
}),
partition: 0
}]
logger.info(resourceMap);
// logger.info(resourceMap);
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
......
......@@ -53,6 +53,14 @@ function generateExecutor(functionPath, functionHash) {
return hash
}
/**
* Reverse proxy to take user requests and forward them to appropriate workers using a loadbalacer
* @param {JSON} req the user request to be forwarded to the worker
* @param {JSON} res Object to use to return the response to the user
* @param {Map} functionToResource Function to resource Map
* @param {Map} resourceMap Map from resource ID to resource metadata
* @param {Map} functionBranchTree Holds the function path's and related probability distribution
*/
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
return new Promise((resolve, reject) => {
......@@ -62,6 +70,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
* Bypass deployment pipeline if resource available
*/
let functionHeap = functionToResource.get(id)
// loadbalancing by choosing worker with lowest load
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
......@@ -70,7 +79,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare)
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap);
var options = {
......@@ -79,8 +88,6 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
body: req.body,
json: true // Automatically stringifies the body to JSON
};
// console.log(options);
rp(options)
......@@ -298,10 +305,15 @@ function logBroadcast(message, resource_id, resourceMap) {
}
async function fetchData(url, data) {
let res = await fetch(url, data)
return await res.json()
}
setInterval(metrics.broadcastMetrics, 5000)
module.exports = {
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi, logBroadcast, metrics
viterbi, logBroadcast, fetchData, metrics
}
\ No newline at end of file
......@@ -61,41 +61,38 @@ function collectMetrics(metric) {
async function broadcastMetrics() {
if (Object.keys(metrics).length !== 0) {
for (let [functionHash, metricData] of Object.entries(metrics)) {
let {metric, revision} = await fetchData(functionHash, metricData)
/**
* Shortterm moving average
*/
metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0) ?
metric.shortterm.coldstart_total_request : 1
metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0) ?
metric.shortterm.warm_total_request : 1
/**
* Longterm exponential moving average
*/
if (metric.shortterm.coldstart != 0)
metric.longterm.coldstart = (metric.longterm.coldstart != 0) ? metric.longterm.coldstart * alpha
+ metric.shortterm.coldstart * (1 - alpha) : metric.shortterm.coldstart
if (metric.shortterm.warmstart != 0)
metric.longterm.warmstart = (metric.longterm.warmstart != 0) ? metric.longterm.warmstart * alpha
+ metric.shortterm.warmstart * (1 - alpha) : metric.shortterm.warmstart
let payload = {
method: 'put',
body: JSON.stringify({
coldstart: metric.longterm.coldstart,
warmstart: metric.longterm.warmstart,
_rev: revision
}),
headers: { 'Content-Type': 'application/json' }
}
if (metricData.shortterm.coldstart != 0 || metricData.shortterm.longterm != 0) {
let { metric, revision } = await fetchData(functionHash, metricData)
console.log(payload);
/**
* Shortterm moving average
*/
metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0) ?
metric.shortterm.coldstart_total_request : 1
metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0) ?
metric.shortterm.warm_total_request : 1
/**
* Longterm exponential moving average
*/
if (metric.shortterm.coldstart != 0)
metric.longterm.coldstart = (metric.longterm.coldstart != 0) ? metric.longterm.coldstart * alpha
+ metric.shortterm.coldstart * (1 - alpha) : metric.shortterm.coldstart
if (metric.shortterm.warmstart != 0)
metric.longterm.warmstart = (metric.longterm.warmstart != 0) ? metric.longterm.warmstart * alpha
+ metric.shortterm.warmstart * (1 - alpha) : metric.shortterm.warmstart
await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now()
let payload = {
method: 'put',
body: JSON.stringify({
coldstart: metric.longterm.coldstart,
warmstart: metric.longterm.warmstart,
_rev: revision
}),
headers: { 'Content-Type': 'application/json' }
}
await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now()
}
}
let log = [{
......@@ -128,7 +125,6 @@ async function fetchData(functionHash, metric) {
let revision
let res = await fetch(metricsDB + functionHash)
let json = await res.json()
console.log(json);
if (json.error === "not_found") {
metric.longterm = {
......@@ -143,7 +139,6 @@ async function fetchData(functionHash, metric) {
}
revision = json._rev
}
console.log(metric);
return {
metric,
......
const op = {
'lt': function (x, y) { return x < y },
'gt': function (x, y) { return x > y },
'lte': function (x, y) { return x <= y },
'gte': function (x, y) { return x >= y },
'eq': function (x, y) { return x === y },
'neq': function (x, y) { return x !== y },
};
module.exports = op
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment