Commit f738058f authored by NILANJAN DAW's avatar NILANJAN DAW

Merge branch 'explicit_function_chaining'

parents d841d20b d72633d3
......@@ -5,4 +5,5 @@ firecracker*
secrets.json
grunt
.clinic
rm_dummy.js
\ No newline at end of file
rm_dummy.js
metrics_gatherer.js
\ No newline at end of file
......@@ -4,6 +4,7 @@
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"env": "env.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
......@@ -27,7 +28,8 @@
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
"log_channel": "LOG_COMMON",
"test": "test"
},
"autoscalar_metrics": {
"open_request_threshold": 100
......@@ -37,6 +39,6 @@
},
"speculative_deployment": true,
"JIT_deployment": true,
"aggressivity": 0.8,
"aggressivity": 1,
"id_size": 20
}
\ No newline at end of file
......@@ -9,6 +9,7 @@ const {spawn } = require('child_process')
const execute = require('./execute')
const fs = require('fs')
const fetch = require('node-fetch');
const os = require('os');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.db.function_meta + "/"
......@@ -37,7 +38,7 @@ libSupport.makeTopic(node_id).then(() => {
{ autoCommit: true }
])
consumer.on('message', function (message) {
logger.info(message);
// logger.info(message);
let topic = message.topic
message = message.value
message = JSON.parse(message)
......@@ -83,12 +84,12 @@ libSupport.download(constants.grunt_host, "grunt", false).then(() => {
logger.info("grunt made executable. Starting grunt")
let grunt = spawn('./grunt', [node_id])
grunt.stdout.on('data', data => {
logger.info(data.toString());
// logger.info(data.toString());
})
grunt.stderr.on('data', data => {
logger.info(data.toString());
// logger.info(data.toString());
})
grunt.on('close', (code) => {
......@@ -154,9 +155,19 @@ function startWorker(local_repository, producer, metadata) {
}
function heartbeat() {
let info = {
free_mem: os.freemem(),
cpu_count: os.cpus().length,
total_mem: os.totalmem(),
avg_load: os.loadavg()
}
let payload = [{
topic: "heartbeat",
messages: JSON.stringify({"address": node_id, "timestamp": Date.now()})
messages: JSON.stringify({
"address": node_id,
"system_info": info,
"timestamp": Date.now()
})
}]
producer.send(payload, function(cb) {})
}
......
......@@ -206,6 +206,8 @@ router.post('/execute/:id', (req, res) => {
}
if (chainData.error !== "not_found")
conditionProbabilityExplicit[chain_id] = chainData
else
conditionProbabilityExplicit[chain_id] = {}
if (req.files && req.files.map) {
map = JSON.parse(req.files.map.data.toString());
readMap(`./repository/aliases${chain_id}.json`, true)
......
......@@ -51,7 +51,7 @@ function makeid(length) {
* @param {string Function Hash value} functionHash
*/
function generateExecutor(functionPath, functionHash) {
let input = fs.readFileSync('./repository/worker_env/env.js')
let input = fs.readFileSync(`./repository/worker_env/${constants.env}`)
let functionFile = fs.readFileSync(functionPath + functionHash)
let searchSize = "(resolve, reject) => {".length
......
'use strict';
const express = require('express')
const bodyParser = require('body-parser')
let request = require('request')
const process = require('process')
......@@ -24,8 +23,8 @@ let kafka = require('kafka-node'),
}),
producer = new Producer(client)
app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json())
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
......@@ -42,7 +41,9 @@ app.post('/serverless/function/execute/', (req, res) => {
})
app.post('/serverless/function/timeout', (req, res) => {
idleTime = req.body.timeout
console.log(req.body);
let idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
......
'use strict';
const express = require('express')
const fileUpload = require('express-fileupload');
let request = require('request')
const process = require('process')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 60, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
}),
producer = new Producer(client)
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
app.use(fileUpload())
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
producer.send(
[{
topic: "test",
messages: JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
}),
"status": true
}], () => { })
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
idleTime = 0
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
let idleTime = parseInt(req.body.timeout)
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
setInterval(shouldDie, 1000);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment