Commit 91583a4b authored by Nilanjan Daw's avatar Nilanjan Daw

Merge branch 'explicit_function_chaining'

parents d52d8e53 85953f09
......@@ -6,3 +6,4 @@ firecracker*
secrets.json
resource_system/bin/**
resource_system/version.linux
local_experiments/
......@@ -18,5 +18,6 @@
"autoscalar_metrics": {
"open_request_threshold": 100
},
"speculative_deployment": true
"speculative_deployment": true,
"id_size": 20
}
\ No newline at end of file
{"id":"192.168.31.51","master_node":"10.129.6.5"}
\ No newline at end of file
{"id":"10.196.6.51","master_node":"10.129.6.5"}
\ No newline at end of file
......@@ -77,9 +77,9 @@ libSupport.makeTopic(node_id).then(() => {
/**
* download and start grunt
*/
libSupport.download(constants.grunt_host, "grunt").then(() => {
libSupport.download(constants.grunt_host, "grunt", false).then(() => {
logger.info("Downloaded grunt binary from repository")
fs.chmod('grunt', 0o555, (err) => {
fs.chmod('grunt', 0o755, (err) => {
logger.info("grunt made executable. Starting grunt")
let grunt = spawn('./grunt', [node_id])
grunt.stdout.on('data', data => {
......
const http = require('http');
const fetch = require('node-fetch');
const fs = require('fs');
const process = require('process')
const { spawnSync } = require('child_process');
......@@ -50,28 +50,48 @@ function makeTopic(id) {
})
}
var download = function (url, dest, cb) {
return new Promise((resolve, reject) => {
// var download = function (url, dest, check = true, cb) {
// return new Promise((resolve, reject) => {
// console.log(url);
// if (!check || !fs.existsSync(dest)) {
// var file = fs.createWriteStream(dest);
// var request = https.get(url, function (response) {
// response.pipe(file);
// file.on('finish', function () {
// file.close(cb); // close() is async, call cb after close completes.
// resolve();
// });
// }).on('error', function (err) { // Handle errors
// fs.unlink(dest); // Delete the file async. (But we don't check the result)
// logger.error("download failed" + err.message);
// if (cb) cb(err.message);
// reject(err);
// });
// } else {
// resolve();
// }
// })
// };
const download = (async (url, path, check = true) => {
if (!check || !fs.existsSync(path)) {
console.log(url);
if (!fs.existsSync(dest)) {
var file = fs.createWriteStream(dest);
var request = https.get(url, function (response) {
response.pipe(file);
file.on('finish', function () {
file.close(cb); // close() is async, call cb after close completes.
resolve();
});
}).on('error', function (err) { // Handle errors
fs.unlink(dest); // Delete the file async. (But we don't check the result)
if (cb) cb(err.message);
const res = await fetch(url);
const fileStream = fs.createWriteStream(path);
await new Promise((resolve, reject) => {
res.body.pipe(fileStream);
res.body.on("error", (err) => {
reject(err);
});
} else {
resolve();
}
})
};
fileStream.on("finish", function () {
resolve();
});
});
}
});
function makeid(length) {
var result = '';
......
......@@ -16,6 +16,7 @@
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
"node-fetch": "^2.6.0",
"redis": "^2.8.0",
"request": "^2.88.2",
"winston": "^3.2.1"
......
'use strict';
const express = require('express')
const libSupport = require('./lib')
const router = express.Router()
const fs = require('fs')
const { spawn } = require('child_process')
const fetch = require('node-fetch')
const constants = require('../constants.json')
const secrets = require('./secrets.json')
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
const logger = libSupport.logger
const registry_url = constants.registry_url
router.post('/deploy', (req, res) => {
// let runtime = req.body.runtime
let files = req.files
const chain_id = libSupport.makeid(constants.id_size)
const file_path = __dirname + "/repository/"
let aliases = {}
let deployHandles = []
createDirectory(file_path).then(() => {
for (const [file_alias, file] of Object.entries(files)) {
let functionHash = file.md5
if (file_alias === 'map') {
file.mv(file_path + 'map' + chain_id + ".json")
continue
}
// aliases[file_alias] = functionHash
deployHandles.push(deploy(file_path, functionHash, file, aliases, file_alias))
}
console.log("aliases", aliases);
Promise.all(deployHandles).then(() => {
console.log("done");
fs.writeFile(file_path + `aliases${chain_id}.json`, JSON.stringify(aliases, null, 2), function(err) {
res.json({
status: "success",
function_id: chain_id
})
})
}).catch(err => {
res.json({
status: "error",
reason: err
}).status(400)
})
})
})
async function deploy(file_path, functionHash, file, aliases, file_alias) {
let runtime = "container", memory = 330
try {
await moveFile(file, file_path, functionHash)
functionHash = libSupport.generateExecutor(file_path, functionHash)
aliases[file_alias] = functionHash
/**
* Adding meta caching via couchdb
* This will create / update function related metadata like resource limits etc
* on a database named "serverless".
*/
let res = await fetch(metadataDB + functionHash)
let json = await res.json()
console.log(json);
if (json.error === "not_found") {
logger.warn("New function, creating metadata")
await fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: memory
}),
headers: { 'Content-Type': 'application/json' },
})
// let json = await res.json()
// console.log(json)
} else {
logger.warn('Repeat deployment, updating metadata')
try {
await fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: memory,
_rev: json._rev
}),
headers: { 'Content-Type': 'application/json' },
})
// let json = await res.json()
// console.log(json)
} catch (err) {
console.log(err);
}
}
if (runtime === "container") {
try {
await deployContainer(file_path, functionHash)
console.log("called");
return Promise.resolve(functionHash)
} catch(err) {
return Promise.reject(err)
}
} else {
return Promise.resolve(functionHash)
}
} catch (err) {
logger.error(err)
return Promise.reject(err)
}
}
function moveFile(file, file_path, functionHash) {
return new Promise((resolve, reject) =>{
file.mv(file_path + functionHash, function (err) {
if (err)
reject(err)
resolve()
})
})
}
async function deployContainer(path, imageName) {
return new Promise((resolve, reject) => {
let buildStart = Date.now()
fs.writeFile('./repository/Dockerfile' + imageName,
`FROM node:latest
WORKDIR /app
COPY ./worker_env/package.json /app
ADD ./worker_env/node_modules /app/node_modules
COPY ${imageName}.js /app
ENTRYPOINT ["node", "${imageName}.js"]`
, function (err) {
if (err) {
logger.error("failed", err);
reject(err);
}
else {
logger.info('Dockerfile created');
const process = spawn('docker', ["build", "-t", registry_url + imageName, path, "-f", path + `Dockerfile${imageName}`]);
process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`);
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
});
process.on('close', (code) => {
logger.warn(`child process exited with code ${code}`);
let timeDifference = Math.ceil((Date.now() - buildStart))
logger.info("image build time taken: ", timeDifference);
const process_push = spawn('docker', ["push", registry_url + imageName]);
process_push.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});
process_push.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
});
process_push.on('close', (code) => {
logger.info("image pushed to repository");
resolve();
})
});
}
});
})
}
router.post('/execute/:id', (req, res) => {
let map, aliases
// if (req.body.map)
// map = req.body.map
// else {
if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${req.params.id}.json`)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
console.log(payload);
orchestrator(res, payload, map, aliases, [])
})
} else {
readMap(`./repository/map${req.params.id}.json`)
.then(data => {
map = data
readMap(`./repository/aliases${req.params.id}.json`)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
console.log(payload);
orchestrator(res, payload, map, aliases, [])
})
})
}
})
function orchestrator(res, payload, map, aliases, result) {
return new Promise((resolve, reject) => {
console.log("result before run", result);
if (Object.keys(map).length == 0) {
console.log("time to resolve");
res.json(result)
// return resolve(result)
}
else {
for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
if (metadata.wait_for.length == 0) {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName]}`
console.log(url);
let data = {
method: 'post',
body: JSON.stringify({
runtime: metadata.runtime,
payload
}),
headers: { 'Content-Type': 'application/json' }
}
delete map[functionName]
fetch(url, data).then(res => res.json())
.then(json => {
console.log(json);
result.push(json)
for (const [_key, metadata] of Object.entries(map)) {
let index = metadata.wait_for.indexOf(functionName)
if (index >= 0)
metadata.wait_for.splice(index, 1);
}
console.log(map, "after run");
orchestrator(res, payload, map, aliases, result)
})
}
}
// return resolve(result)
}
// await fetch(constants.master_address)
})
}
function readMap(filename) {
return new Promise((resolve, reject) => {
fs.readFile(filename, (err, data) => {
if (err)
reject(err)
else {
const object = JSON.parse(data)
resolve(object)
}
})
})
}
function createDirectory(path) {
return new Promise((resolve, reject) => {
if (!fs.existsSync(path)) {
fs.mkdir(path, err => {
if (err)
reject();
resolve();
})
} else {
resolve();
}
})
}
module.exports = router;
"use strict";
const express = require('express')
const bodyParser = require('body-parser')
const express = require('express');
const bodyParser = require('body-parser');
const fileUpload = require('express-fileupload');
const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const fs = require('fs')
const constants = require('.././constants.json');
const chainHandler = require('./explicit_chain_handler');
const secrets = require('./secrets.json');
const fs = require('fs');
const { spawn } = require('child_process');
const morgan = require('morgan')
const heap = require('heap')
const morgan = require('morgan');
const heap = require('heap');
const fetch = require('node-fetch');
const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json');
......@@ -29,6 +30,7 @@ let usedPort = new Map(), // TODO: remove after integration with RM
// resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -53,14 +55,14 @@ let kafka = require('kafka-node'),
app.use(morgan('combined', {
skip: function (req, res) { return res.statusCode < 400 }
}))
app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json())
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path));
app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec }));
app.use('/serverless/chain', chainHandler);
let requestQueue = []
const WINDOW_SIZE = 10
......@@ -207,10 +209,12 @@ app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
res.timestamp = Date.now()
if (functionToResource.has(id)) {
res.start = 'warmstart'
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
} else {
res.start = 'coldstart'
/**
* Requests are queued up before being dispatched. To prevent requests coming in for the
* same function from starting too many workers, they are grouped together
......@@ -244,7 +248,7 @@ function dispatch() {
if (!db.has(functionHash + runtime)) {
db.set(functionHash + runtime, [])
db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
logger.info("Requesting RM " + JSON.stringify({
......@@ -256,7 +260,7 @@ function dispatch() {
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null,
deployed: false
deployed: false, deploy_request_time: Date.now()
})
......@@ -314,29 +318,32 @@ function dispatch() {
}
}
/**
* Handles post deployment metadata updates and starts reverse-proxying
* @param {string} message Message received from DD after deployment
*/
function postDeploy(message) {
logger.info("Deployed Resource: " + JSON.stringify(message));
let id = message.functionHash + message.runtime
if (message.status == false) {
let sendQueue = db.get(message.functionHash + message.runtime)
let sendQueue = db.get(id)
// TODO: handle failure
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
res.status(400).json({ reason: message.reason })
}
db.delete(message.functionHash + message.runtime)
db.delete(id)
return;
}
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
if (functionToResource.has(id)) {
let resourceHeap = functionToResource.get(id)
heap.push(resourceHeap, {
resource_id: message.resource_id,
open_request_count: 0
}, libSupport.compare)
logger.warn("Horizontally scaling up: " +
JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
JSON.stringify(functionToResource.get(id)));
} else {
/**
......@@ -349,36 +356,24 @@ function postDeploy(message) {
resource_id: message.resource_id,
open_request_count: 0
}, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap)
functionToResource.set(id, resourceHeap)
logger.warn("Creating new resource pool"
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
+ JSON.stringify(functionToResource.get(id)));
}
try {
let resource = resourceMap.get(message.resource_id)
resource.deployed = true
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
// type: "deployment_launch",
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
entity_id: message.entity_id,
"reason": "deployment",
"status": true,
"timestamp": Date.now()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
libSupport.logBroadcast({
entity_id: message.entity_id,
"reason": "deployment",
"status": true,
starttime: (Date.now() - resource.deploy_request_time)
}, message.resource_id, resourceMap)
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
if (db.has(id)) {
let sendQueue = db.get(id)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
......@@ -387,8 +382,12 @@ function postDeploy(message) {
})
}
db.delete(message.functionHash + message.runtime)
db.delete(id)
}
libSupport.metrics.collectMetrics({type: "scale", value:
functionToResource.get(id).length,
functionHash: id})
} catch (e) {
logger.error(e.message)
}
......@@ -428,8 +427,9 @@ consumer.on('message', function (message) {
// process.exit(0)
}
usedPort.delete(message.port)
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceArray = functionToResource.get(message.functionHash + message.runtime)
let id = message.functionHash + message.runtime
if (functionToResource.has(id)) {
let resourceArray = functionToResource.get(id)
for (let i = 0; i < resourceArray.length; i++)
if (resourceArray[i].resource_id === message.resource_id) {
resourceArray.splice(i, 1);
......@@ -437,17 +437,28 @@ consumer.on('message', function (message) {
}
heap.heapify(resourceArray, libSupport.compare)
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(message.functionHash + message.runtime)
libSupport.metrics.collectMetrics({type: "scale", value:
resourceArray.length,
functionHash: id})
libSupport.logBroadcast({
entity_id: message.entity_id,
"reason": "terminate",
"total_request": message.total_request,
"status": true
}, message.resource_id, resourceMap)
.then(() => {
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(id)
})
}
} else if (topic == constants.topics.hscale) {
message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID
let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID
runtime = message.runtime,
functionHash = message.functionHash
console.log("Resource Status: ", functionToResource);
......@@ -461,7 +472,7 @@ consumer.on('message', function (message) {
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null,
deployed: false
deployed: false, deploy_request_time: Date.now()
})
......@@ -531,7 +542,18 @@ function autoscalar() {
}
function periodicMetricBroadcast() {
let message = {}, flag = false
functionToResource.forEach((functionHeap, functionHash) => {
if (functionHeap.length > 0) {
message[functionHash] = functionHeap.length
libSupport.metrics.collectMetrics({type: "scale", value: functionHeap.length, functionHash: functionHash})
}
})
}
setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
// setInterval(periodicMetricBroadcast, 5000)
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
......@@ -4,10 +4,18 @@ const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const constants = require('.././constants.json')
const metrics = require('./metrics')
const { createLogger, format, transports } = winston;
const heap = require('heap')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host,
autoConnect: true
}),
producer = new Producer(client)
/**
* Generates unique IDs of arbitrary length
* @param {Length of the ID} length
......@@ -77,16 +85,19 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
rp(options)
.then(function (parsedBody) {
let serviceTime = Date.now() - res.timestamp
res.json(parsedBody)
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: id})
resolve()
})
.catch(function (err) {
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
logger.error("error" + err.error.errno);
logger.error("error" + err);
res.json(err.message).status(err.statusCode)
resolve()
});
......@@ -257,12 +268,40 @@ function viterbi(functionBranchTree) {
metadata.mle_path = path
}
});
}
function logBroadcast(message, resource_id, resourceMap) {
return new Promise((resolve, reject) => {
try {
message.timestamp = Date.now()
if (resource_id && resourceMap.has(resource_id)) {
let resource = resourceMap.get(resource_id)
message.resource_id = resource_id
message.node_id = resource.node_id
message.runtime = resource.runtime
message.function_id = resource.functionHash
}
let log = [{
topic: constants.log_channel,
messages: JSON.stringify(message),
partition: 0
}]
producer.send(log, () => {
resolve()
})
} catch (err) {
console.log(err);
reject()
}
})
}
setInterval(metrics.broadcastMetrics, 5000)
module.exports = {
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi
viterbi, logBroadcast, metrics
}
\ No newline at end of file
const constants = require('.././constants.json');
let log_channel = constants.log_channel,
metrics = { }
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host,
autoConnect: true
}),
producer = new Producer(client)
function collectMetrics(metric) {
if (!(metric.functionHash in metrics)) {
metrics[metric.functionHash] = {
shortterm: {
coldstart: 0,
coldstart_total_request: 0,
warm_total_request: 0,
warmstart: 0,
worker_count: 0
}
}
}
if (metric.type === 'coldstart') {
metrics[metric.functionHash].shortterm.coldstart += metric.value
metrics[metric.functionHash].shortterm.coldstart_total_request += 1
} else if (metric.type === 'warmstart') {
metrics[metric.functionHash].shortterm.warmstart += metric.value
metrics[metric.functionHash].shortterm.warm_total_request += 1
} else if (metric.type === 'scale') {
metrics[metric.functionHash].worker_count = metric.value
}
}
function broadcastMetrics() {
if (Object.keys(metrics).length !== 0) {
for (let [functionHash, metric] of Object.entries(metrics)) {
if (metric.longterm === undefined) {
metric.longterm = {
coldstart: 0,
coldstart_total_request: 0,
warm_total_request: 0,
warmstart: 0
}
}
metric.longterm.coldstart = metric.longterm.coldstart
* metric.longterm.coldstart_total_request
+ metric.shortterm.coldstart
metric.longterm.coldstart_total_request += metric.shortterm.coldstart_total_request
metric.longterm.coldstart /= (metric.longterm.coldstart_total_request != 0)?
metric.longterm.coldstart_total_request: 1
metric.longterm.warmstart = metric.longterm.warmstart
* metric.longterm.warm_total_request
+ metric.shortterm.warmstart
metric.longterm.warm_total_request += metric.shortterm.warm_total_request
metric.longterm.warmstart /= (metric.longterm.warm_total_request != 0)?
metric.longterm.warm_total_request: 1
metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0)?
metric.shortterm.coldstart_total_request: 1
metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0)?
metric.shortterm.warm_total_request: 1
metric.timestamp = Date.now()
}
let log = [{
topic: log_channel,
messages: JSON.stringify({
metrics
}),
partition: 0
}]
producer.send(log, () => { })
for (let [functionHash, metric] of Object.entries(metrics)) {
metric.shortterm = {
coldstart: 0,
coldstart_total_request: 0,
warm_total_request: 0,
warmstart: 0,
worker_count: 0
}
}
}
}
module.exports = {
collectMetrics, broadcastMetrics
}
\ No newline at end of file
......@@ -5,7 +5,7 @@ let request = require('request')
const process = require('process')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 30
let port = 5000, resource_id, functionHash, runtime, idleTime = 300
resource_id = process.argv[2]
functionHash = process.argv[3]
......@@ -36,13 +36,15 @@ app.post('/serverless/function/execute/', (req, res) => {
})
})
app.post('/serverless/worker/timeout', (req, res) => {
app.post('/serverless/function/timeout', (req, res) => {
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
function executor(payload) {
async function executor(payload) {
return new Promise((resolve, reject) => {
})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment