Commit 4d074b06 authored by Nilanjan Daw's avatar Nilanjan Daw

Merge branch 'explicit_function_chaining'

parents 66dc57aa fb27d6a9
{ {
"registry_url" :"localhost:5000/", "registry_url" :"10.129.6.5:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984", "couchdb_host": "10.129.6.5:5984",
"function_db_name": "serverless", "db": {
"metrics_db_name": "metrics", "function_meta": "serverless",
"implicit_chain_db_name": "implicit_chain", "metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": { "network": {
"network_bridge": "hybrid_kafka-serverless", "network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": { "internal": {
"kafka_host": "kafka:9092" "kafka_host": "10.129.6.5:9092"
}, },
"external": { "external": {
"kafka_host": "localhost:29092" "kafka_host": "10.129.6.5:9092"
} }
}, },
"topics": { "topics": {
...@@ -28,7 +32,10 @@ ...@@ -28,7 +32,10 @@
"autoscalar_metrics": { "autoscalar_metrics": {
"open_request_threshold": 100 "open_request_threshold": 100
}, },
"speculative_deployment": false, "metrics": {
"JIT_deployment": false, "alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20 "id_size": 20
} }
{
"registry_url" :"10.129.6.5:5000/",
"master_port": 8080,
"master_address": "10.129.6.5",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "10.129.6.5:9092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": false,
"JIT_deployment": true,
"id_size": 20
}
...@@ -92,9 +92,15 @@ function runContainer(metadata) { ...@@ -92,9 +92,15 @@ function runContainer(metadata) {
if (code != 0) if (code != 0)
reject("error") reject("error")
else { else {
const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`, let process = null;
"--name", resource_id, registry_url + imageName, if (constants.network.use_bridge)
resource_id, imageName, port, "container", constants.network.internal.kafka_host]); process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
"--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
else
process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`,
"--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
...@@ -119,10 +125,15 @@ function runContainer(metadata) { ...@@ -119,10 +125,15 @@ function runContainer(metadata) {
}) })
} else { } else {
logger.info("container starting at port", port); logger.info("container starting at port", port);
let process = null;
const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, if (constants.network.use_bridge)
"-p", `${port}:${port}`, "--name", resource_id, process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]); "-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
else
process = spawn('docker', ["run", "--rm",
"-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
......
...@@ -158,7 +158,7 @@ function heartbeat() { ...@@ -158,7 +158,7 @@ function heartbeat() {
topic: "heartbeat", topic: "heartbeat",
messages: JSON.stringify({"address": node_id, "timestamp": Date.now()}) messages: JSON.stringify({"address": node_id, "timestamp": Date.now()})
}] }]
producer.send(payload, function() {}) producer.send(payload, function(cb) {})
} }
......
...@@ -27,7 +27,7 @@ function updateConfig() { ...@@ -27,7 +27,7 @@ function updateConfig() {
} }
function makeTopic(id) { function makeTopic(id) {
console.log("Using Primary IP", id, "as topic"); console.log("Using Primary IP", id, "as topic", "publishing to:", constants.network.external.kafka_host);
let client = new kafka.KafkaClient({ let client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host, kafkaHost: constants.network.external.kafka_host,
......
...@@ -6,17 +6,20 @@ const fs = require('fs') ...@@ -6,17 +6,20 @@ const fs = require('fs')
const { spawn } = require('child_process') const { spawn } = require('child_process')
const fetch = require('node-fetch') const fetch = require('node-fetch')
const constants = require('../constants.json') const constants = require('../constants.json')
const secrets = require('./secrets.json')
const operator = require('./operator') const operator = require('./operator')
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` const sharedMeta = require('./shared_meta')
metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const logger = libSupport.logger const logger = libSupport.logger
const registry_url = constants.registry_url const registry_url = constants.registry_url
let functionToResource = sharedMeta.functionToResource,
db = sharedMeta.db,
conditionProbabilityExplicit = sharedMeta.conditionProbabilityExplicit,
metricsDB = sharedMeta.metricsDB,
metadataDB = sharedMeta.metadataDB,
explicitChainDB = sharedMeta.explicitChainDB
router.post('/deploy', (req, res) => { router.post('/deploy', (req, res) => {
...@@ -190,50 +193,57 @@ async function deployContainer(path, imageName) { ...@@ -190,50 +193,57 @@ async function deployContainer(path, imageName) {
router.post('/execute/:id', (req, res) => { router.post('/execute/:id', (req, res) => {
let map, aliases let map, aliases
// if (req.body.map) let chain_id = req.params.id
// map = req.body.map libSupport.fetchData(explicitChainDB + chain_id)
// else { .then(chainData => {
if (req.files && req.files.map) { console.log(chainData);
map = JSON.parse(req.files.map.data.toString()); if (chainData.error !== "not_found")
let mapPlanner = JSON.parse(req.files.map.data.toString()); conditionProbabilityExplicit[chain_id] = chainData
readMap(`./repository/aliases${req.params.id}.json`, true) if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString());
let mapPlanner = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
console.log(payload); speculative_deployment(chain_id, aliases, mapPlanner, 0);
speculative_deployment(aliases, mapPlanner); orchestrator(chain_id, res, payload, map, aliases, {})
orchestrator(res, payload, map, aliases, {})
}) })
} else { } else {
readMap(`./repository/map${req.params.id}.json`) readMap(`./repository/map${chain_id}.json`)
.then(data => { .then(data => {
map = data map = data
let mapPlanner = JSON.parse(JSON.stringify(map)) let mapPlanner = JSON.parse(JSON.stringify(map))
readMap(`./repository/aliases${req.params.id}.json`, true) readMap(`./repository/aliases${chain_id}.json`, true)
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
speculative_deployment(aliases, mapPlanner); speculative_deployment(chain_id, aliases, mapPlanner, 0);
orchestrator(res, payload, map, aliases, {}) orchestrator(chain_id, res, payload, map, aliases, {})
}) })
}) })
} }
})
}) })
async function orchestrator(res, payload, map, aliases, result) { async function orchestrator(chain_id, res, payload, map, aliases, result) {
if (Object.keys(map).length == 0) { if (Object.keys(map).length == 0) {
console.log("time to resolve", result); console.log("time to resolve", result);
res.json(result) res.json(result)
let payload = {
method: 'put',
body: JSON.stringify(conditionProbabilityExplicit[chain_id]),
headers: { 'Content-Type': 'application/json' }
}
libSupport.fetchData(explicitChainDB + chain_id, payload)
// return resolve(result) // return resolve(result)
} }
else { else {
for (const [functionName, metadata] of Object.entries(map)) { for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
// console.log(metadata);
if (metadata.type === "function" && metadata.wait_for.length == 0) { if (metadata.type === "function" && metadata.wait_for.length == 0) {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}` let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}`
...@@ -244,12 +254,12 @@ async function orchestrator(res, payload, map, aliases, result) { ...@@ -244,12 +254,12 @@ async function orchestrator(res, payload, map, aliases, result) {
runtime: metadata.runtime, runtime: metadata.runtime,
payload payload
}), }),
headers: { 'Content-Type': 'application/json' } headers: { 'Content-Type': 'application/json', 'x-chain-type': 'explicit' }
} }
delete map[functionName] delete map[functionName]
aliases[functionName].status = "running" aliases[functionName].status = "running"
fetch(url, data).then(res => res.json()) libSupport.fetchData(url, data)
.then(json => { .then(json => {
// console.log(json); // console.log(json);
result[functionName] = json result[functionName] = json
...@@ -267,7 +277,21 @@ async function orchestrator(res, payload, map, aliases, result) { ...@@ -267,7 +277,21 @@ async function orchestrator(res, payload, map, aliases, result) {
if (metadata.type === "conditional" && metadata.wait_for.length == 0) { if (metadata.type === "conditional" && metadata.wait_for.length == 0) {
let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result) let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result)
console.log(conditionResult, "aliases", aliases); if (conditionProbabilityExplicit[chain_id] === undefined)
conditionProbabilityExplicit[chain_id] = {}
if (conditionProbabilityExplicit[chain_id][_key] === undefined)
conditionProbabilityExplicit[chain_id][_key] = {
request_count: 0,
probability: 0
}
let oldProbability = conditionProbabilityExplicit[chain_id][_key].probability
let updateProbability = (conditionResult === 'success') ? 1.0 : 0.0
conditionProbabilityExplicit[chain_id][_key].probability =
oldProbability * conditionProbabilityExplicit[chain_id][_key].request_count + updateProbability
conditionProbabilityExplicit[chain_id][_key].request_count++
conditionProbabilityExplicit[chain_id][_key].probability /=
conditionProbabilityExplicit[chain_id][_key].request_count
console.log(conditionResult, "probability table", conditionProbabilityExplicit);
let branchToTake = metadata[conditionResult] let branchToTake = metadata[conditionResult]
branchMap = map[branchToTake] branchMap = map[branchToTake]
delete map[_key] delete map[_key]
...@@ -275,7 +299,7 @@ async function orchestrator(res, payload, map, aliases, result) { ...@@ -275,7 +299,7 @@ async function orchestrator(res, payload, map, aliases, result) {
} }
} }
orchestrator(res, payload, (branchMap == null)? map: branchMap, aliases, result) orchestrator(chain_id, res, payload, (branchMap == null)? map: branchMap, aliases, result)
}) })
} }
} }
...@@ -302,68 +326,131 @@ function checkCondition(op1, op2, op, result) { ...@@ -302,68 +326,131 @@ function checkCondition(op1, op2, op, result) {
return (operator[op](data, op2))? "success": "fail" return (operator[op](data, op2))? "success": "fail"
} }
async function speculative_deployment(aliases, map) { async function speculative_deployment(chain_id, aliases, map, offset, done, toBeDone, ignoreSet) {
if (constants.speculative_deployment) { console.log("offset: ", offset, "ignoreSet", ignoreSet);
console.log(aliases);
if (constants.speculative_deployment) {
let getData = [] let getData = []
for (const [mod, metadata] of Object.entries(map)) { for (const [mod, metadata] of Object.entries(map)) {
if (metadata.type !== 'function') {
if (metadata.type === 'conditional' && !constants.JIT_deployment) {
let probability
try {
probability = conditionProbabilityExplicit[chain_id][mod].probability
} catch (error) {
console.log("branch probability not present, random branch taken");
probability = Math.random()
}
let branch = (probability >= 0.5) ? metadata['success'] : metadata['fail']
let branchMap = JSON.parse(JSON.stringify(map[branch]))
delete branchMap['type']
console.log("success probability", probability, "taking branch: ", branch);
speculative_deployment(chain_id, aliases, branchMap)
}
continue
}
if (constants.JIT_deployment) { if (constants.JIT_deployment) {
console.log(mod, metadata, aliases[mod].alias); // console.log(mod, metadata, aliases[mod].alias);
let url = metricsDB + aliases[mod].alias let url = metricsDB + aliases[mod].alias
console.log(url);
let data = libSupport.fetchData(url) let data = libSupport.fetchData(url)
console.log(data);
getData.push(data) getData.push(data)
} else { } else {
let payload = [{ notify(metadata.runtime, aliases[mod].alias)
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
notify(payload)
} }
} }
if (constants.JIT_deployment) { if (constants.JIT_deployment) {
Promise.all(getData).then((values) => { Promise.all(getData).then((values) => {
let dataMap = new Map() let dataMap = new Map()
for (const data of values) { for (const data of values) {
if (values.error === "not_found")
dataMap[data._id] = 0
dataMap[data._id] = data dataMap[data._id] = data
} }
let done = new Map() if (done === undefined) {
let toBeDone = new Set() console.log("new map");
done = new Map()
}
if (toBeDone === undefined) {
toBeDone = new Set()
}
// var plannerMap = new Map(map) // var plannerMap = new Map(map)
do { do {
for (const [mod, metadata] of Object.entries(map)) { for (const [mod, metadata] of Object.entries(map)) {
if (metadata.type !== 'function' && metadata.type !== 'conditional') {
continue
}
if (metadata.wait_for.length == 0 && done[mod] === undefined) { if (metadata.wait_for.length == 0 && done[mod] === undefined) {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart // expecting the first ones to run /**
// to be hit by coldstarts * expecting the first ones to run to be hit by coldstarts
*/
try {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart
} catch (e) {
done[mod] = 0
}
// delete plannerMap[mod]; // delete plannerMap[mod];
} else if (done[mod] === undefined) { } else if (done[mod] === undefined) {
let flag = true let flag = true, redundantFlag = false
let maxWait = 0 let maxWait = 0
for (const dependency of metadata.wait_for) { for (const dependency of metadata.wait_for) {
console.log(dependency);
if (done[dependency] === undefined) { if (done[dependency] === undefined) {
flag = false flag = false
break break
} else if (maxWait < done[dependency]) } else if (maxWait < done[dependency] && (ignoreSet === undefined || !ignoreSet.has(dependency)))
maxWait = done[dependency] maxWait = done[dependency]
else if (ignoreSet !== undefined && ignoreSet.has(dependency)) {
redundantFlag = true
console.log("ignoring redundant dependency", dependency);
}
} }
// if (redundantFlag)
// maxWait += offset;
maxWait += offset
if (flag) { if (flag) {
console.log("notifying", mod); if (metadata.type === 'conditional') {
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ? console.log("setting notification for conditional", mod);
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0 let probability
console.log(mod, "max wait", maxWait, "notify time:", notifyTime); try {
let payload = [{ probability = conditionProbabilityExplicit[chain_id][mod].probability
topic: constants.topics.hscale, } catch (error) {
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias }) console.log("branch probability not present, random branch taken");
}] probability = Math.random()
setTimeout(notify, notifyTime, payload) }
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart let branch = (probability >= 0.5)? metadata['success']: metadata['fail']
let branchMap = JSON.parse(JSON.stringify(map[branch]))
delete branchMap['type']
console.log("success probability", probability, "taking branch: ", branch);
if (ignoreSet === undefined)
ignoreSet = new Set(metadata.wait_for)
else
ignoreSet = new Set(ignoreSet, new Set(metadata.wait_for))
speculative_deployment(chain_id, aliases, branchMap, maxWait, done, toBeDone, ignoreSet)
done[mod] = maxWait - offset
} else {
console.log("notification set", mod);
let starttime
try {
starttime = dataMap[aliases[mod].alias][metadata.runtime].starttime
} catch (e) {
starttime = 0
}
let notifyTime = ((maxWait - starttime) > 0) ?
maxWait - starttime: 0
// notifyTime += offset
console.log(mod, "max wait", maxWait, "notify time:", notifyTime, "offset added", offset);
setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias)
try {
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart - offset
} catch (e) {
done[mod] = maxWait - offset
}
}
if (toBeDone.has(mod)) if (toBeDone.has(mod))
delete toBeDone[mod] delete toBeDone[mod]
// delete plannerMap[mod] // delete plannerMap[mod]
...@@ -371,7 +458,7 @@ async function speculative_deployment(aliases, map) { ...@@ -371,7 +458,7 @@ async function speculative_deployment(aliases, map) {
toBeDone.add(mod) toBeDone.add(mod)
} }
} }
console.log(done, toBeDone); console.log("done", done);
} }
} while (toBeDone.size != 0) } while (toBeDone.size != 0)
}) })
...@@ -405,8 +492,18 @@ function readMap(filename, alias = false) { ...@@ -405,8 +492,18 @@ function readMap(filename, alias = false) {
}) })
} }
function notify(payload) { function notify(runtime, functionHash) {
libSupport.producer.send(payload, function () { }) // console.log("check map: ", functionToResource.has(functionHash + runtime));
if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) {
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ runtime, functionHash })
}]
libSupport.producer.send(payload, function () { })
} else {
console.log("resource already present: skipping speculation");
}
} }
function createDirectory(path) { function createDirectory(path) {
...@@ -423,4 +520,7 @@ function createDirectory(path) { ...@@ -423,4 +520,7 @@ function createDirectory(path) {
}) })
} }
module.exports = router;
module.exports = {
router
}
...@@ -10,18 +10,11 @@ const { spawn } = require('child_process'); ...@@ -10,18 +10,11 @@ const { spawn } = require('child_process');
const morgan = require('morgan'); const morgan = require('morgan');
const heap = require('heap'); const heap = require('heap');
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const swStats = require('swagger-stats'); // const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json'); // const apiSpec = require('./swagger.json');
const util = require('util') const util = require('util')
const sharedMeta = require('./shared_meta')
/**
* URL to the couchdb database server used to store function metadata
*/
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
...@@ -30,13 +23,15 @@ let date = new Date(); ...@@ -30,13 +23,15 @@ let date = new Date();
let log_channel = constants.topics.log_channel let log_channel = constants.topics.log_channel
let usedPort = new Map(), // TODO: remove after integration with RM let usedPort = new Map(), // TODO: remove after integration with RM
db = new Map(), // queue holding request to be dispatched db = sharedMeta.db, // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc resourceMap = sharedMeta.resourceMap, // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of
// resources associated with the function // resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM workerNodes = sharedMeta.workerNodes, // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions functionBranchTree = sharedMeta.functionBranchTree, // a tree to store function branch predictions
metricsDB = sharedMeta.metricsDB,
metadataDB = sharedMeta.metadataDB
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -64,11 +59,10 @@ app.use(morgan('combined', { ...@@ -64,11 +59,10 @@ app.use(morgan('combined', {
app.use(express.json()); app.use(express.json());
app.use(express.urlencoded({ extended: true })); app.use(express.urlencoded({ extended: true }));
const file_path = __dirname + "/repository/" const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path)); // file server hosting deployed functions app.use('/repository', express.static(file_path)); // file server hosting deployed functions
app.use(fileUpload()) app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); // statistics middleware // app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); // statistics middleware
app.use('/serverless/chain', chainHandler); // chain router (explicit_chain_handler.js) for handling explicit chains app.use('/serverless/chain', chainHandler.router); // chain router (explicit_chain_handler.js) for handling explicit chains
let requestQueue = [] let requestQueue = []
const WINDOW_SIZE = 10 const WINDOW_SIZE = 10
...@@ -225,7 +219,7 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -225,7 +219,7 @@ app.post('/serverless/execute/:id', (req, res) => {
res.timestamp = Date.now() res.timestamp = Date.now()
if (functionToResource.has(id)) { if (functionToResource.has(id)) {
res.start = 'warmstart' res.start = 'warmstart'
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) libSupport.reverseProxy(req, res)
} else { } else {
res.start = 'coldstart' res.start = 'coldstart'
/** /**
...@@ -334,17 +328,14 @@ function postDeploy(message) { ...@@ -334,17 +328,14 @@ function postDeploy(message) {
"reason": "deployment", "reason": "deployment",
"status": true, "status": true,
starttime: (Date.now() - resource.deploy_request_time) starttime: (Date.now() - resource.deploy_request_time)
}, message.resource_id, resourceMap) }, message.resource_id)
if (db.has(id)) { if (db.has(id)) {
let sendQueue = db.get(id) let sendQueue = db.get(id)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource)); logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) { while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift() let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) libSupport.reverseProxy(req, res)
.then(() => {
})
} }
db.delete(id) db.delete(id)
} }
...@@ -410,7 +401,7 @@ consumer.on('message', function (message) { ...@@ -410,7 +401,7 @@ consumer.on('message', function (message) {
"reason": "terminate", "reason": "terminate",
"total_request": message.total_request, "total_request": message.total_request,
"status": true "status": true
}, message.resource_id, resourceMap) }, message.resource_id)
.then(() => { .then(() => {
resourceMap.delete(message.resource_id) resourceMap.delete(message.resource_id)
if (resourceArray.length == 0) if (resourceArray.length == 0)
...@@ -428,7 +419,11 @@ consumer.on('message', function (message) { ...@@ -428,7 +419,11 @@ consumer.on('message', function (message) {
functionHash = message.functionHash functionHash = message.functionHash
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`); logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
console.log("Resource Status: ", functionToResource); console.log("Resource Status: ", functionToResource);
if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) {
console.log("adding db");
db.set(functionHash + runtime, [])
}
/** /**
* Request RM for resource * Request RM for resource
*/ */
...@@ -451,6 +446,7 @@ consumer.on('message', function (message) { ...@@ -451,6 +446,7 @@ consumer.on('message', function (message) {
}), }),
partition: 0 partition: 0
}] }]
producer.send(payloadToRM, () => { producer.send(payloadToRM, () => {
// db.set(functionHash + runtime, { req, res }) // db.set(functionHash + runtime, { req, res })
console.log("sent rm"); console.log("sent rm");
...@@ -521,7 +517,7 @@ function autoscalar() { ...@@ -521,7 +517,7 @@ function autoscalar() {
*/ */
async function speculative_deployment(req, runtime) { async function speculative_deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) { if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id); // console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) { if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id) let branchInfo = functionBranchTree.get(req.params.id)
...@@ -558,7 +554,7 @@ async function speculative_deployment(req, runtime) { ...@@ -558,7 +554,7 @@ async function speculative_deployment(req, runtime) {
} }
} }
setInterval(libSupport.metrics.broadcastMetrics, 5000) setInterval(libSupport.metrics.broadcastMetrics, 5000)
setInterval(libSupport.viterbi, 1000, functionBranchTree) // setInterval(libSupport.viterbi, 1000)
setInterval(autoscalar, 1000); setInterval(autoscalar, 1000);
setInterval(dispatch, 1000); setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`)) app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
...@@ -6,10 +6,18 @@ const winston = require('winston') ...@@ -6,10 +6,18 @@ const winston = require('winston')
const constants = require('.././constants.json') const constants = require('.././constants.json')
const secrets = require('./secrets.json') const secrets = require('./secrets.json')
const metrics = require('./metrics') const metrics = require('./metrics')
const sharedMeta = require('./shared_meta')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap') const heap = require('heap')
let db = sharedMeta.db, // queue holding request to be dispatched
resourceMap = sharedMeta.resourceMap, // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes = sharedMeta.workerNodes, // list of worker nodes currently known to the DM
functionBranchTree = sharedMeta.functionBranchTree // Holds the function path's and related probability distribution
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -18,9 +26,7 @@ let kafka = require('kafka-node'), ...@@ -18,9 +26,7 @@ let kafka = require('kafka-node'),
}), }),
producer = new Producer(client) producer = new Producer(client)
let implicitChainDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let implicitChainDB = sharedMeta.implicitChainDB
implicitChainDB = implicitChainDB + "/" + constants.implicit_chain_db_names + "/"
/** /**
* Generates unique IDs of arbitrary length * Generates unique IDs of arbitrary length
* @param {Length of the ID} length * @param {Length of the ID} length
...@@ -62,59 +68,73 @@ function generateExecutor(functionPath, functionHash) { ...@@ -62,59 +68,73 @@ function generateExecutor(functionPath, functionHash) {
* Reverse proxy to take user requests and forward them to appropriate workers using a loadbalacer * Reverse proxy to take user requests and forward them to appropriate workers using a loadbalacer
* @param {JSON} req the user request to be forwarded to the worker * @param {JSON} req the user request to be forwarded to the worker
* @param {JSON} res Object to use to return the response to the user * @param {JSON} res Object to use to return the response to the user
* @param {Map} functionToResource Function to resource Map
* @param {Map} resourceMap Map from resource ID to resource metadata
* @param {Map} functionBranchTree Holds the function path's and related probability distribution
*/ */
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) { function reverseProxy(req, res) {
branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) if (req.headers['x-chain-type'] !== 'explicit')
return new Promise((resolve, reject) => { branchChainPredictor(req)
let runtime = req.body.runtime let runtime = req.body.runtime
let id = req.params.id + runtime let id = req.params.id + runtime
/** /**
* Bypass deployment pipeline if resource available * Bypass deployment pipeline if resource available
*/ */
let functionHeap = functionToResource.get(id) let functionHeap = functionToResource.get(id)
// loadbalancing by choosing worker with lowest load // loadbalancing by choosing worker with lowest load
let forwardTo = functionHeap[0] let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id) let resource = resourceMap.get(forwardTo.resource_id)
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` + // logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
// "\n forwarding via reverse proxy to: " + JSON.stringify(resource)); // "\n forwarding via reverse proxy to: " + JSON.stringify(resource));
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute` let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap);
// logger.info("Request received at reverseproxy. Forwarding to: " + url); var options = {
forwardTo.open_request_count += 1 method: 'POST',
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map uri: url,
// logger.info(functionHeap); body: req.body,
json: true // Automatically stringifies the body to JSON
var options = { };
method: 'POST',
uri: url,
body: req.body, rp(options)
json: true // Automatically stringifies the body to JSON .then(function (parsedBody) {
}; let serviceTime = Date.now() - res.timestamp
res.json(parsedBody)
rp(options) forwardTo.open_request_count -= 1
.then(function (parsedBody) { heap.heapify(functionHeap, compare)
let serviceTime = Date.now() - res.timestamp let functionHash = req.params.id
let functionData = functionBranchTree.get(functionHash)
res.json(parsedBody) if (functionData && functionData.req_count % 5 == 0) {
forwardTo.open_request_count -= 1 if (functionData.parent)
heap.heapify(functionHeap, compare) viterbi(functionHash, functionData)
else {
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime}) functionData.branches = Array.from(functionData.branches.entries())
resolve() let payload = {
}) method: 'put',
.catch(function (err) { body: JSON.stringify(functionBranchTree.get(functionHash)),
forwardTo.open_request_count -= 1 headers: { 'Content-Type': 'application/json' }
heap.heapify(functionHeap, compare) }
logger.error("error" + err);
res.json(err.message).status(err.statusCode) fetchData(implicitChainDB + functionHash, payload)
resolve() .then((updateStatus) => {
}); console.log(updateStatus);
if (updateStatus.error === undefined)
}) functionData._rev = updateStatus.rev
})
functionData.branches = new Map(functionData.branches)
}
}
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
})
.catch(function (err) {
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
logger.error("error" + err);
res.json(err.message).status(err.statusCode)
});
} }
function getPort(usedPort) { function getPort(usedPort) {
...@@ -161,10 +181,20 @@ function compare(a, b) { ...@@ -161,10 +181,20 @@ function compare(a, b) {
return a.open_request_count - b.open_request_count return a.open_request_count - b.open_request_count
} }
function branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) { async function branchChainPredictor(req) {
// console.log(req.headers['x-resource-id']); // console.log(req.headers['x-resource-id']);
if (!functionBranchTree.has(req.params.id)) {
let data = await fetchData(implicitChainDB + req.params.id)
if (data.error === "not_found")
console.log("no data", req.params.id);
else {
data.branches = new Map(data.branches)
functionBranchTree.set(req.params.id, data)
}
}
if (req.headers['x-resource-id'] === undefined) { if (req.headers['x-resource-id'] === undefined) {
let functionHash = req.params.id let functionHash = req.params.id
if (functionBranchTree.has(functionHash)) { if (functionBranchTree.has(functionHash)) {
let branchInfo = functionBranchTree.get(functionHash) let branchInfo = functionBranchTree.get(functionHash)
...@@ -215,81 +245,87 @@ function branchChainPredictor(req, resourceMap, functionToResource, functionBran ...@@ -215,81 +245,87 @@ function branchChainPredictor(req, resourceMap, functionToResource, functionBran
// console.log("branch tree", functionBranchTree); // console.log("branch tree", functionBranchTree);
} }
function viterbi(functionBranchTree) { async function viterbi(node, metadata) {
console.log("function branch tree", functionBranchTree.get(node));
functionBranchTree.forEach((metadata, node) => { let path = []
if (metadata.parent && metadata.req_count % 5 == 0) { let parents = [[node, {
let path = [] prob: 1,
let parents = [[node, { metadata
prob: 1, }]]
metadata path.push({ node, probability: 1 })
}]] let siblings = new Map()
path.push({node, probability: 1}) while (parents.length > 0) {
let siblings = new Map() // console.log("parent_group", parents);
while(parents.length > 0) {
// console.log("parent_group", parents); for (const parent of parents) {
// console.log("=========begin==========\n",parent, "\n=============end============");
for (const parent of parents) { // console.log(parent[1].metadata);
// console.log("=========begin==========\n",parent, "\n=============end============");
// console.log(parent[1].metadata); if (parent[1].metadata === undefined)
continue
if (parent[1].metadata === undefined) let forwardBranches = parent[1].metadata.branches
continue // console.log(forwardBranches);
let forwardBranches = parent[1].metadata.branches
// console.log(forwardBranches); let parentProbability = parent[1].prob
let parentProbability = parent[1].prob forwardBranches.forEach((branchProb, subNode) => {
let probability = 0
forwardBranches.forEach((branchProb, subNode) => { if (siblings.has(subNode))
let probability = 0 probability = siblings.get(subNode)
if (siblings.has(subNode)) probability += branchProb * parentProbability
probability = siblings.get(subNode) // console.log("prob", probability);
probability += branchProb * parentProbability
// console.log("prob", probability); siblings.set(subNode, probability)
})
siblings.set(subNode, probability) // console.log("siblings", siblings);
})
// console.log("siblings", siblings); }
parents = []
} let maxSibling, maxProb = 0
parents = [] siblings.forEach((prob, sibling) => {
let maxSibling, maxProb = 0 if (prob > maxProb) {
siblings.forEach((prob, sibling) => { maxSibling = sibling
if (prob > maxProb) { maxProb = prob
maxSibling = sibling
maxProb = prob
}
})
parentIDs = Array.from( siblings.keys() );
for (const id of parentIDs) {
let metadata = functionBranchTree.get(id)
parents.push([
id, {
prob: siblings.get(id),
metadata
}
])
}
if (maxSibling !== undefined)
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
} }
// if (path.length > 0) })
// console.log("path", path); parentIDs = Array.from(siblings.keys());
metadata.mle_path = path for (const id of parentIDs) {
if (path.length > 1) { let metadata = functionBranchTree.get(id)
let payload = { parents.push([
method: 'put', id, {
body: JSON.stringify(path), prob: siblings.get(id),
headers: { 'Content-Type': 'application/json' } metadata
} }
fetch(implicitChainDB + functionHash, payload) ])
} }
if (maxSibling !== undefined)
path.push({ node: maxSibling, probability: maxProb })
siblings = new Map()
}
if (path.length > 1)
console.log("path", path);
metadata.mle_path = path
if (path.length > 1) {
metadata.branches = Array.from(metadata.branches.entries())
let payload = {
method: 'put',
body: JSON.stringify(functionBranchTree.get(node)),
headers: { 'Content-Type': 'application/json' }
} }
});
fetchData(implicitChainDB + node, payload)
.then((updateStatus) => {
console.log(updateStatus);
if (updateStatus.error === undefined)
metadata._rev = updateStatus.rev
})
metadata.branches = new Map(metadata.branches)
}
} }
function logBroadcast(message, resource_id, resourceMap) { function logBroadcast(message, resource_id) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
try { try {
...@@ -329,6 +365,6 @@ async function fetchData(url, data = null) { ...@@ -329,6 +365,6 @@ async function fetchData(url, data = null) {
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, makeid, generateExecutor, reverseProxy,
getPort, logger, compare, getPort, logger, compare,
viterbi, logBroadcast, fetchData, metrics, logBroadcast, fetchData, metrics,
producer producer
} }
\ No newline at end of file
...@@ -5,11 +5,12 @@ const secrets = require('./secrets.json') ...@@ -5,11 +5,12 @@ const secrets = require('./secrets.json')
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const util = require('util') const util = require('util')
const prom = require('prom-client'); const prom = require('prom-client');
const sharedMeta = require('./shared_meta');
const Registry = prom.Registry; const Registry = prom.Registry;
const register = new Registry(); const register = new Registry();
const alpha = 0.99 const alpha = constants.metrics.alpha
let log_channel = constants.topics.log_channel, let log_channel = constants.topics.log_channel,
metrics = { } metrics = { }
...@@ -29,8 +30,7 @@ register.registerMetric(coldstartMetric); ...@@ -29,8 +30,7 @@ register.registerMetric(coldstartMetric);
register.registerMetric(starttimeMetric); register.registerMetric(starttimeMetric);
register.registerMetric(requestMetric); register.registerMetric(requestMetric);
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metricsDB = sharedMeta.metricsDB
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -129,6 +129,7 @@ async function broadcastMetrics() { ...@@ -129,6 +129,7 @@ async function broadcastMetrics() {
warmstart: metric.longterm.warmstart, warmstart: metric.longterm.warmstart,
starttime: metric.longterm.starttime starttime: metric.longterm.starttime
} }
let payload = { let payload = {
method: 'put', method: 'put',
body: JSON.stringify(dbData), body: JSON.stringify(dbData),
...@@ -136,6 +137,7 @@ async function broadcastMetrics() { ...@@ -136,6 +137,7 @@ async function broadcastMetrics() {
} }
await fetch(metricsDB + functionHash, payload) await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now() metric.timestamp = Date.now()
} }
} }
......
const secrets = require('./secrets.json')
const constants = require('.././constants.json')
let db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map(), // a tree to store function branch predictions
conditionProbabilityExplicit = new Map() // tree holding conditional probabilities for explicit chains
/**
* URL to the couchdb database server used to store data
*/
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.db.function_meta + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.db.metrics + "/"
let implicitChainDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
implicitChainDB = implicitChainDB + "/" + constants.db.implicit_chain_meta + "/"
let explicitChainDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
explicitChainDB = explicitChainDB + "/" + constants.db.explicit_chain_meta + "/"
module.exports = {
db, functionBranchTree, functionToResource, workerNodes, resourceMap,
conditionProbabilityExplicit,
metadataDB, metricsDB, implicitChainDB, explicitChainDB
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment