Commit 901024e7 authored by Nilanjan Daw's avatar Nilanjan Daw

Merge branch 'function_chain_predict'

parents c945c9a1 6c6e37e5
...@@ -8,6 +8,15 @@ ...@@ -8,6 +8,15 @@
"couchdb_host": "10.129.6.5:5984", "couchdb_host": "10.129.6.5:5984",
"couchdb_db_name": "serverless", "couchdb_db_name": "serverless",
"topics": { "topics": {
"request_dm_2_rm": "request",
} "heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"speculative_deployment": true
} }
\ No newline at end of file
{"id":"10.196.6.51","master_node":"10.129.6.5"} {"id":"192.168.31.51","master_node":"10.129.6.5"}
\ No newline at end of file \ No newline at end of file
...@@ -27,7 +27,8 @@ let usedPort = new Map(), // TODO: remove after integration with RM ...@@ -27,7 +27,8 @@ let usedPort = new Map(), // TODO: remove after integration with RM
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map(), // a function to resource map. Each map contains a minheap of functionToResource = new Map(), // a function to resource map. Each map contains a minheap of
// resources associated with the function // resources associated with the function
workerNodes = new Map() workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -39,11 +40,11 @@ let kafka = require('kafka-node'), ...@@ -39,11 +40,11 @@ let kafka = require('kafka-node'),
Consumer = kafka.Consumer, Consumer = kafka.Consumer,
consumer = new Consumer(client, consumer = new Consumer(client,
[ [
{ topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message { topic: constants.topics.heartbeat }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: "deployed" }, // receives deployment confirmation from workers { topic: constants.topics.deployed }, // receives deployment confirmation from workers
{ topic: "removeWorker" }, // received when a executor environment is blown at the worker { topic: constants.topics.remove_worker }, // received when a executor environment is blown at the worker
{ topic: "RESPONSE_RM_2_DM" }, // receives deployment details from RM { topic: constants.topics.response_rm_2_dm }, // receives deployment details from RM
{ topic: "hscale" } // receives signals for horizontal scaling { topic: constants.topics.hscale } // receives signals for horizontal scaling
], ],
[ [
{ autoCommit: true } { autoCommit: true }
...@@ -65,7 +66,6 @@ let requestQueue = [] ...@@ -65,7 +66,6 @@ let requestQueue = []
const WINDOW_SIZE = 10 const WINDOW_SIZE = 10
const port = constants.master_port const port = constants.master_port
const registry_url = constants.registry_url const registry_url = constants.registry_url
const AUTOSCALAR_THRESHOLD = 100;
/** /**
* REST API to receive deployment requests * REST API to receive deployment requests
...@@ -207,8 +207,9 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -207,8 +207,9 @@ app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime let runtime = req.body.runtime
let id = req.params.id + runtime let id = req.params.id + runtime
if (functionToResource.has(id)) { if (functionToResource.has(id)) {
libSupport.reverseProxy(req, res, functionToResource, resourceMap) libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
} else { } else {
/** /**
* Requests are queued up before being dispatched. To prevent requests coming in for the * Requests are queued up before being dispatched. To prevent requests coming in for the
...@@ -254,12 +255,13 @@ function dispatch() { ...@@ -254,12 +255,13 @@ function dispatch() {
/** uncomment when RM is unavailable */ /** uncomment when RM is unavailable */
resourceMap.set(resource_id, { resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null runtime, functionHash, port: null, node_id: null,
deployed: false
}) })
let payloadToRM = [{ let payloadToRM = [{
topic: "REQUEST_DM_2_RM", // changing from REQUEST_DM_2_RM topic: constants.topics.request_dm_2_rm, // changing from REQUEST_DM_2_RM
messages: JSON.stringify({ messages: JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
...@@ -270,8 +272,39 @@ function dispatch() { ...@@ -270,8 +272,39 @@ function dispatch() {
producer.send(payloadToRM, () => { producer.send(payloadToRM, () => {
// db.set(functionHash + runtime, { req, res }) // db.set(functionHash + runtime, { req, res })
console.log("sent rm"); console.log("sent rm");
}) })
/**
* Speculative deployment:
* If function MLE path is present then deploy those parts of the path which are
* not already running
*/
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
}
}
} else { } else {
logger.info("deployment process already started waiting") logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res }) db.get(functionHash + runtime).push({ req, res })
...@@ -321,8 +354,10 @@ function postDeploy(message) { ...@@ -321,8 +354,10 @@ function postDeploy(message) {
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime))); + JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
} }
let resource = resourceMap.get(message.resource_id)
try { try {
let resource = resourceMap.get(message.resource_id)
resource.deployed = true
let confirmRM = [{ let confirmRM = [{
topic: log_channel, topic: log_channel,
messages: JSON.stringify({ messages: JSON.stringify({
...@@ -347,7 +382,7 @@ function postDeploy(message) { ...@@ -347,7 +382,7 @@ function postDeploy(message) {
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource)); logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) { while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift() let { req, res } = sendQueue.shift()
libSupport.reverseProxy(req, res, functionToResource, resourceMap) libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
.then(() => { .then(() => {
}) })
...@@ -369,7 +404,7 @@ consumer.on('message', function (message) { ...@@ -369,7 +404,7 @@ consumer.on('message', function (message) {
logger.info("response " + message); logger.info("response " + message);
} else if (topic === "heartbeat") { } else if (topic === constants.topics.heartbeat) {
message = JSON.parse(message) message = JSON.parse(message)
if (Date.now() - message.timestamp < 1000) if (Date.now() - message.timestamp < 1000)
if (!workerNodes.has(message.address)) { if (!workerNodes.has(message.address)) {
...@@ -377,7 +412,7 @@ consumer.on('message', function (message) { ...@@ -377,7 +412,7 @@ consumer.on('message', function (message) {
logger.warn("New worker discovered. Worker List: ") logger.warn("New worker discovered. Worker List: ")
logger.warn(workerNodes) logger.warn(workerNodes)
} }
} else if (topic == "deployed") { } else if (topic == constants.topics.deployed) {
try { try {
message = JSON.parse(message) message = JSON.parse(message)
} catch (e) { } catch (e) {
...@@ -385,7 +420,7 @@ consumer.on('message', function (message) { ...@@ -385,7 +420,7 @@ consumer.on('message', function (message) {
} }
postDeploy(message) postDeploy(message)
} else if (topic == "removeWorker") { } else if (topic == constants.topics.remove_worker) {
logger.warn("Worker blown: Removing Metadata " + message); logger.warn("Worker blown: Removing Metadata " + message);
try { try {
message = JSON.parse(message) message = JSON.parse(message)
...@@ -410,7 +445,7 @@ consumer.on('message', function (message) { ...@@ -410,7 +445,7 @@ consumer.on('message', function (message) {
} }
} else if (topic == "hscale") { } else if (topic == constants.topics.hscale) {
message = JSON.parse(message) message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID
runtime = message.runtime, runtime = message.runtime,
...@@ -425,12 +460,13 @@ consumer.on('message', function (message) { ...@@ -425,12 +460,13 @@ consumer.on('message', function (message) {
/** uncomment when RM is unavailable */ /** uncomment when RM is unavailable */
resourceMap.set(resource_id, { resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null runtime, functionHash, port: null, node_id: null,
deployed: false
}) })
let payloadToRM = [{ let payloadToRM = [{
topic: "request", // changing from REQUEST_DM_2_RM topic: constants.topics.request_dm_2_rm, // changing from REQUEST_DM_2_RM
messages: JSON.stringify({ messages: JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
...@@ -442,7 +478,7 @@ consumer.on('message', function (message) { ...@@ -442,7 +478,7 @@ consumer.on('message', function (message) {
console.log("sent rm"); console.log("sent rm");
}) })
} else if (topic == "RESPONSE_RM_2_DM") { } else if (topic == constants.topics.response_rm_2_dm) {
logger.info("Response from RM: " + message); logger.info("Response from RM: " + message);
message = JSON.parse(message) message = JSON.parse(message)
...@@ -481,11 +517,12 @@ consumer.on('message', function (message) { ...@@ -481,11 +517,12 @@ consumer.on('message', function (message) {
function autoscalar() { function autoscalar() {
functionToResource.forEach((resourceList, functionKey, map) => { functionToResource.forEach((resourceList, functionKey, map) => {
if (resourceList.length > 0 && resourceList[resourceList.length - 1].open_request_count > AUTOSCALAR_THRESHOLD) { if (resourceList.length > 0 &&
resourceList[resourceList.length - 1].open_request_count > constants.autoscalar_metrics.open_request_threshold) {
let resource = resourceMap.get(resourceList[resourceList.length - 1].resource_id) let resource = resourceMap.get(resourceList[resourceList.length - 1].resource_id)
logger.warn(`resource ${resourceList[resourceList.length - 1]} exceeded autoscalar threshold. Scaling up!`) logger.warn(`resource ${resourceList[resourceList.length - 1]} exceeded autoscalar threshold. Scaling up!`)
let payload = [{ let payload = [{
topic: "hscale", topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": resource.runtime, "functionHash": resource.functionHash }) messages: JSON.stringify({ "runtime": resource.runtime, "functionHash": resource.functionHash })
}] }]
producer.send(payload, function () { }) producer.send(payload, function () { })
...@@ -494,7 +531,7 @@ function autoscalar() { ...@@ -494,7 +531,7 @@ function autoscalar() {
} }
setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000); setInterval(autoscalar, 1000);
setInterval(dispatch, 1000); setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`)) app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
...@@ -7,7 +7,6 @@ const constants = require('.././constants.json') ...@@ -7,7 +7,6 @@ const constants = require('.././constants.json')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap') const heap = require('heap')
functionBranchTree = new Map() // a tree to store function branch predictions
/** /**
* Generates unique IDs of arbitrary length * Generates unique IDs of arbitrary length
...@@ -46,8 +45,8 @@ function generateExecutor(functionPath, functionHash) { ...@@ -46,8 +45,8 @@ function generateExecutor(functionPath, functionHash) {
return hash return hash
} }
function reverseProxy(req, res, functionToResource, resourceMap) { function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
branchChainPredictor(req, resourceMap) branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let runtime = req.body.runtime let runtime = req.body.runtime
let id = req.params.id + runtime let id = req.params.id + runtime
...@@ -140,7 +139,7 @@ function compare(a, b) { ...@@ -140,7 +139,7 @@ function compare(a, b) {
return a.open_request_count - b.open_request_count return a.open_request_count - b.open_request_count
} }
function branchChainPredictor(req, resourceMap) { function branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree) {
// console.log(req.headers['x-resource-id']); // console.log(req.headers['x-resource-id']);
if (req.headers['x-resource-id'] === undefined) { if (req.headers['x-resource-id'] === undefined) {
...@@ -191,13 +190,14 @@ function branchChainPredictor(req, resourceMap) { ...@@ -191,13 +190,14 @@ function branchChainPredictor(req, resourceMap) {
} }
} }
console.log("branch tree", functionBranchTree); // console.log("branch tree", functionBranchTree);
} }
function viterbi() { function viterbi(functionBranchTree) {
let path = []
functionBranchTree.forEach((metadata, node) => { functionBranchTree.forEach((metadata, node) => {
if (metadata.parent) { if (metadata.parent && metadata.req_count % 5 == 0) {
let path = []
let parents = [[node, { let parents = [[node, {
prob: 1, prob: 1,
metadata metadata
...@@ -252,15 +252,17 @@ function viterbi() { ...@@ -252,15 +252,17 @@ function viterbi() {
path.push({node: maxSibling, probability: maxProb}) path.push({node: maxSibling, probability: maxProb})
siblings = new Map() siblings = new Map()
} }
}
});
if (path.length > 0) if (path.length > 0)
console.log("path", path); console.log("path", path);
metadata.mle_path = path
}
});
}
setInterval(viterbi, 5000) }
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, getPort, logger, compare makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi
} }
\ No newline at end of file
...@@ -5,7 +5,7 @@ let request = require('request') ...@@ -5,7 +5,7 @@ let request = require('request')
const process = require('process') const process = require('process')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime let port = 5000, resource_id, functionHash, runtime, idleTime = 30
resource_id = process.argv[2] resource_id = process.argv[2]
functionHash = process.argv[3] functionHash = process.argv[3]
...@@ -25,46 +25,54 @@ let kafka = require('kafka-node'), ...@@ -25,46 +25,54 @@ let kafka = require('kafka-node'),
app.use(bodyParser.urlencoded({ extended: true })) app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json()) app.use(bodyParser.json())
let lastRequest = Date.now()
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => { app.post('/serverless/function/execute/', (req, res) => {
let payload = req.body let payload = req.body
lastRequest = Date.now() lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => { executor(payload).then((result) => {
res.json(result) res.json(result)
}) })
}) })
app.post('/serverless/worker/timeout', (req, res) => {
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
})
function executor(payload) { function executor(payload) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
}) })
} }
app.post('/serverless/function/execute/2', (req, res) => {
console.log("2", JSON.stringify(req.headers))
res.send("done")
})
app.listen(port, () => { app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`) console.log(`Resource ${resource_id} Server listening on port ${port}!`)
producer.send( producer.send(
[{ [{
topic: "deployed", topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id, entity_id: process.pid}), messages: JSON.stringify({ functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid}),
"status": true "status": true
}], () => { }) }], () => { })
}) })
function shouldDie() { function shouldDie() {
if (Date.now() - lastRequest > 30 * 1000) { if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest
})
console.log("Idle for too long. Exiting"); console.log("Idle for too long. Exiting");
producer.send( producer.send(
[{ [
topic: "removeWorker", {topic: "removeWorker", messages: message }
messages: JSON.stringify({ functionHash, portExternal: port, runtime, resource_id, entity_id: process.pid}) ], () => {
}], () => {
console.log("Ending worker for function", functionHash, "resource_id", resource_id); console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0) process.exit(0)
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment