Commit 9d5f6ec9 authored by Nilanjan Daw's avatar Nilanjan Daw

Moved kafka Host to docker hosted setup

parent 91583a4b
{ {
"registry_url" :"10.129.6.5:5000/", "registry_url" :"localhost:5000/",
"master_port": 8080, "master_port": 8080,
"master_address": "localhost", "master_address": "localhost",
"kafka_host": "10.129.6.5:9092",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON", "log_channel": "LOG_COMMON",
"couchdb_host": "10.129.6.5:5984", "couchdb_host": "localhost:5984",
"couchdb_db_name": "serverless", "couchdb_db_name": "serverless",
"network": {
"network_bridge": "hybrid_kafka-serverless",
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "localhost:29092"
}
},
"topics": { "topics": {
"request_dm_2_rm": "request", "request_dm_2_rm": "request",
"heartbeat": "heartbeat", "heartbeat": "heartbeat",
...@@ -20,4 +28,4 @@ ...@@ -20,4 +28,4 @@
}, },
"speculative_deployment": true, "speculative_deployment": true,
"id_size": 20 "id_size": 20
} }
\ No newline at end of file
{"id":"10.196.6.51","master_node":"10.129.6.5"} {"id":"192.168.0.105","master_node":"192.168.0.105"}
\ No newline at end of file \ No newline at end of file
...@@ -17,7 +17,7 @@ function runIsolate(local_repository, metadata) { ...@@ -17,7 +17,7 @@ function runIsolate(local_repository, metadata) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const worker = new Worker(filename, { const worker = new Worker(filename, {
argv: [resource_id, functionHash, port, "isolate"], argv: [resource_id, functionHash, port, "isolate", constants.network.external.kafka_host],
resourceLimits: { resourceLimits: {
maxOldGenerationSizeMb: memory maxOldGenerationSizeMb: memory
} }
...@@ -43,7 +43,8 @@ function runProcess(local_repository, metadata) { ...@@ -43,7 +43,8 @@ function runProcess(local_repository, metadata) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timeStart = Date.now() let timeStart = Date.now()
const process = spawn('node', [filename, resource_id, functionHash, port, "process", `--max-old-space-size=${memory}` ]); const process = spawn('node', [filename, resource_id, functionHash, port, "process",
constants.network.external.kafka_host, `--max-old-space-size=${memory}` ]);
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
...@@ -91,8 +92,9 @@ function runContainer(metadata) { ...@@ -91,8 +92,9 @@ function runContainer(metadata) {
if (code != 0) if (code != 0)
reject("error") reject("error")
else { else {
const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id, registry_url + imageName, const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
resource_id, imageName, port, "container"]); "--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
...@@ -118,8 +120,9 @@ function runContainer(metadata) { ...@@ -118,8 +120,9 @@ function runContainer(metadata) {
} else { } else {
logger.info("container starting at port", port); logger.info("container starting at port", port);
const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id, const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`,
registry_url + imageName, resource_id, imageName, port, "container"]); "-p", `${port}:${port}`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = ""; let result = "";
// timeStart = Date.now() // timeStart = Date.now()
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
......
...@@ -21,7 +21,7 @@ const host_url = "http://" + constants.master_address + ":" + constants.master_p ...@@ -21,7 +21,7 @@ const host_url = "http://" + constants.master_address + ":" + constants.master_p
let Producer = kafka.Producer, let Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host, kafkaHost: constants.network.external.kafka_host,
autoConnect: true autoConnect: true
}), }),
producer = new Producer(client), producer = new Producer(client),
......
...@@ -30,7 +30,7 @@ function makeTopic(id) { ...@@ -30,7 +30,7 @@ function makeTopic(id) {
console.log("Using Primary IP", id, "as topic"); console.log("Using Primary IP", id, "as topic");
let client = new kafka.KafkaClient({ let client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host, kafkaHost: constants.network.external.kafka_host,
autoConnect: true autoConnect: true
}), }),
Producer = kafka.Producer, Producer = kafka.Producer,
......
"use strict"; "use strict";
const express = require('express'); const express = require('express');
const bodyParser = require('body-parser');
const fileUpload = require('express-fileupload'); const fileUpload = require('express-fileupload');
const constants = require('.././constants.json'); const constants = require('.././constants.json');
const chainHandler = require('./explicit_chain_handler'); const chainHandler = require('./explicit_chain_handler');
...@@ -14,6 +13,9 @@ const fetch = require('node-fetch'); ...@@ -14,6 +13,9 @@ const fetch = require('node-fetch');
const swStats = require('swagger-stats'); const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json'); const apiSpec = require('./swagger.json');
/**
* URL to the couchdb database server used to store function metadata
*/
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}` let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/" metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
...@@ -35,7 +37,7 @@ let usedPort = new Map(), // TODO: remove after integration with RM ...@@ -35,7 +37,7 @@ let usedPort = new Map(), // TODO: remove after integration with RM
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host, kafkaHost: constants.network.external.kafka_host,
autoConnect: true autoConnect: true
}), }),
producer = new Producer(client), producer = new Producer(client),
...@@ -59,10 +61,10 @@ app.use(express.json()); ...@@ -59,10 +61,10 @@ app.use(express.json());
app.use(express.urlencoded({ extended: true })); app.use(express.urlencoded({ extended: true }));
const file_path = __dirname + "/repository/" const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path)); app.use('/repository', express.static(file_path)); // file server hosting deployed functions
app.use(fileUpload()) app.use(fileUpload())
app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); // statistics middleware
app.use('/serverless/chain', chainHandler); app.use('/serverless/chain', chainHandler); // chain router (explicit_chain_handler.js) for handling explicit chains
let requestQueue = [] let requestQueue = []
const WINDOW_SIZE = 10 const WINDOW_SIZE = 10
...@@ -79,7 +81,7 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -79,7 +81,7 @@ app.post('/serverless/deploy', (req, res) => {
let functionHash = file.md5 let functionHash = file.md5
file.mv(file_path + functionHash, function (err) { file.mv(file_path + functionHash, function (err) { // move function file to repository
functionHash = libSupport.generateExecutor(file_path, functionHash) functionHash = libSupport.generateExecutor(file_path, functionHash)
/** /**
* Adding meta caching via couchdb * Adding meta caching via couchdb
...@@ -148,7 +150,9 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -148,7 +150,9 @@ app.post('/serverless/deploy', (req, res) => {
function deployContainer(path, imageName) { function deployContainer(path, imageName) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let buildStart = Date.now() let buildStart = Date.now()
/**
* Generating dockerfile for the received function
*/
fs.writeFile('./repository/Dockerfile', fs.writeFile('./repository/Dockerfile',
`FROM node:latest `FROM node:latest
WORKDIR /app WORKDIR /app
...@@ -165,7 +169,7 @@ function deployContainer(path, imageName) { ...@@ -165,7 +169,7 @@ function deployContainer(path, imageName) {
} }
else { else {
logger.info('Dockerfile created'); logger.info('Dockerfile created');
const process = spawn('docker', ["build", "-t", registry_url + imageName, path]); const process = spawn('docker', ["build", "-t", registry_url + imageName, path]); // docker build
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`); logger.info(`stdout: ${data}`);
...@@ -180,7 +184,7 @@ function deployContainer(path, imageName) { ...@@ -180,7 +184,7 @@ function deployContainer(path, imageName) {
logger.warn(`child process exited with code ${code}`); logger.warn(`child process exited with code ${code}`);
let timeDifference = Math.ceil((Date.now() - buildStart)) let timeDifference = Math.ceil((Date.now() - buildStart))
logger.info("image build time taken: ", timeDifference); logger.info("image build time taken: ", timeDifference);
const process_push = spawn('docker', ["push", registry_url + imageName]); const process_push = spawn('docker', ["push", registry_url + imageName]); // docker push image to local registry
process_push.stdout.on('data', (data) => { process_push.stdout.on('data', (data) => {
console.log(`stdout: ${data}`); console.log(`stdout: ${data}`);
...@@ -238,6 +242,10 @@ app.post('/serverless/execute/:id', (req, res) => { ...@@ -238,6 +242,10 @@ app.post('/serverless/execute/:id', (req, res) => {
* Send dispatch signal to Worker nodes and deploy resources after consultation with the RM * Send dispatch signal to Worker nodes and deploy resources after consultation with the RM
*/ */
function dispatch() { function dispatch() {
/**
* The lookahead window will be used for optimisation purposes
* Ex. It might be used to co-group similar runtimes on same machines
*/
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length) let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) { for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift() let {req, res} = requestQueue.shift()
...@@ -251,12 +259,13 @@ function dispatch() { ...@@ -251,12 +259,13 @@ function dispatch() {
let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with an unique ID let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with an unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`); logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
/**
* Request RM for resource
*/
logger.info("Requesting RM " + JSON.stringify({ logger.info("Requesting RM " + JSON.stringify({
resource_id, resource_id,
"memory": 332, "memory": 332,
})) }))
/** uncomment when RM is unavailable */
resourceMap.set(resource_id, { resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null, runtime, functionHash, port: null, node_id: null,
...@@ -510,7 +519,9 @@ consumer.on('message', function (message) { ...@@ -510,7 +519,9 @@ consumer.on('message', function (message) {
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker "type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id: message.resource_id, resource_id: message.resource_id,
runtime: resource.runtime, functionHash: resource.functionHash, runtime: resource.runtime, functionHash: resource.functionHash,
port: resource.port port: resource.port, resources: {
memory: resource.memory
}
}), }),
partition: 0 partition: 0
}] }]
......
...@@ -11,7 +11,7 @@ const heap = require('heap') ...@@ -11,7 +11,7 @@ const heap = require('heap')
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host, kafkaHost: constants.network.external.kafka_host,
autoConnect: true autoConnect: true
}), }),
producer = new Producer(client) producer = new Producer(client)
......
...@@ -6,7 +6,7 @@ let log_channel = constants.log_channel, ...@@ -6,7 +6,7 @@ let log_channel = constants.log_channel,
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
kafkaHost: constants.kafka_host, kafkaHost: constants.network.external.kafka_host,
autoConnect: true autoConnect: true
}), }),
producer = new Producer(client) producer = new Producer(client)
......
...@@ -5,7 +5,7 @@ let request = require('request') ...@@ -5,7 +5,7 @@ let request = require('request')
const process = require('process') const process = require('process')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 300 let port = 5000, resource_id, functionHash, runtime, idleTime = 30
resource_id = process.argv[2] resource_id = process.argv[2]
functionHash = process.argv[3] functionHash = process.argv[3]
...@@ -18,7 +18,7 @@ request = request.defaults({ ...@@ -18,7 +18,7 @@ request = request.defaults({
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
client = new kafka.KafkaClient({ client = new kafka.KafkaClient({
kafkaHost: '10.129.6.5:9092', kafkaHost: process.argv[6],
autoConnect: true autoConnect: true
}), }),
producer = new Producer(client) producer = new Producer(client)
...@@ -74,7 +74,10 @@ function shouldDie() { ...@@ -74,7 +74,10 @@ function shouldDie() {
producer.send( producer.send(
[ [
{topic: "removeWorker", messages: message } {topic: "removeWorker", messages: message }
], () => { ], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id); console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0) process.exit(0)
}) })
......
const constants = require('./constants.json')
var kafka = require('kafka-node');
let client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
})
var topicsToCreate = [];
for (const [key, value] of Object.entries(constants.topics)) {
topicsToCreate.push({ topic: value, partitions: 1, replicationFactor: 1 })
}
client.createTopics(topicsToCreate, (error, result) => {
console.log("topic created", result);
});
\ No newline at end of file
version: '2'
networks:
kafka-serverless:
driver: bridge
services:
zookeeper:
image: 'bitnami/zookeeper:3'
networks:
- kafka-serverless
ports:
- '2182:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2'
networks:
- kafka-serverless
ports:
- '9093:9092'
- '29092:29092'
volumes:
- 'kafka_data:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
...@@ -126,6 +126,7 @@ to run and execute the function on the specified worker node. ...@@ -126,6 +126,7 @@ to run and execute the function on the specified worker node.
- Docker - Docker
- Java - Java
- Apache Kafka (Configure to allow auto-delete and auto-registration of topics) - Apache Kafka (Configure to allow auto-delete and auto-registration of topics)
- couchdb (needs a database named serverless)
### Starting the server ### Starting the server
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment