Commit 4eb39c4a authored by Naman Dixit's avatar Naman Dixit

Basic Newt integration finished

parent f06ee5c7
......@@ -4,7 +4,7 @@
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"env": "env.js",
"env": "env.sm",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
......@@ -37,8 +37,8 @@
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"speculative_deployment": false,
"JIT_deployment": false,
"aggressivity": 1,
"id_size": 20
}
\ No newline at end of file
}
{"id":"192.168.0.105","master_node":"192.168.0.105"}
\ No newline at end of file
{"id":"10.0.2.15","master_node":"192.168.0.105"}
\ No newline at end of file
......@@ -7,6 +7,8 @@ const libSupport = require('./lib')
const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = constants.registry_url
const logger = libSupport.logger
const fetch = require('node-fetch');
const FormData = require('form-data')
function runIsolate(local_repository, metadata) {
let port = metadata.port,
......@@ -161,6 +163,43 @@ function runContainer(metadata) {
}
function runNewt(local_repository, metadata) {
let resource_id = metadata.resource_id,
functionHash = metadata.functionHash,
runtime = metadata.runtime,
port = metadata.port
let filename = local_repository + functionHash + ".sm"
let filedata = fs.readFileSync(filename, 'utf8')
var formdata = new FormData();
formdata.append("runtime", runtime);
formdata.append("port", port);
formdata.append("functionHash", functionHash);
formdata.append("resource_id", resource_id);
formdata.append("filedata", filedata);
var requestOptions = {
method: 'POST',
body: formdata,
redirect: 'follow'
};
fetchData("http://localhost:2610/serverless/function/deploy", requestOptions)
}
async function fetchData(url, data = null) {
let res
if (data === undefined || data === null)
res = await fetch(url)
else
res = await fetch(url, data)
return await res.json()
}
module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate;
\ No newline at end of file
module.exports.runIsolate = runIsolate;
module.exports.runNewt = runNewt;
......@@ -55,7 +55,8 @@ libSupport.makeTopic(node_id).then(() => {
.then(json => {
console.log("metadata", json);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
libSupport.download(host_url + "/repository/" + functionHash + ".sm",
local_repository + functionHash + ".sm").then(() => {
let metadata = {
resource_id, functionHash,
runtime, port,
......@@ -142,7 +143,9 @@ function startWorker(local_repository, producer, metadata) {
})
else if (runtime === "container")
execute.runContainer(metadata)
else {
else if (runtime === "newt") {
execute.runNewt(local_repository, metadata)
} else {
producer.send(
[{
topic: "response",
......
......@@ -254,7 +254,7 @@ function dispatch() {
for (let i = 0; i < lookbackWindow; i++) {
let {req, res} = requestQueue.shift()
// logger.info(req.body)
// logger.info(req.body)
let runtime = req.body.runtime
let functionHash = req.params.id
if (!db.has(functionHash + runtime)) {
......@@ -623,4 +623,4 @@ async function speculative_deployment(req, runtime) {
setInterval(libSupport.metrics.broadcastMetrics, 5000)
setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
......@@ -53,16 +53,16 @@ function makeid(length) {
function generateExecutor(functionPath, functionHash) {
let input = fs.readFileSync(`./repository/worker_env/${constants.env}`)
let functionFile = fs.readFileSync(functionPath + functionHash)
let searchSize = "(resolve, reject) => {".length
let searchSize = "# START\n".length
let insertIndex = input.indexOf("(resolve, reject) => {") + searchSize
let insertIndex = input.indexOf("# START\n") + searchSize
let output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex)
let hash = crypto.createHash('md5').update(output).digest("hex");
console.log(hash);
fs.writeFileSync(functionPath + hash + ".js", output)
fs.writeFileSync(functionPath + hash + ".sm", output)
return hash
}
......@@ -143,18 +143,7 @@ async function reverseProxy(req, res) {
}
function getPort(usedPort) {
let port = -1, ctr = 0
do {
let min = Math.ceil(30000);
let max = Math.floor(60000);
port = Math.floor(Math.random() * (max - min + 1)) + min;
ctr += 1;
if (ctr > 30000) {
port = -1
break
}
} while (usedPort.has(port))
return port
return 2610
}
const logger = winston.createLogger({
......@@ -396,4 +385,4 @@ async function fetchData(url, data = null) {
getPort, logger, compare,
logBroadcast, fetchData, metrics,
producer
}
\ No newline at end of file
}
......@@ -18,7 +18,7 @@ let kafka = require('kafka-node'),
])
function getAddress() {
return Object.keys(workerNodes)[0];
}
consumer.on('message', function (message) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment