Commit 3d8147e1 authored by nilanjandaw's avatar nilanjandaw

added couchdb support for cross-run metadata storage

parent 7d2d4170
...@@ -4,5 +4,7 @@ ...@@ -4,5 +4,7 @@
"master_address": "localhost", "master_address": "localhost",
"kafka_host": "10.129.6.5:9092", "kafka_host": "10.129.6.5:9092",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"log_channel": "LOG_COMMON" "log_channel": "LOG_COMMON",
"couchdb_host": "10.129.6.5:5984",
"couchdb_db_name": "serverless"
} }
\ No newline at end of file
'use strict'; 'use strict';
const constants = require(".././constants.json") const constants = require(".././constants.json")
const secrets = require('./secrets.json')
const config = require('./config.json') const config = require('./config.json')
const libSupport = require('./lib') const libSupport = require('./lib')
libSupport.updateConfig() libSupport.updateConfig()
...@@ -7,6 +8,11 @@ const node_id = config.id ...@@ -7,6 +8,11 @@ const node_id = config.id
const {spawn } = require('child_process') const {spawn } = require('child_process')
const execute = require('./execute') const execute = require('./execute')
const fs = require('fs') const fs = require('fs')
const fetch = require('node-fetch');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
const kafka = require('kafka-node') const kafka = require('kafka-node')
const logger = libSupport.logger const logger = libSupport.logger
...@@ -44,9 +50,17 @@ libSupport.makeTopic(node_id).then(() => { ...@@ -44,9 +50,17 @@ libSupport.makeTopic(node_id).then(() => {
*/ */
if (message.type === "execute") { if (message.type === "execute") {
logger.info("Received Deployment request for resource_id: " + resource_id); logger.info("Received Deployment request for resource_id: " + resource_id);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => { fetch(metadataDB + functionHash).then(res => res.json())
.then(json => {
console.log("metadata", json);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
startWorker(local_repository, functionHash, resource_id, producer, runtime, port) startWorker(local_repository, functionHash, resource_id, producer, runtime, port)
}) })
}).catch(err => {
logger.error("something went wrong" + err.toString())
});
} }
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"express": "^4.17.1", "express": "^4.17.1",
"express-fileupload": "^1.1.6", "express-fileupload": "^1.1.6",
"isolated-vm": "^3.0.0",
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
......
...@@ -4,10 +4,15 @@ const express = require('express') ...@@ -4,10 +4,15 @@ const express = require('express')
const bodyParser = require('body-parser') const bodyParser = require('body-parser')
const fileUpload = require('express-fileupload'); const fileUpload = require('express-fileupload');
const constants = require('.././constants.json') const constants = require('.././constants.json')
const secrets = require('./secrets.json')
const fs = require('fs') const fs = require('fs')
const { spawn } = require('child_process'); const { spawn } = require('child_process');
const morgan = require('morgan') const morgan = require('morgan')
const heap = require('heap') const heap = require('heap')
const fetch = require('node-fetch');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.couchdb_db_name + "/"
const app = express() const app = express()
const libSupport = require('./lib') const libSupport = require('./lib')
...@@ -19,7 +24,8 @@ let usedPort = new Map(), // TODO: remove after integration with RM ...@@ -19,7 +24,8 @@ let usedPort = new Map(), // TODO: remove after integration with RM
rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation rmQueue = new Map(), // queue holding requests for which DM is waiting for RM allocation
db = new Map(), // queue holding request to be dispatched db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
functionToResource = new Map() functionToResource = new Map() // a function to resource map. Each map contains a minheap of
// resources associated with the function
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -65,9 +71,39 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -65,9 +71,39 @@ app.post('/serverless/deploy', (req, res) => {
let file = req.files.serverless let file = req.files.serverless
let functionHash = file.md5 let functionHash = file.md5
file.mv(file_path + functionHash, function (err) { file.mv(file_path + functionHash, function (err) {
functionHash = libSupport.generateExecutor(file_path, functionHash) functionHash = libSupport.generateExecutor(file_path, functionHash)
/**
* Adding meta caching via couchdb
* This will create / update function related metadata like resource limits etc
* on a database named "serverless".
*/
fetch(metadataDB + functionHash).then(res => res.json())
.then(json => {
if (json.error === "not_found") {
logger.warn("New function, creating metadata")
fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: req.body.memory
}),
headers: { 'Content-Type': 'application/json' },
}).then(res => res.json())
.then(json => console.log(json));
} else {
logger.warn('Repeat deployment, updating metadata')
fetch(metadataDB + functionHash, {
method: 'put',
body: JSON.stringify({
memory: req.body.memory,
_rev: json._rev
}),
headers: { 'Content-Type': 'application/json' },
}).then(res => res.json())
.then(json => console.log(json));
}
});
if (err) { if (err) {
logger.error(err) logger.error(err)
res.send("error").status(400) res.send("error").status(400)
...@@ -292,7 +328,8 @@ consumer.on('message', function (message) { ...@@ -292,7 +328,8 @@ consumer.on('message', function (message) {
resource_id: message.resource_id, resource_id: message.resource_id,
metric: 0 metric: 0
}, libSupport.compare) }, libSupport.compare)
console.log("adding to list", functionToResource.get(message.functionHash + message.runtime)); logger.warn("Horizontally scaling up: " +
JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
} else { } else {
/** /**
...@@ -306,7 +343,8 @@ consumer.on('message', function (message) { ...@@ -306,7 +343,8 @@ consumer.on('message', function (message) {
metric: 0 metric: 0
}, libSupport.compare) }, libSupport.compare)
functionToResource.set(message.functionHash + message.runtime, resourceHeap) functionToResource.set(message.functionHash + message.runtime, resourceHeap)
console.log("creating new heap", functionToResource.get(message.functionHash + message.runtime)); logger.warn("Creating new resource pool"
+ JSON.stringify(functionToResource.get(message.functionHash + message.runtime)));
} }
let resource = resourceMap.get(message.resource_id) let resource = resourceMap.get(message.resource_id)
...@@ -335,7 +373,7 @@ consumer.on('message', function (message) { ...@@ -335,7 +373,7 @@ consumer.on('message', function (message) {
} }
} else if (topic == "removeWorker") { } else if (topic == "removeWorker") {
logger.info("Worker blown: Removing Metadata " + message); logger.warn("Worker blown: Removing Metadata " + message);
try { try {
message = JSON.parse(message) message = JSON.parse(message)
} catch(e) { } catch(e) {
...@@ -346,19 +384,15 @@ consumer.on('message', function (message) { ...@@ -346,19 +384,15 @@ consumer.on('message', function (message) {
let resourceArray = functionToResource.get(message.functionHash + message.runtime) let resourceArray = functionToResource.get(message.functionHash + message.runtime)
for (let i = 0; i < resourceArray.length; i++) for (let i = 0; i < resourceArray.length; i++)
if (resourceArray[i].resource_id === message.resource_id) { if (resourceArray[i].resource_id === message.resource_id) {
console.log("splicing");
resourceArray.splice(i, 1); resourceArray.splice(i, 1);
break; break;
} }
heap.heapify(resourceArray, libSupport.compare) heap.heapify(resourceArray, libSupport.compare)
console.log(functionToResource.get(message.functionHash + message.runtime));
resourceMap.delete(message.resource_id) resourceMap.delete(message.resource_id)
if (resourceArray.length == 0) if (resourceArray.length == 0)
functionToResource.delete(message.functionHash + message.runtime) functionToResource.delete(message.functionHash + message.runtime)
console.log(resourceArray);
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^3.0.0", "mqtt": "^3.0.0",
"nano": "^8.1.0", "node-fetch": "^2.6.0",
"redis": "^2.8.0", "redis": "^2.8.0",
"request": "^2.88.0", "request": "^2.88.0",
"request-promise": "^4.2.5", "request-promise": "^4.2.5",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment