Commit 178f8a33 authored by Nilanjan Daw's avatar Nilanjan Daw

added checks to prevent repeat allocation in Speculative JIT

parent db8d9fb6
...@@ -28,7 +28,10 @@ ...@@ -28,7 +28,10 @@
"autoscalar_metrics": { "autoscalar_metrics": {
"open_request_threshold": 100 "open_request_threshold": 100
}, },
"speculative_deployment": false, "metrics": {
"JIT_deployment": false, "alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20 "id_size": 20
} }
...@@ -8,6 +8,8 @@ const fetch = require('node-fetch') ...@@ -8,6 +8,8 @@ const fetch = require('node-fetch')
const constants = require('../constants.json') const constants = require('../constants.json')
const secrets = require('./secrets.json') const secrets = require('./secrets.json')
const operator = require('./operator') const operator = require('./operator')
const sharedStructures = require('./shared_structures')
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.function_db_name + "/" metadataDB = metadataDB + "/" + constants.function_db_name + "/"
...@@ -17,7 +19,8 @@ metricsDB = metricsDB + "/" + constants.metrics_db_name + "/" ...@@ -17,7 +19,8 @@ metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const logger = libSupport.logger const logger = libSupport.logger
const registry_url = constants.registry_url const registry_url = constants.registry_url
let functionToResource = sharedStructures.functionToResource,
db = sharedStructures.db
router.post('/deploy', (req, res) => { router.post('/deploy', (req, res) => {
// let runtime = req.body.runtime // let runtime = req.body.runtime
...@@ -200,7 +203,6 @@ router.post('/execute/:id', (req, res) => { ...@@ -200,7 +203,6 @@ router.post('/execute/:id', (req, res) => {
.then(data => { .then(data => {
aliases = data aliases = data
let payload = JSON.parse(req.body.data) let payload = JSON.parse(req.body.data)
console.log(payload);
speculative_deployment(aliases, mapPlanner); speculative_deployment(aliases, mapPlanner);
orchestrator(res, payload, map, aliases, {}) orchestrator(res, payload, map, aliases, {})
}) })
...@@ -311,18 +313,10 @@ async function speculative_deployment(aliases, map) { ...@@ -311,18 +313,10 @@ async function speculative_deployment(aliases, map) {
console.log(mod, metadata, aliases[mod].alias); console.log(mod, metadata, aliases[mod].alias);
let url = metricsDB + aliases[mod].alias let url = metricsDB + aliases[mod].alias
console.log(url);
let data = libSupport.fetchData(url) let data = libSupport.fetchData(url)
console.log(data);
getData.push(data) getData.push(data)
} else { } else {
let payload = [{ notify(metadata.runtime, aliases[mod].alias)
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
notify(payload)
} }
} }
if (constants.JIT_deployment) { if (constants.JIT_deployment) {
...@@ -354,15 +348,11 @@ async function speculative_deployment(aliases, map) { ...@@ -354,15 +348,11 @@ async function speculative_deployment(aliases, map) {
maxWait = done[dependency] maxWait = done[dependency]
} }
if (flag) { if (flag) {
console.log("notifying", mod); console.log("notification set", mod);
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ? let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ?
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0 maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0
console.log(mod, "max wait", maxWait, "notify time:", notifyTime); console.log(mod, "max wait", maxWait, "notify time:", notifyTime);
let payload = [{ setTimeout(notify, notifyTime, metadata.runtime, aliases[mod].alias)
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
setTimeout(notify, notifyTime, payload)
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart
if (toBeDone.has(mod)) if (toBeDone.has(mod))
delete toBeDone[mod] delete toBeDone[mod]
...@@ -405,8 +395,18 @@ function readMap(filename, alias = false) { ...@@ -405,8 +395,18 @@ function readMap(filename, alias = false) {
}) })
} }
function notify(payload) { function notify(runtime, functionHash) {
console.log("check map: ", functionToResource.has(functionHash + runtime));
if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) {
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ runtime, functionHash })
}]
libSupport.producer.send(payload, function () { }) libSupport.producer.send(payload, function () { })
} else {
console.log("resource already present: skipping speculation");
}
} }
function createDirectory(path) { function createDirectory(path) {
...@@ -423,4 +423,10 @@ function createDirectory(path) { ...@@ -423,4 +423,10 @@ function createDirectory(path) {
}) })
} }
module.exports = router; function initialise(functionToResource) {
this.functionToResource = functionToResource
}
module.exports = {
router, initialise
}
...@@ -13,6 +13,7 @@ const fetch = require('node-fetch'); ...@@ -13,6 +13,7 @@ const fetch = require('node-fetch');
const swStats = require('swagger-stats'); const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json'); const apiSpec = require('./swagger.json');
const util = require('util') const util = require('util')
const sharedStructures = require('./shared_structures')
/** /**
* URL to the couchdb database server used to store function metadata * URL to the couchdb database server used to store function metadata
...@@ -30,14 +31,14 @@ let date = new Date(); ...@@ -30,14 +31,14 @@ let date = new Date();
let log_channel = constants.topics.log_channel let log_channel = constants.topics.log_channel
let usedPort = new Map(), // TODO: remove after integration with RM let usedPort = new Map(), // TODO: remove after integration with RM
db = new Map(), // queue holding request to be dispatched db = sharedStructures.db, // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc resourceMap = sharedStructures.resourceMap, // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of functionToResource = sharedStructures.functionToResource, // a function to resource map. Each map contains a minheap of
// resources associated with the function // resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM workerNodes = sharedStructures.workerNodes, // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions functionBranchTree = sharedStructures.functionBranchTree // a tree to store function branch predictions
chainHandler.initialise(functionToResource)
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
...@@ -64,11 +65,10 @@ app.use(morgan('combined', { ...@@ -64,11 +65,10 @@ app.use(morgan('combined', {
app.use(express.json()); app.use(express.json());
app.use(express.urlencoded({ extended: true })); app.use(express.urlencoded({ extended: true }));
const file_path = __dirname + "/repository/" const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path)); // file server hosting deployed functions app.use('/repository', express.static(file_path)); // file server hosting deployed functions
app.use(fileUpload()) app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); // statistics middleware app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); // statistics middleware
app.use('/serverless/chain', chainHandler); // chain router (explicit_chain_handler.js) for handling explicit chains app.use('/serverless/chain', chainHandler.router); // chain router (explicit_chain_handler.js) for handling explicit chains
let requestQueue = [] let requestQueue = []
const WINDOW_SIZE = 10 const WINDOW_SIZE = 10
...@@ -428,7 +428,11 @@ consumer.on('message', function (message) { ...@@ -428,7 +428,11 @@ consumer.on('message', function (message) {
functionHash = message.functionHash functionHash = message.functionHash
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`); logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
console.log("Resource Status: ", functionToResource); console.log("Resource Status: ", functionToResource);
if (!functionToResource.has(functionHash + runtime) && !db.has(functionHash + runtime)) {
console.log("adding db");
db.set(functionHash + runtime, [])
}
/** /**
* Request RM for resource * Request RM for resource
*/ */
...@@ -451,6 +455,7 @@ consumer.on('message', function (message) { ...@@ -451,6 +455,7 @@ consumer.on('message', function (message) {
}), }),
partition: 0 partition: 0
}] }]
producer.send(payloadToRM, () => { producer.send(payloadToRM, () => {
// db.set(functionHash + runtime, { req, res }) // db.set(functionHash + runtime, { req, res })
console.log("sent rm"); console.log("sent rm");
......
...@@ -9,7 +9,7 @@ const prom = require('prom-client'); ...@@ -9,7 +9,7 @@ const prom = require('prom-client');
const Registry = prom.Registry; const Registry = prom.Registry;
const register = new Registry(); const register = new Registry();
const alpha = 0.99 const alpha = constants.metrics.alpha
let log_channel = constants.topics.log_channel, let log_channel = constants.topics.log_channel,
metrics = { } metrics = { }
......
let db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions
module.exports = {
db, functionBranchTree, functionToResource, workerNodes, resourceMap
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment