Commit caa2d435 authored by Nilanjan Daw

Merge branch 'master' into explicit_function_chaining

parents 9eabd6c1 9d5f6ec9
[submodule "resource_manager/src/common/cJSON"]
[submodule "resource_system/src/common/cJSON"]
path = resource_system/src/common/cJSON
url = https://github.com/DaveGamble/cJSON
[submodule "resource_manager/src/common/nlib"]
[submodule "resource_system/src/common/nlib"]
path = resource_system/src/common/nlib
url = https://github.com/namandixit/nlib
[submodule "resource_system/src/common/inih"]
......
{
"registry_url" :"10.129.6.5:5000/",
"registry_url" :"localhost:5000/",
"master_port": 8080,
"master_address": "localhost",
"kafka_host": "10.129.6.5:9092",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON",
"couchdb_host": "10.129.6.5:5984",
"couchdb_host": "localhost:5984",
"couchdb_db_name": "serverless",
"network": {
"network_bridge": "hybrid_kafka-serverless",
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "localhost:29092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
......
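The new `network` block splits the Kafka bootstrap address in two: `internal.kafka_host` resolves only on the compose bridge network, while `external.kafka_host` is what host-side daemons dial. A minimal sketch of how a component would pick one (the `IN_CONTAINER` flag is hypothetical, for illustration; in this codebase the choice is hardwired per component):

```js
// Sketch: pick the Kafka bootstrap address for the current deployment mode.
// IN_CONTAINER is a hypothetical env flag, used here only for illustration.
const constants = require('./constants.json')
const kafka = require('kafka-node')

const kafkaHost = process.env.IN_CONTAINER
    ? constants.network.internal.kafka_host   // "kafka:9092", resolvable on the bridge
    : constants.network.external.kafka_host   // "localhost:29092", advertised to the host

const client = new kafka.KafkaClient({ kafkaHost, autoConnect: true })
```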
{"id":"10.196.6.51","master_node":"10.129.6.5"}
\ No newline at end of file
{"id":"192.168.0.105","master_node":"192.168.0.105"}
\ No newline at end of file
@@ -17,7 +17,7 @@ function runIsolate(local_repository, metadata) {
return new Promise((resolve, reject) => {
const worker = new Worker(filename, {
argv: [resource_id, functionHash, port, "isolate"],
argv: [resource_id, functionHash, port, "isolate", constants.network.external.kafka_host],
resourceLimits: {
maxOldGenerationSizeMb: memory
}
@@ -43,7 +43,8 @@ function runProcess(local_repository, metadata) {
return new Promise((resolve, reject) => {
let timeStart = Date.now()
-const process = spawn('node', [filename, resource_id, functionHash, port, "process", `--max-old-space-size=${memory}` ]);
+const process = spawn('node', [filename, resource_id, functionHash, port, "process",
+constants.network.external.kafka_host, `--max-old-space-size=${memory}` ]);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
@@ -91,8 +92,9 @@ function runContainer(metadata) {
if (code != 0)
reject("error")
else {
-const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id, registry_url + imageName,
-resource_id, imageName, port, "container"]);
+const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
+"--name", resource_id, registry_url + imageName,
+resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
@@ -118,8 +120,9 @@ function runContainer(metadata) {
} else {
logger.info("container starting at port", port);
-const process = spawn('docker', ["run", "--rm", "-p", `${port}:${port}`, "--name", resource_id,
-registry_url + imageName, resource_id, imageName, port, "container"]);
+const process = spawn('docker', ["run", "--rm", `--network=${constants.network.network_bridge}`,
+"-p", `${port}:${port}`, "--name", resource_id,
+registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
......
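All three launch paths above now append a Kafka address to the executor's argument list (external for isolates and processes, internal for containers), so the executor no longer needs a hardcoded broker. A sketch of the argument layout the executor ends up seeing, matching the `process.argv` reads in the executor further down:

```js
// Assumed executor-side view of its arguments (worker_threads `argv` and
// child_process `spawn` both land them at these indices):
// argv[2] resource_id, argv[3] functionHash, argv[4] port,
// argv[5] runtime ("isolate" | "process" | "container"), argv[6] kafka bootstrap host
const [, , resource_id, functionHash, port, runtime, kafkaHost] = process.argv
```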
@@ -21,7 +21,7 @@ const host_url = "http://" + constants.master_address + ":" + constants.master_p
let Producer = kafka.Producer,
client = new kafka.KafkaClient({
-kafkaHost: constants.kafka_host,
+kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
......
@@ -30,7 +30,7 @@ function makeTopic(id) {
console.log("Using Primary IP", id, "as topic");
let client = new kafka.KafkaClient({
-kafkaHost: constants.kafka_host,
+kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
Producer = kafka.Producer,
......
"use strict";
const express = require('express');
const bodyParser = require('body-parser');
const fileUpload = require('express-fileupload');
const constants = require('.././constants.json');
const chainHandler = require('./explicit_chain_handler');
@@ -14,6 +13,9 @@ const fetch = require('node-fetch');
const swStats = require('swagger-stats');
const apiSpec = require('./swagger.json');
+/**
+ * URL to the couchdb database server used to store function metadata
+ */
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
@@ -35,7 +37,7 @@ let usedPort = new Map(), // TODO: remove after integration with RM
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
-kafkaHost: constants.kafka_host,
+kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
@@ -59,10 +61,10 @@ app.use(express.json());
app.use(express.urlencoded({ extended: true }));
const file_path = __dirname + "/repository/"
-app.use('/repository', express.static(file_path));
+app.use('/repository', express.static(file_path)); // file server hosting deployed functions
app.use(fileUpload())
-app.use(swStats.getMiddleware({ swaggerSpec: apiSpec }));
-app.use('/serverless/chain', chainHandler);
+app.use(swStats.getMiddleware({ swaggerSpec: apiSpec })); // statistics middleware
+app.use('/serverless/chain', chainHandler); // chain router (explicit_chain_handler.js) for handling explicit chains
let requestQueue = []
const WINDOW_SIZE = 10
@@ -79,7 +81,7 @@ app.post('/serverless/deploy', (req, res) => {
let functionHash = file.md5
-file.mv(file_path + functionHash, function (err) {
+file.mv(file_path + functionHash, function (err) { // move function file to repository
functionHash = libSupport.generateExecutor(file_path, functionHash)
/**
* Adding meta caching via couchdb
@@ -148,7 +150,9 @@ app.post('/serverless/deploy', (req, res) => {
function deployContainer(path, imageName) {
return new Promise((resolve, reject) => {
let buildStart = Date.now()
+/**
+ * Generating dockerfile for the received function
+ */
fs.writeFile('./repository/Dockerfile',
`FROM node:latest
WORKDIR /app
@@ -165,7 +169,7 @@ function deployContainer(path, imageName) {
}
else {
logger.info('Dockerfile created');
const process = spawn('docker', ["build", "-t", registry_url + imageName, path]);
const process = spawn('docker', ["build", "-t", registry_url + imageName, path]); // docker build
process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`);
@@ -180,7 +184,7 @@ function deployContainer(path, imageName) {
logger.warn(`child process exited with code ${code}`);
let timeDifference = Math.ceil((Date.now() - buildStart))
logger.info("image build time taken: ", timeDifference);
-const process_push = spawn('docker', ["push", registry_url + imageName]);
+const process_push = spawn('docker', ["push", registry_url + imageName]); // docker push image to local registry
process_push.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
@@ -238,6 +242,10 @@ app.post('/serverless/execute/:id', (req, res) => {
* Send dispatch signal to Worker nodes and deploy resources after consultation with the RM
*/
function dispatch() {
+/**
+ * The lookahead window will be used for optimisation purposes
+ * Ex. It might be used to co-group similar runtimes on same machines
+ */
let lookbackWindow = Math.min(WINDOW_SIZE, requestQueue.length)
for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift()
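Incoming execute requests are buffered in `requestQueue` and drained at most `WINDOW_SIZE` at a time. The enqueue side is outside this hunk; a plausible sketch of that pattern (the tick period is an assumption, not from the diff):

```js
// Sketch: requests buffer in requestQueue; dispatch() drains up to WINDOW_SIZE per tick.
app.post('/serverless/execute/:id', (req, res) => {
    requestQueue.push({ req, res })   // picked up by the next dispatch pass
})
setInterval(dispatch, 1000)           // 1 s tick is an assumption
```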
@@ -251,13 +259,14 @@ function dispatch() {
let resource_id = libSupport.makeid(constants.id_size) // each function resource request is associated with a unique ID
logger.info(`Generated new resource ID: ${resource_id} for runtime: ${runtime}`);
/**
* Request RM for resource
*/
logger.info("Requesting RM " + JSON.stringify({
resource_id,
"memory": 332,
}))
+/** uncomment when RM is unavailable */
resourceMap.set(resource_id, {
runtime, functionHash, port: null, node_id: null,
deployed: false, deploy_request_time: Date.now()
@@ -510,7 +519,9 @@ consumer.on('message', function (message) {
"type": "execute", // Request sent to Dispatch Daemon via Kafka for actual deployment at the Worker
resource_id: message.resource_id,
runtime: resource.runtime, functionHash: resource.functionHash,
-port: resource.port
+port: resource.port, resources: {
+memory: resource.memory
+}
}),
partition: 0
}]
......
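The dispatch message now carries a `resources` object alongside the port, so the worker side can size the runtime (e.g. the `--max-old-space-size` flag seen earlier) from the request instead of a constant. A hedged sketch of the consuming side (handler body is illustrative):

```js
// Sketch of the Dispatch Daemon consumer reading the new field:
consumer.on('message', function (message) {
    const body = JSON.parse(message.value)
    if (body.type === "execute") {
        const memory = body.resources.memory   // field added by this change
        // ...launch isolate/process/container with `memory` as its limit
    }
})
```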
@@ -11,7 +11,7 @@ const heap = require('heap')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
-kafkaHost: constants.kafka_host,
+kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client)
......
@@ -6,7 +6,7 @@ let log_channel = constants.log_channel,
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
-kafkaHost: constants.kafka_host,
+kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client)
......
@@ -5,7 +5,7 @@ let request = require('request')
const process = require('process')
const app = express()
-let port = 5000, resource_id, functionHash, runtime, idleTime = 300
+let port = 5000, resource_id, functionHash, runtime, idleTime = 30
resource_id = process.argv[2]
functionHash = process.argv[3]
@@ -18,7 +18,7 @@ request = request.defaults({
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
-kafkaHost: '10.129.6.5:9092',
+kafkaHost: process.argv[6],
autoConnect: true
}),
producer = new Producer(client)
@@ -74,7 +74,10 @@ function shouldDie() {
producer.send(
[
{topic: "removeWorker", messages: message }
-], () => {
+], (err, data) => {
+if (err)
+console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
......
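Two behavioural changes in the executor: the idle timeout drops from 300 to 30, and the `removeWorker` send now reports errors before the process exits. Assuming `idleTime` counts seconds, the surrounding lifecycle is roughly:

```js
// Sketch: executor self-terminates after `idleTime` seconds without traffic.
let lastRequest = Date.now()
app.use((req, res, next) => { lastRequest = Date.now(); next() })

setInterval(() => {
    if ((Date.now() - lastRequest) / 1000 > idleTime)
        shouldDie()   // publishes to "removeWorker", then process.exit(0)
}, 1000)
```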
const constants = require('./constants.json')
var kafka = require('kafka-node');
let client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
})
var topicsToCreate = [];
for (const [key, value] of Object.entries(constants.topics)) {
topicsToCreate.push({ topic: value, partitions: 1, replicationFactor: 1 })
}
client.createTopics(topicsToCreate, (error, result) => {
console.log("topic created", result);
});
\ No newline at end of file
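One caveat with this new helper: the `createTopics` callback ignores its `error` argument, so an unreachable broker fails silently. A variant that surfaces failures (same API, just logging the error):

```js
client.createTopics(topicsToCreate, (error, result) => {
    if (error)
        console.error("topic creation failed", error)
    else
        console.log("topic created", result)
})
```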
version: '2'
networks:
kafka-serverless:
driver: bridge
services:
zookeeper:
image: 'bitnami/zookeeper:3'
networks:
- kafka-serverless
ports:
- '2182:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2'
networks:
- kafka-serverless
ports:
- '9093:9092'
- '29092:29092'
volumes:
- 'kafka_data:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
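This compose file is what the internal/external split in `constants.json` maps onto: the `PLAINTEXT` listener is advertised as `kafka:9092` to containers on the `kafka-serverless` bridge, and `PLAINTEXT_HOST` as `localhost:29092` to processes on the host (the `hybrid_` prefix in `network_bridge` presumably comes from the compose project name). A quick host-side connectivity check, illustrative only:

```js
// Sketch: confirm the host-facing PLAINTEXT_HOST listener is reachable.
const kafka = require('kafka-node')
const client = new kafka.KafkaClient({ kafkaHost: "localhost:29092", autoConnect: true })
client.on('ready', () => { console.log("connected via localhost:29092"); client.close() })
client.on('error', (err) => console.error("connection failed", err))
```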
@@ -126,6 +126,7 @@ to run and execute the function on the specified worker node.
- Docker
- Java
- Apache Kafka (Configure to allow auto-delete and auto-registration of topics)
+- couchdb (needs a database named serverless)
### Starting the server
......
cJSON @ f790e17b
Subproject commit f790e17b6cecef030c4eda811149d238c2085fcf
nlib @ 75bc1a11
Subproject commit 75bc1a11e2a10cf249f566b40c85d6526c16f123
@@ -338,9 +338,9 @@ int main(int argc, char** argv)
JSON_Print_Command command = {0};
while (instrumentCommandDequeue(&command)) {
// TODO(naman): Enable this after proper testing
-/* if (!kafkaWrite(kafka.writer, command.topic, "resource_daemon", command.msg)) { */
-/* return -1; */
-/* } */
+if (!kafkaWrite(kafka.writer, command.topic, "resource_daemon", command.msg)) {
+return -1;
+}
}
}
......
@@ -12,10 +12,9 @@ void* dockerProcessLoop (void *arg)
while (true) {
// TODO(naman): Get data
Char *data_cmd = NULL;
sbufPrint(data_cmd, "docker stats %s", (Char*)arg);
sbufPrint(data_cmd, "docker stats --no-stream --format \"{{ json . }}\" %s", (Char*)arg);
FILE* data_file = popen(data_cmd, "r");
fseek(data_file, 0, SEEK_END);
long size = ftell(data_file);
fseek(data_file, 0, SEEK_SET);
@@ -23,17 +22,49 @@ void* dockerProcessLoop (void *arg)
fread(data, 1, (Size)size + 1, data_file);
fclose(data_file);
+const Char *json_error = NULL;
+cJSON *data_json = cJSON_ParseWithOpts(data, &json_error, true);
Char *json = NULL;
sbufPrint(json, "{\"cpu_percentage\": %f",
atof(cJSON_GetObjectItem(data_json, "CPUPerc")->valuestring));
sbufPrint(json, ",\n\"memory_percentage\": %f",
atof(cJSON_GetObjectItem(data_json, "MemPerc")->valuestring));
sbufPrint(json, ",\n\"memory_used\": %f",
atof(cJSON_GetObjectItem(data_json, "MemUsage")->valuestring));
sbufPrint(json, ",\n\"memory_limit\": %f",
atof(strchr(cJSON_GetObjectItem(data_json, "MemUsage")->valuestring, '/') + 1));
sbufPrint(json, ",\n\"block_input\": %f",
atof(cJSON_GetObjectItem(data_json, "BlockIO")->valuestring));
sbufPrint(json, ",\n\"block_output\": %f",
atof(strchr(cJSON_GetObjectItem(data_json, "BlockIO")->valuestring, '/') + 1));
sbufPrint(json, ",\n\"net_down\": %f",
atof(cJSON_GetObjectItem(data_json, "NetIO")->valuestring));
sbufPrint(json, ",\n\"net_up\": %f",
atof(strchr(cJSON_GetObjectItem(data_json, "NetIO")->valuestring, '/') + 1));
sbufPrint(json, "\n}\n");
+cJSON *json_parse = cJSON_Parse(json);
+Char *json_pretty = cJSON_Print(json_parse);
Char *output = NULL;
sbufPrint(output, "{\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"type\": \"%s\"", "docker");
sbufPrint(output, ",\n\"data\": %s", json ? json : "{}");
sbufPrint(output, ",\n\"entity_id\": \"%s\"", (Char*)arg);
sbufPrint(output, ",\n\"type\": \"%s\"", "instrumentation");
sbufPrint(output, ",\n\"backend\": \"%s\"", "docker");
sbufPrint(output, ",\n\"data\": %s", json_pretty);
sbufPrint(output, "\n}\n");
JSON_Print_Command jpc = {.msg = output,
.topic = "LOG_CHANNEL"};
+cJSON_free(json_pretty);
+cJSON_Delete(json_parse);
sbufDelete(output);
sbufDelete(json);
free(data);
sbufDelete(data_cmd);
U64 time_before = timeMilli();
instrumentCommandEnqueue(jpc);
U64 time_after = timeMilli();
......
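The `--no-stream --format "{{ json . }}"` form makes `docker stats` exit after one sample and emit a single JSON object per container, with compound fields encoded as "used / limit" strings; that is why the parser takes `atof()` of each value for the first number and `atof(strchr(..., '/') + 1)` for the second. The output has roughly this shape (key names are real; values illustrative):

```
{"BlockIO":"0B / 8.19kB","CPUPerc":"0.07%","Container":"rex3k1","ID":"1f2a...","MemPerc":"1.20%","MemUsage":"24.5MiB / 2GiB","Name":"rex3k1","NetIO":"656B / 1.2kB","PIDs":"11"}
```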