Commit 1e2d3ca4 authored by Naman Dixit's avatar Naman Dixit

Merge on pull

parents 85c0cfb7 0198dbc7
......@@ -6,3 +6,4 @@ firecracker*
secrets.json
resource_system/bin/**
resource_system/version.linux
local_experiments/
{
"registry_url" :"10.129.6.5:5000/",
"registry_url" :"localhost:5000/",
"master_port": 8080,
"master_address": "localhost",
"kafka_host": "10.129.6.5:9092",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON",
"couchdb_host": "10.129.6.5:5984",
"couchdb_db_name": "serverless",
"couchdb_host": "localhost:5984",
"function_db_name": "serverless",
"metrics_db_name": "metrics",
"implicit_chain_db_name": "implicit_chain",
"network": {
"network_bridge": "hybrid_kafka-serverless",
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "localhost:29092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale"
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"speculative_deployment": true
}
\ No newline at end of file
"speculative_deployment": false,
"JIT_deployment": false,
"id_size": 20
}
{"id":"192.168.31.51","master_node":"10.129.6.5"}
\ No newline at end of file
{"id":"192.168.0.105","master_node":"192.168.0.105"}
\ No newline at end of file
......@@ -17,7 +17,7 @@ function runIsolate(local_repository, metadata) {
return new Promise((resolve, reject) => {
const worker = new Worker(filename, {
argv: [resource_id, functionHash, port, "isolate"],
argv: [resource_id, functionHash, port, "isolate", constants.network.external.kafka_host],
resourceLimits: {
maxOldGenerationSizeMb: memory
}
......@@ -43,7 +43,8 @@ function runProcess(local_repository, metadata) {
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('node', [filename, resource_id, functionHash, port, "process", `--max-old-space-size=${memory}` ]);
const process = spawn('node', [filename, resource_id, functionHash, port, "process",
constants.network.external.kafka_host, `--max-old-space-size=${memory}` ]);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
......@@ -91,8 +92,9 @@ function runContainer(metadata) {
if (code != 0)
reject("error")
else {
const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container"]);
const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
"--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
......@@ -118,8 +120,9 @@ function runContainer(metadata) {
} else {
logger.info("container starting at port", port);
const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container"]);
const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`,
"-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
......
......@@ -11,7 +11,7 @@ const fs = require('fs')
const fetch = require('node-fetch');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
metadataDB = metadataDB + "/" + constants.function_db_name + "/"
const kafka = require('kafka-node')
const logger = libSupport.logger
......@@ -21,7 +21,7 @@ const host_url = "http://" + constants.master_address + ":" + constants.master_p
let Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host,
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
......@@ -77,9 +77,9 @@ libSupport.makeTopic(node_id).then(() => {
/**
* download and start grunt
*/
libSupport.download(constants.grunt_host, "grunt").then(() => {
libSupport.download(constants.grunt_host, "grunt", false).then(() => {
logger.info("Downloaded grunt binary from repository")
fs.chmod('grunt', 0o555, (err) => {
fs.chmod('grunt', 0o755, (err) => {
logger.info("grunt made executable. Starting grunt")
let grunt = spawn('./grunt', [node_id])
grunt.stdout.on('data', data => {
......
const http = require('http');
const fetch = require('node-fetch');
const fs = require('fs');
const process = require('process')
const { spawnSync } = require('child_process');
......@@ -30,7 +30,7 @@ function makeTopic(id) {
console.log("Using Primary IP", id, "as topic");
let client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host,
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
Producer = kafka.Producer,
......@@ -50,28 +50,48 @@ function makeTopic(id) {
})
}
var download = function (url, dest, cb) {
return new Promise((resolve, reject) => {
// var download = function (url, dest, check = true, cb) {
// return new Promise((resolve, reject) => {
// console.log(url);
// if (!check || !fs.existsSync(dest)) {
// var file = fs.createWriteStream(dest);
// var request = https.get(url, function (response) {
// response.pipe(file);
// file.on('finish', function () {
// file.close(cb); // close() is async, call cb after close completes.
// resolve();
// });
// }).on('error', function (err) { // Handle errors
// fs.unlink(dest); // Delete the file async. (But we don't check the result)
// logger.error("download failed" + err.message);
// if (cb) cb(err.message);
// reject(err);
// });
// } else {
// resolve();
// }
// })
// };
const download = (async (url, path, check = true) => {
if (!check || !fs.existsSync(path)) {
console.log(url);
if (!fs.existsSync(dest)) {
var file = fs.createWriteStream(dest);
var request = https.get(url, function (response) {
response.pipe(file);
file.on('finish', function () {
file.close(cb); // close() is async, call cb after close completes.
resolve();
});
}).on('error', function (err) { // Handle errors
fs.unlink(dest); // Delete the file async. (But we don't check the result)
if (cb) cb(err.message);
const res = await fetch(url);
const fileStream = fs.createWriteStream(path);
await new Promise((resolve, reject) => {
res.body.pipe(fileStream);
res.body.on("error", (err) => {
reject(err);
});
} else {
resolve();
}
})
};
fileStream.on("finish", function () {
resolve();
});
});
}
});
function makeid(length) {
var result = '';
......
......@@ -16,6 +16,7 @@
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
"node-fetch": "^2.6.0",
"redis": "^2.8.0",
"request": "^2.88.2",
"winston": "^3.2.1"
......
'use strict';
const express = require('express')
const libSupport = require('./lib')
const router = express.Router()
const fs = require('fs')
const { spawn } = require('child_process')
const fetch = require('node-fetch')
const constants = require('../constants.json')
const secrets = require('./secrets.json')
const operator = require('./operator')
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const logger = libSupport.logger
const registry_url = constants.registry_url
router.post('/deploy', (req, res) => {
// let runtime = req.body.runtime
let files = req.files
const chain_id = libSupport.makeid(constants.id_size)
const file_path = __dirname + "/repository/"
let aliases = {}
let deployHandles = []
createDirectory(file_path).then(() => {
for (const [file_alias, file] of Object.entries(files)) {
let functionHash = file.md5
if (file_alias === 'map') {
file.mv(file_path + 'map' + chain_id + ".json")
continue
}
// aliases[file_alias] = functionHash
deployHandles.push(deploy(file_path, functionHash, file, aliases, file_alias))
}
console.log("aliases", aliases);
Promise.all(deployHandles).then(() => {
console.log("done");
fs.writeFile(file_path + `aliases${chain_id}.json`, JSON.stringify(aliases, null, 2), function(err) {
res.json({
status: "success",
function_id: chain_id
})
})
}).catch(err => {
res.json({
status: "error",
reason: err
}).status(400)
})
})
})
async function deploy(file_path, functionHash, file, aliases, file_alias) {
let runtime = "container", memory = 330
try {
await moveFile(file, file_path, functionHash)
functionHash = libSupport.generateExecutor(file_path, functionHash)
aliases[file_alias] = functionHash
/**
* Adding meta caching via couchdb
* This will create / update function related metadata like resource limits etc
* on a database named "serverless".
*/
let res = await fetch(metadataDB + functionHash)
let json = await res.json()
console.log(json);
if (json.error === "not_found") {
logger.warn("New function, creating metadata")
await fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: memory
}),
headers: { 'Content-Type': 'application/json' },
})
// let json = await res.json()
// console.log(json)
} else {
logger.warn('Repeat deployment, updating metadata')
try {
await fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: memory,
_rev: json._rev
}),
headers: { 'Content-Type': 'application/json' },
})
// let json = await res.json()
// console.log(json)
} catch (err) {
console.log(err);
}
}
if (runtime === "container") {
try {
await deployContainer(file_path, functionHash)
console.log("called");
return Promise.resolve(functionHash)
} catch(err) {
return Promise.reject(err)
}
} else {
return Promise.resolve(functionHash)
}
} catch (err) {
logger.error(err)
return Promise.reject(err)
}
}
function moveFile(file, file_path, functionHash) {
return new Promise((resolve, reject) =>{
file.mv(file_path + functionHash, function (err) {
if (err)
reject(err)
resolve()
})
})
}
async function deployContainer(path, imageName) {
return new Promise((resolve, reject) => {
let buildStart = Date.now()
fs.writeFile('./repository/Dockerfile' + imageName,
`FROM node:latest
WORKDIR /app
COPY ./worker_env/package.json /app
ADD ./worker_env/node_modules /app/node_modules
COPY ${imageName}.js /app
ENTRYPOINT ["node", "${imageName}.js"]`
, function (err) {
if (err) {
logger.error("failed", err);
reject(err);
}
else {
logger.info('Dockerfile created');
const process = spawn('docker', ["build", "-t", registry_url + imageName, path, "-f", path + `Dockerfile${imageName}`]);
process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`);
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
});
process.on('close', (code) => {
logger.warn(`child process exited with code ${code}`);
let timeDifference = Math.ceil((Date.now() - buildStart))
logger.info("image build time taken: ", timeDifference);
const process_push = spawn('docker', ["push", registry_url + imageName]);
process_push.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});
process_push.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
});
process_push.on('close', (code) => {
logger.info("image pushed to repository");
resolve();
})
});
}
});
})
}
router.post('/execute/:id', (req, res) => {
let map, aliases
// if (req.body.map)
// map = req.body.map
// else {
if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString());
let mapPlanner = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
console.log(payload);
speculative_deployment(aliases, mapPlanner);
orchestrator(res, payload, map, aliases, {})
})
} else {
readMap(`./repository/map${req.params.id}.json`)
.then(data => {
map = data
let mapPlanner = JSON.parse(JSON.stringify(map))
readMap(`./repository/aliases${req.params.id}.json`, true)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
speculative_deployment(aliases, mapPlanner);
orchestrator(res, payload, map, aliases, {})
})
})
}
})
async function orchestrator(res, payload, map, aliases, result) {
if (Object.keys(map).length == 0) {
console.log("time to resolve", result);
res.json(result)
// return resolve(result)
}
else {
for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
// console.log(metadata);
if (metadata.type === "function" && metadata.wait_for.length == 0) {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName].alias}`
console.log(url);
let data = {
method: 'post',
body: JSON.stringify({
runtime: metadata.runtime,
payload
}),
headers: { 'Content-Type': 'application/json' }
}
delete map[functionName]
aliases[functionName].status = "running"
fetch(url, data).then(res => res.json())
.then(json => {
// console.log(json);
result[functionName] = json
aliases[functionName].status = "done"
let branchMap = null
for (const [_key, metadata] of Object.entries(map)) {
if (metadata.type === "function" || metadata.type === "conditional") {
let index = metadata.wait_for.indexOf(functionName)
if (index >= 0)
metadata.wait_for.splice(index, 1);
}
if (metadata.type === "conditional" && metadata.wait_for.length == 0) {
let conditionResult = checkCondition(metadata.condition.op1, metadata.condition.op2, metadata.condition.op, result)
console.log(conditionResult, "aliases", aliases);
let branchToTake = metadata[conditionResult]
branchMap = map[branchToTake]
delete map[_key]
makeBranchRunnable(branchMap, aliases)
}
}
orchestrator(res, payload, (branchMap == null)? map: branchMap, aliases, result)
})
}
}
}
}
function makeBranchRunnable(branchMap, aliases) {
delete branchMap['type']
for (const [_key, metadata] of Object.entries(branchMap)) {
if (metadata.type === "function" || metadata.type === "conditional") {
let wait_for = []
for (const dependent of metadata.wait_for) {
if (aliases[dependent].status !== "done")
wait_for.push(dependent)
metadata.wait_for = wait_for
}
}
}
}
function checkCondition(op1, op2, op, result) {
op1 = op1.split(".")
let data = result[op1[0]][op1[1]]
return (operator[op](data, op2))? "success": "fail"
}
async function speculative_deployment(aliases, map) {
if (constants.speculative_deployment) {
console.log(aliases);
let getData = []
for (const [mod, metadata] of Object.entries(map)) {
if (constants.JIT_deployment) {
console.log(mod, metadata, aliases[mod].alias);
let url = metricsDB + aliases[mod].alias
console.log(url);
let data = libSupport.fetchData(url)
console.log(data);
getData.push(data)
} else {
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
notify(payload)
}
}
if (constants.JIT_deployment) {
Promise.all(getData).then((values) => {
let dataMap = new Map()
for (const data of values) {
dataMap[data._id] = data
}
let done = new Map()
let toBeDone = new Set()
// var plannerMap = new Map(map)
do {
for (const [mod, metadata] of Object.entries(map)) {
if (metadata.wait_for.length == 0 && done[mod] === undefined) {
done[mod] = dataMap[aliases[mod].alias][metadata.runtime].coldstart // expecting the first ones to run
// to be hit by coldstarts
// delete plannerMap[mod];
} else if (done[mod] === undefined) {
let flag = true
let maxWait = 0
for (const dependency of metadata.wait_for) {
console.log(dependency);
if (done[dependency] === undefined) {
flag = false
break
} else if (maxWait < done[dependency])
maxWait = done[dependency]
}
if (flag) {
console.log("notifying", mod);
let notifyTime = ((maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime) > 0) ?
maxWait - dataMap[aliases[mod].alias][metadata.runtime].starttime : 0
console.log(mod, "max wait", maxWait, "notify time:", notifyTime);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": metadata.runtime, "functionHash": aliases[mod].alias })
}]
setTimeout(notify, notifyTime, payload)
done[mod] = maxWait + dataMap[aliases[mod].alias][metadata.runtime].warmstart
if (toBeDone.has(mod))
delete toBeDone[mod]
// delete plannerMap[mod]
} else {
toBeDone.add(mod)
}
}
console.log(done, toBeDone);
}
} while (toBeDone.size != 0)
})
}
}
}
function readMap(filename, alias = false) {
return new Promise((resolve, reject) => {
fs.readFile(filename, (err, blob) => {
if (err)
reject(err)
else {
const data = JSON.parse(blob)
if (alias) {
for (const [key, functionHash] of Object.entries(data)) {
data[key] = {
alias: functionHash,
status: "waiting"
}
// libSupport.fetchData(metricsDB + functionHash, null)
// .then(metrics => {
// data[key]
// })
}
}
resolve(data)
}
})
})
}
function notify(payload) {
libSupport.producer.send(payload, function () { })
}
function createDirectory(path) {
return new Promise((resolve, reject) => {
if (!fs.existsSync(path)) {
fs.mkdir(path, err => {
if (err)
reject();
resolve();
})
} else {
resolve();
}
})
}
module.exports = router;
"use strict";
const express = require('express')
const bodyParser = require('body-parser')
const express = require('express');
const fileUpload = require('express-fileupload');
const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const fs = require('fs')
const constants = require('.././constants.json');
const chainHandler = require('./explicit_chain_handler');
const secrets = require('./secrets.json');
const fs = require('fs');
const { spawn } = require('child_process');
const morgan = require('morgan')
const heap = require('heap')
const morgan = require('morgan');
const heap = require('heap');
const fetch = require('node-fetch');
const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json');
const util = require('util')
/**
* URL to the couchdb database server used to store function metadata
*/
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
metadataDB = metadataDB + "/" + constants.function_db_name + "/"
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
const app = express()
const libSupport = require('./lib')
const logger = libSupport.logger
let date = new Date();
let log_channel = constants.log_channel
let log_channel = constants.topics.log_channel
let usedPort = new Map(), // TODO: remove after integration with RM
db = new Map(), // queue holding request to be dispatched
......@@ -29,11 +36,12 @@ let usedPort = new Map(), // TODO: remove after integration with RM
// resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map() // a tree to store function branch predictions
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host,
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
......@@ -53,20 +61,25 @@ let kafka = require('kafka-node'),
app.use(morgan('combined', {
skip: function (req, res) { return res.statusCode < 400 }
}))
app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json())
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path));
app.use('/repository', express.static(file_path)); // file server hosting deployed functions
app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec }));
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); // statistics middleware
app.use('/serverless/chain', chainHandler); // chain router (explicit_chain_handler.js) for handling explicit chains
let requestQueue = []
const WINDOW_SIZE = 10
const port = constants.master_port
const registry_url = constants.registry_url
app.get('/metrics', (req, res) => {
res.set('Content-Type', libSupport.metrics.register.contentType);
res.end(libSupport.metrics.register.metrics());
});
/**
* REST API to receive deployment requests
*/
......@@ -77,7 +90,7 @@ app.post('/serverless/deploy', (req, res) => {
let functionHash = file.md5
file.mv(file_path + functionHash, function (err) {
file.mv(file_path + functionHash, function (err) { // move function file to repository
functionHash = libSupport.generateExecutor(file_path, functionHash)
/**
* Adding meta caching via couchdb
......@@ -146,7 +159,9 @@ app.post('/serverless/deploy', (req, res) => {
function deployContainer(path, imageName) {
return new Promise((resolve, reject) => {
let buildStart = Date.now()
/**
* Generating dockerfile for the received function
*/
fs.writeFile('./repository/Dockerfile',
`FROM node:latest
WORKDIR /app
......@@ -163,7 +178,7 @@ function deployContainer(path, imageName) {
}
else {
logger.info('Dockerfile created');
const process = spawn('docker', ["build", "-t", registry_url + imageName, path]);
const process = spawn('docker', ["build", "-t", registry_url + imageName, path]); // docker build
process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`);
......@@ -178,7 +193,7 @@ function deployContainer(path, imageName) {
logger.warn(`child process exited with code ${code}`);
let timeDifference = Math.ceil((Date.now() - buildStart))
logger.info("image build time taken: ", timeDifference);
const process_push = spawn('docker', ["push", registry_url + imageName]);
const process_push = spawn('docker', ["push", registry_url + imageName]); // docker push image to local registry
process_push.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
......@@ -207,10 +222,12 @@ app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
let id = req.params.id + runtime
res.timestamp = Date.now()
if (functionToResource.has(id)) {
res.start = 'warmstart'
libSupport.reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree)
} else {
res.start = 'coldstart'
/**
* Requests are queued up before being dispatched. To prevent requests coming in for the
* same function from starting too many workers, they are grouped together
......@@ -234,6 +251,10 @@ app.post('/serverless/execute/:id', (req, res) => {
* Send dispatch signal to Worker nodes and deploy resources after consultation with the RM
*/
function dispatch() {
/**
* The lookahead window will be used for optimisation purposes
* Ex. It might be used to co-group similar runtimes on same machines
*/
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift()
......@@ -244,67 +265,14 @@ function dispatch() {
if (!db.has(functionHash + runtime)) {
db.set(functionHash + runtime, [])
db.get(functionHash + runtime).push({ req, res })
let resource_id = libSupport.makeid(20) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null,
deployed: false
})
let payloadToRM = [{
topic: constants.topics.request_dm_2_rm, // changing from REQUEST_DM_2_RM
messages: JSON.stringify({
resource_id,
"memory": 332,
timestamp: Date.now()
}),
partition: 0
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ runtime, functionHash })
}]
producer.send(payloadToRM, () => {
// db.set(functionHash + runtime, { req, res })
console.log("sent rm");
})
/**
* Speculative deployment:
* If function MLE path is present then deploy those parts of the path which are
* not already running
*/
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
}
}
producer.send(payload, function () { })
speculative_deployment(req, runtime)
} else {
logger.info("deployment process already started waiting")
db.get(functionHash + runtime).push({ req, res })
......@@ -314,29 +282,32 @@ function dispatch() {
}
}
/**
* Handles post deployment metadata updates and starts reverse-proxying
* @param {string} message Message received from DD after deployment
*/
function postDeploy(message) {
logger.info("Deployed Resource: " + JSON.stringify(message));
let id = message.functionHash + message.runtime
if (message.status == false) {
let sendQueue = db.get(message.functionHash + message.runtime)
let sendQueue = db.get(id)
// TODO: handle failure
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
res.status(400).json({ reason: message.reason })
}
db.delete(message.functionHash + message.runtime)
db.delete(id)
return;
}
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceHeap = functionToResource.get(message.functionHash + message.runtime)
if (functionToResource.has(id)) {
let resourceHeap = functionToResource.get(id)
heap.push(resourceHeap, {
resource_id: message.resource_id,
open_request_count: 0
}, libSupport.compare)
logger.warn("Horizontally scaling up: " +
JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
JSON.stringify(functionToResource.get(id)));
} else {
/**
......@@ -349,36 +320,24 @@ function postDeploy(message) {
resource_id: message.resource_id,
open_request_count: 0
}, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap)
functionToResource.set(id, resourceHeap)
logger.warn("Creating new resource pool"
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
+ JSON.stringify(functionToResource.get(id)));
}
try {
let resource = resourceMap.get(message.resource_id)
resource.deployed = true
let confirmRM = [{
topic: log_channel,
messages: JSON.stringify({
resource_id: message.resource_id,
// type: "deployment_launch",
node_id: resource.node_id,
runtime: resource.runtime,
function_id: resource.functionHash,
entity_id: message.entity_id,
"reason": "deployment",
"status": true,
"timestamp": Date.now()
}),
partition: 0
}]
producer.send(confirmRM, () => {
logger.info(`Confirmed RM for successful deployment resource_id: ${message.resource_id} deployment`)
})
if (db.has(message.functionHash + message.runtime)) {
let sendQueue = db.get(message.functionHash + message.runtime)
libSupport.logBroadcast({
entity_id: message.entity_id,
"reason": "deployment",
"status": true,
starttime: (Date.now() - resource.deploy_request_time)
}, message.resource_id, resourceMap)
if (db.has(id)) {
let sendQueue = db.get(id)
logger.info("forwarding request via reverse proxy to: " + JSON.stringify(resource));
while (sendQueue && sendQueue.length != 0) {
let { req, res } = sendQueue.shift()
......@@ -387,8 +346,13 @@ function postDeploy(message) {
})
}
db.delete(message.functionHash + message.runtime)
db.delete(id)
}
libSupport.metrics.collectMetrics({type: "scale", value:
functionToResource.get(id).length,
functionHash: message.functionHash, runtime: message.runtime,
starttime: (Date.now() - resource.deploy_request_time)})
} catch (e) {
logger.error(e.message)
}
......@@ -428,8 +392,9 @@ consumer.on('message', function (message) {
// process.exit(0)
}
usedPort.delete(message.port)
if (functionToResource.has(message.functionHash + message.runtime)) {
let resourceArray = functionToResource.get(message.functionHash + message.runtime)
let id = message.functionHash + message.runtime
if (functionToResource.has(id)) {
let resourceArray = functionToResource.get(id)
for (let i = 0; i < resourceArray.length; i++)
if (resourceArray[i].resource_id === message.resource_id) {
resourceArray.splice(i, 1);
......@@ -437,31 +402,44 @@ consumer.on('message', function (message) {
}
heap.heapify(resourceArray, libSupport.compare)
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(message.functionHash + message.runtime)
libSupport.metrics.collectMetrics({type: "scale", value:
resourceArray.length,
functionHash: message.functionHash, runtime: message.runtime})
libSupport.logBroadcast({
entity_id: message.entity_id,
"reason": "terminate",
"total_request": message.total_request,
"status": true
}, message.resource_id, resourceMap)
.then(() => {
resourceMap.delete(message.resource_id)
if (resourceArray.length == 0)
functionToResource.delete(id)
})
}
} else if (topic == constants.topics.hscale) {
message = JSON.parse(message)
let resource_id = libSupport.makeid(20), // each function resource request is associated with an unique ID
let resource_id = libSupport.makeid(constants.id_size), // each function resource request is associated with an unique ID
runtime = message.runtime,
functionHash = message.functionHash
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
console.log("Resource Status: ", functionToResource);
/**
* Request RM for resource
*/
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null,
deployed: false
deployed: false, deploy_request_time: Date.now()
})
......@@ -499,11 +477,13 @@ consumer.on('message', function (message) {
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id: message.resource_id,
runtime: resource.runtime, functionHash: resource.functionHash,
port: resource.port
port: resource.port, resources: {
memory: resource.memory
}
}),
partition: 0
}]
logger.info(resourceMap);
// logger.info(resourceMap);
producer.send(payload, () => {
logger.info(`Resource Deployment request sent to Dispatch Agent`)
})
......@@ -531,6 +511,53 @@ function autoscalar() {
}
/**
* Speculative deployment:
* If function MLE path is present then deploy those parts of the path which are
* not already running
*
* FIXME: Currently supports homogenous runtime chain i.e takes runtime as a param.
* Change it to also profile runtime
*/
async function speculative_deployment(req, runtime) {
if (constants.speculative_deployment && req.headers['x-resource-id'] === undefined) {
console.log(functionBranchTree, req.params.id);
if (functionBranchTree.has(req.params.id)) {
let branchInfo = functionBranchTree.get(req.params.id)
console.log("mle_path", branchInfo.mle_path);
if (branchInfo.mle_path && branchInfo.mle_path.length > 1) {
for (let node of branchInfo.mle_path)
node.id = node.node
let metrics = await libSupport.fetchData(metricsDB + "_bulk_get", {
method: 'post',
body: JSON.stringify({
docs: branchInfo.mle_path
}),
headers: { 'Content-Type': 'application/json' },
})
console.log(util.inspect(metrics, false, null, true /* enable colors */))
for (let node of branchInfo.mle_path) {
// console.log(functionToResource);
if (!functionToResource.has(node.node + runtime) && !db.has(node.node + runtime)) {
console.log("Deploying according to MLE path: ", node.node);
let payload = [{
topic: constants.topics.hscale,
messages: JSON.stringify({ "runtime": "container", "functionHash": node.node })
}]
producer.send(payload, function () { })
db.set(node.node + runtime, [])
}
}
}
}
}
}
setInterval(libSupport.metrics.broadcastMetrics, 5000)
setInterval(libSupport.viterbi, 1000, functionBranchTree)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
......
......@@ -4,10 +4,23 @@ const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const metrics = require('./metrics')
const { createLogger, format, transports } = winston;
const heap = require('heap')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client)
let implicitChainDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
implicitChainDB = implicitChainDB + "/" + constants.implicit_chain_db_names + "/"
/**
* Generates unique IDs of arbitrary length
* @param {Length of the ID} length
......@@ -45,6 +58,14 @@ function generateExecutor(functionPath, functionHash) {
return hash
}
/**
* Reverse proxy to take user requests and forward them to appropriate workers using a loadbalacer
* @param {JSON} req the user request to be forwarded to the worker
* @param {JSON} res Object to use to return the response to the user
* @param {Map} functionToResource Function to resource Map
* @param {Map} resourceMap Map from resource ID to resource metadata
* @param {Map} functionBranchTree Holds the function path's and related probability distribution
*/
function reverseProxy(req, res, functionToResource, resourceMap, functionBranchTree) {
branchChainPredictor(req, resourceMap, functionToResource, functionBranchTree)
return new Promise((resolve, reject) => {
......@@ -54,6 +75,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
* Bypass deployment pipeline if resource available
*/
let functionHeap = functionToResource.get(id)
// loadbalancing by choosing worker with lowest load
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
// logger.info(`Choosing resource ${JSON.stringify(forwardTo.resource_id)}` +
......@@ -62,7 +84,7 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare)
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// logger.info(functionHeap);
var options = {
......@@ -71,22 +93,23 @@ function reverseProxy(req, res, functionToResource, resourceMap, functionBranchT
body: req.body,
json: true // Automatically stringifies the body to JSON
};
// console.log(options);
rp(options)
.then(function (parsedBody) {
let serviceTime = Date.now() - res.timestamp
res.json(parsedBody)
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
resolve()
})
.catch(function (err) {
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
logger.error("error" + err.error.errno);
logger.error("error" + err);
res.json(err.message).status(err.statusCode)
resolve()
});
......@@ -109,7 +132,6 @@ function getPort(usedPort) {
return port
}
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
......@@ -252,17 +274,61 @@ function viterbi(functionBranchTree) {
path.push({node: maxSibling, probability: maxProb})
siblings = new Map()
}
if (path.length > 0)
console.log("path", path);
// if (path.length > 0)
// console.log("path", path);
metadata.mle_path = path
if (path.length > 1) {
let payload = {
method: 'put',
body: JSON.stringify(path),
headers: { 'Content-Type': 'application/json' }
}
fetch(implicitChainDB + functionHash, payload)
}
}
});
}
function logBroadcast(message, resource_id, resourceMap) {
return new Promise((resolve, reject) => {
try {
message.timestamp = Date.now()
if (resource_id && resourceMap.has(resource_id)) {
let resource = resourceMap.get(resource_id)
message.resource_id = resource_id
message.node_id = resource.node_id
message.runtime = resource.runtime
message.function_id = resource.functionHash
}
let log = [{
topic: constants.topics.log_channel,
messages: JSON.stringify(message),
partition: 0
}]
producer.send(log, () => {
resolve()
})
} catch (err) {
console.log(err);
reject()
}
})
}
async function fetchData(url, data = null) {
let res
if (data === undefined || data === null)
res = await fetch(url)
else
res = await fetch(url, data)
return await res.json()
}
module.exports = {
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
viterbi
viterbi, logBroadcast, fetchData, metrics,
producer
}
\ No newline at end of file
'use strict';
const constants = require('.././constants.json');
const secrets = require('./secrets.json')
const fetch = require('node-fetch');
const util = require('util')
const prom = require('prom-client');
const Registry = prom.Registry;
const register = new Registry();
const alpha = 0.99
let log_channel = constants.topics.log_channel,
metrics = { }
const intervalCollector = prom.collectDefaultMetrics({ prefix: 'xanadu', timeout: 5000, register });
const workerCountMetric = new prom.Gauge({ name: "worker_count", help: "worker count" });
const warmstartMetric = new prom.Histogram({ name: "warmstart", help: "warm start latency" });
const coldstartMetric = new prom.Histogram({ name: "coldstart", help: "cold start latency"});
const starttimeMetric = new prom.Histogram({ name: "starttime", help: "worker start times" });
const requestMetric = new prom.Summary({ name: "requests", help: "request RTT times",
percentiles: [0.01, 0.05, 0.5, 0.9, 0.95, 0.99, 0.999]
});
register.registerMetric(workerCountMetric);
register.registerMetric(warmstartMetric);
register.registerMetric(coldstartMetric);
register.registerMetric(starttimeMetric);
register.registerMetric(requestMetric);
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.metrics_db_name + "/"
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client)
/**
* Function called to report metric data related to functions
* @param {JSON} metric
*/
function collectMetrics(metric) {
/**
* If metrics for a new function comes in,
* provision required structure for the function
*/
if (!(metric.functionHash in metrics)) {
metrics[metric.functionHash] = {}
}
if (!(metric.runtime in metrics[metric.functionHash])) {
metrics[metric.functionHash][metric.runtime] = {
shortterm: {
coldstart: 0,
coldstart_total_request: 0,
warm_total_request: 0,
scale_count: 0,
warmstart: 0,
worker_count: 0,
starttime: 0
}
}
}
if (metric.type === 'coldstart') {
metrics[metric.functionHash][metric.runtime].shortterm.coldstart += metric.value
metrics[metric.functionHash][metric.runtime].shortterm.coldstart_total_request += 1
coldstartMetric.observe(metric.value)
requestMetric.observe(metric.value)
} else if (metric.type === 'warmstart') {
metrics[metric.functionHash][metric.runtime].shortterm.warmstart += metric.value
metrics[metric.functionHash][metric.runtime].shortterm.warm_total_request += 1
warmstartMetric.observe(metric.value)
requestMetric.observe(metric.value)
} else if (metric.type === 'scale') {
metrics[metric.functionHash][metric.runtime].shortterm.worker_count = metric.value
workerCountMetric.set(metric.value)
if (metric.starttime !== undefined) {
metrics[metric.functionHash][metric.runtime].shortterm.starttime += metric.starttime
metrics[metric.functionHash][metric.runtime].shortterm.scale_count += 1
starttimeMetric.observe(metric.starttime)
}
}
}
/**
* Run periodically to calculate average runtime metrics like coldstart and
* warmstart latencies.
* The module provides two granularities for metrics - shortterm and longterm
* shortterm - realtime data at a granularity of 5s (set in dispatch_manager/lib.js)
* shortterm data is calculated using Simple Moving Average (SMA)
* longterm - longterm data is held and averaged out over a period of time.
* longterm data is calculated using Expontential Moving Average (EMA)
*/
async function broadcastMetrics() {
if (Object.keys(metrics).length !== 0) {
for (let [functionHash, data] of Object.entries(metrics)) {
for (let [runtime, metricData] of Object.entries(data)) {
if (metricData.shortterm.coldstart != 0 || metricData.shortterm.longterm != 0) {
let { metric, dbData } = await fetchData(functionHash, metricData, runtime)
/**
* Shortterm moving average
*/
metric.shortterm.coldstart /= (metric.shortterm.coldstart_total_request != 0) ?
metric.shortterm.coldstart_total_request : 1
metric.shortterm.starttime /= (metric.shortterm.scale_count != 0) ?
metric.shortterm.scale_count : 1
metric.shortterm.warmstart /= (metric.shortterm.warm_total_request != 0) ?
metric.shortterm.warm_total_request : 1
/**
* Longterm exponential moving average
*/
if (metric.shortterm.coldstart != 0)
metric.longterm.coldstart = (metric.longterm.coldstart != 0) ? metric.longterm.coldstart * alpha
+ metric.shortterm.coldstart * (1 - alpha) : metric.shortterm.coldstart
if (metric.shortterm.starttime && metric.shortterm.starttime != 0)
metric.longterm.starttime = (metric.longterm.starttime != 0) ? metric.longterm.starttime * alpha
+ metric.shortterm.starttime * (1 - alpha) : metric.shortterm.starttime
if (metric.shortterm.warmstart != 0)
metric.longterm.warmstart = (metric.longterm.warmstart != 0) ? metric.longterm.warmstart * alpha
+ metric.shortterm.warmstart * (1 - alpha) : metric.shortterm.warmstart
dbData[runtime] = {
coldstart: metric.longterm.coldstart,
warmstart: metric.longterm.warmstart,
starttime: metric.longterm.starttime
}
let payload = {
method: 'put',
body: JSON.stringify(dbData),
headers: { 'Content-Type': 'application/json' }
}
await fetch(metricsDB + functionHash, payload)
metric.timestamp = Date.now()
}
}
}
let log = [{
topic: log_channel,
messages: JSON.stringify({
metrics
}),
partition: 0
}]
producer.send(log, () => { })
for (let [functionHash, data] of Object.entries(metrics)) {
for (let [runtime, metric] of Object.entries(data)) {
metric.shortterm = {
coldstart: 0,
coldstart_total_request: 0,
warm_total_request: 0,
warmstart: 0,
worker_count: 0,
starttime: 0,
scale_count: 0
}
}
}
}
}
/**
* Function to fetch the latest data from metric DB
* @param {String} functionHash
* @param {JSON} metric
*/
async function fetchData(functionHash, metric, runtime) {
let res = await fetch(metricsDB + functionHash)
let json = await res.json()
if (json.error === "not_found" || json[runtime] === undefined) {
metric.longterm = {
coldstart: 0,
warmstart: 0,
starttime: 0
}
} else {
metric.longterm = {
coldstart: json[runtime].coldstart,
warmstart: json[runtime].warmstart,
starttime: (json[runtime].starttime) ? json[runtime].starttime: 0
}
}
return {
metric,
dbData: (json.error === "not_found")? {}: json
}
}
module.exports = {
collectMetrics, broadcastMetrics, register
}
\ No newline at end of file
const op = {
'lt': function (x, y) { return x < y },
'gt': function (x, y) { return x > y },
'lte': function (x, y) { return x <= y },
'gte': function (x, y) { return x >= y },
'eq': function (x, y) { return x === y },
'neq': function (x, y) { return x !== y },
};
module.exports = op
\ No newline at end of file
......@@ -20,6 +20,7 @@
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
"node-fetch": "^2.6.0",
"prom-client": "^12.0.0",
"redis": "^2.8.0",
"request": "^2.88.0",
"request-promise": "^4.2.5",
......
......@@ -5,7 +5,8 @@ let request = require('request')
const process = require('process')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 30
let port = 5000, resource_id, functionHash, runtime, idleTime = 60, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
......@@ -18,7 +19,7 @@ request = request.defaults({
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: '10.129.6.5:9092',
kafkaHost: process.argv[6],
autoConnect: true
}),
producer = new Producer(client)
......@@ -28,6 +29,10 @@ app.use(bodyParser.json())
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
......@@ -36,13 +41,15 @@ app.post('/serverless/function/execute/', (req, res) => {
})
})
app.post('/serverless/worker/timeout', (req, res) => {
app.post('/serverless/function/timeout', (req, res) => {
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
function executor(payload) {
async function executor(payload) {
return new Promise((resolve, reject) => {
})
......@@ -57,6 +64,7 @@ app.listen(port, () => {
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
......@@ -65,14 +73,17 @@ function shouldDie() {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
producer.send(
[
{topic: "removeWorker", messages: message }
], () => {
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
......
const constants = require('./constants.json')
const util = require('util')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: constants.topics.log_channel }
])
consumer.on('message', function (message) {
message = JSON.parse(message.value)
console.log(util.inspect(message, false, null, true /* enable colors */))
})
\ No newline at end of file
# my global config
global:
scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
# Attach these labels to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager).
external_labels:
monitor: 'codelab-monitor'
# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first.rules"
# - "second.rules"
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['localhost:9090']
- job_name: 'docker'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['localhost:9323']
- job_name: 'xanadu'
static_configs:
- targets: ['localhost:8080']
const constants = require('./constants.json')
var kafka = require('kafka-node');
let client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
})
var topicsToCreate = [];
for (const [key, value] of Object.entries(constants.topics)) {
topicsToCreate.push({ topic: value, partitions: 1, replicationFactor: 1 })
}
client.createTopics(topicsToCreate, (error, result) => {
console.log("topic created", result);
});
\ No newline at end of file
version: '2'
networks:
kafka-serverless:
driver: bridge
services:
zookeeper:
image: 'bitnami/zookeeper:3'
restart: unless-stopped
networks:
- kafka-serverless
ports:
- '2182:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2'
restart: unless-stopped
networks:
- kafka-serverless
ports:
- '9093:9092'
- '29092:29092'
volumes:
- 'kafka_data:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
<mxfile host="Electron" modified="2020-03-09T11:38:25.308Z" agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/12.6.5 Chrome/80.0.3987.86 Electron/8.0.0 Safari/537.36" etag="MPKhp0QP3HoRdc4h3Pxj" version="12.6.5" type="device"><diagram id="_vj1HQFHM5RY5pbnoe9b" name="Page-1">5Zlbb5swGIZ/DZebwJzSy+bQbFNTVY20db2JPHDBHcHIOAn018+ACcdkzZoFRG8if58P2M/38kISSZ2sozmFgbsgNvIkINuRpE4lAEbGiH8miThL6IqaJRyK7SylFIklfkUiKYvsBtsorAxkhHgMB9WkRXwfWaySg5SSXXXYM/GqVw2ggxqJpQW9ZvYHtpkrjgXMIv8FYcfNr6wYV1nPGuaDxUlCF9pkV0qpM0mdUEJY1lpHE+Ql7HIu2bybA737jVHks7dMWDy8zFbfngC5vlvZ7sPcBNf4kyH2xuL8wJRsfBslc2RJHRPKXOIQH3q3hAQ8qfDkC2IsFqWCG0Z4ymVrT/SiCLPHUvunWCppT6NyEOeBz2j8WA7SOZ/1PCympVE+L2SU/N6XhQMdN5kITCHZUAsdAZFrC1IHsSPjhHqRXZGNID5HZI34DvkAijzI8LaqIijE6OzH7afeE8y3DGRx3wBNqEbcNkquonyJbKNiVlF13ihto0ilWjhBF+ZldKEMRhfg3LqoVPTU8ol1t9DbiCvdKI2K8nW4i/JgvHMxQ8sApiR23MerlYNhkFnrM44SBZwCeIsoQ9FRJKJXryl+JOJdyXZFyi057kh+P8On8I5tHKB/XdxOv9s0Nlexld/pZWC8uEsR+sRPyL3/rjizxmsC/DfRgzeKXruQxlvro3VaHwP8rUJpdI8o5sdFtE9l07u0JtC0JtB3awJG1ZtA197UYuYl7RcynxXZgVqV2jPNt5arKfCPXYxLPTeObbJsQC2P+n4b0AX953ABqwz1vjPUOzTxw5ZUhdjyTtNviF0/CJsMTX6pphb58ViNl4cdn7ctDiN5PRsnELAFvWvRsca2nT04UYhf4a90qcRCg+RLd3oQfSzp02Qtbs9h5tTKnnMDagv6g5wVtfZtSGuC1lpA138mOBtoY6CgNaUGWu4YtPlBQKugY9CjBmh1EKDr1tHm0RcFfTVQ0JpZA921R+d32PBI1946FP2/keZh8ZdM9mt68b+WOvsD</diagram></mxfile>
\ No newline at end of file
......@@ -145,6 +145,7 @@ to run and execute the function on the specified worker node.
- Docker
- Java
- Apache Kafka (Configure to allow auto-delete and auto-registration of topics)
- couchdb (needs a database named serverless)
### Starting the server
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment