Commit 2b63e16f authored by Nilanjan Daw's avatar Nilanjan Daw

Explicit function chain support

Support added for explicit function chains via a map file. 
Static map support added - map provided during deployment added
TODO: add support for dynamic maps during request
parent 088d0e1a
...@@ -23,14 +23,15 @@ router.post('/deploy', (req, res) => { ...@@ -23,14 +23,15 @@ router.post('/deploy', (req, res) => {
let aliases = {} let aliases = {}
let deployHandles = [] let deployHandles = []
createDirectory(file_path).then(() => { createDirectory(file_path).then(() => {
for (const [file_alias, file] of Object.entries(files)) { for (const [file_alias, file] of Object.entries(files)) {
if (file_alias === 'map') {
file.mv(file_path + 'map' + chain_id)
continue
}
let functionHash = file.md5 let functionHash = file.md5
aliases[file_alias] = functionHash if (file_alias === 'map') {
deployHandles.push(deploy(file_path, functionHash, file)) file.mv(file_path + 'map' + chain_id + ".json")
continue
}
// aliases[file_alias] = functionHash
deployHandles.push(deploy(file_path, functionHash, file, aliases, file_alias))
} }
console.log("aliases", aliases); console.log("aliases", aliases);
...@@ -53,11 +54,12 @@ router.post('/deploy', (req, res) => { ...@@ -53,11 +54,12 @@ router.post('/deploy', (req, res) => {
}) })
async function deploy(file_path, functionHash, file) { async function deploy(file_path, functionHash, file, aliases, file_alias) {
let runtime = "container", memory = 330 let runtime = "container", memory = 330
try { try {
await moveFile(file, file_path, functionHash) await moveFile(file, file_path, functionHash)
functionHash = libSupport.generateExecutor(file_path, functionHash) functionHash = libSupport.generateExecutor(file_path, functionHash)
aliases[file_alias] = functionHash
/** /**
* Adding meta caching via couchdb * Adding meta caching via couchdb
* This will create / update function related metadata like resource limits etc * This will create / update function related metadata like resource limits etc
...@@ -101,12 +103,12 @@ async function deploy(file_path, functionHash, file) { ...@@ -101,12 +103,12 @@ async function deploy(file_path, functionHash, file) {
try { try {
await deployContainer(file_path, functionHash) await deployContainer(file_path, functionHash)
console.log("called"); console.log("called");
return Promise.resolve() return Promise.resolve(functionHash)
} catch(err) { } catch(err) {
return Promise.reject(err) return Promise.reject(err)
} }
} else { } else {
return Promise.resolve() return Promise.resolve(functionHash)
} }
} catch (err) { } catch (err) {
logger.error(err) logger.error(err)
...@@ -181,6 +183,87 @@ async function deployContainer(path, imageName) { ...@@ -181,6 +183,87 @@ async function deployContainer(path, imageName) {
}) })
} }
router.post('/execute/:id', (req, res) => {
let map, aliases
// if (req.body.map)
// map = req.body.map
// else {
readMap(`./repository/map${req.params.id}.json`)
.then(data => {
map = data
readMap(`./repository/aliases${req.params.id}.json`)
.then(data => {
aliases = data
let payload = JSON.parse(req.body.data)
console.log(payload);
orchestrator(res, payload, map, aliases, [])
})
})
// }
})
function orchestrator(res, payload, map, aliases, result) {
return new Promise((resolve, reject) => {
console.log("result before run", result);
if (Object.keys(map).length == 0) {
console.log("time to resolve");
res.json(result)
// return resolve(result)
}
else {
for (const [functionName, metadata] of Object.entries(map)) {
// console.log(functionName, metadata, aliases[functionName]);
if (metadata.wait_for.length == 0) {
let url = `http://${constants.master_address}:${constants.master_port}/serverless/execute/${aliases[functionName]}`
console.log(url);
let data = {
method: 'post',
body: JSON.stringify({
runtime: metadata.runtime,
payload
}),
headers: { 'Content-Type': 'application/json' }
}
delete map[functionName]
fetch(url, data).then(res => res.json())
.then(json => {
console.log(json);
result.push(json)
for (const [_key, metadata] of Object.entries(map)) {
let index = metadata.wait_for.indexOf(functionName)
if (index >= 0)
metadata.wait_for.splice(index, 1);
}
console.log(map, "after run");
orchestrator(res, payload, map, aliases, result)
})
}
}
// return resolve(result)
}
// await fetch(constants.master_address)
})
}
function readMap(filename) {
return new Promise((resolve, reject) => {
fs.readFile(filename, (err, data) => {
if (err)
reject(err)
else {
const object = JSON.parse(data)
resolve(object)
}
})
})
}
function createDirectory(path) { function createDirectory(path) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!fs.existsSync(path)) { if (!fs.existsSync(path)) {
...@@ -195,12 +278,4 @@ function createDirectory(path) { ...@@ -195,12 +278,4 @@ function createDirectory(path) {
}) })
} }
router.post('/execute', (req, res) => {
})
async function orchestrator(payload, res) {
}
module.exports = router; module.exports = router;
...@@ -5,7 +5,7 @@ let request = require('request') ...@@ -5,7 +5,7 @@ let request = require('request')
const process = require('process') const process = require('process')
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 30 let port = 5000, resource_id, functionHash, runtime, idleTime = 300
resource_id = process.argv[2] resource_id = process.argv[2]
functionHash = process.argv[3] functionHash = process.argv[3]
...@@ -36,13 +36,15 @@ app.post('/serverless/function/execute/', (req, res) => { ...@@ -36,13 +36,15 @@ app.post('/serverless/function/execute/', (req, res) => {
}) })
}) })
app.post('/serverless/worker/timeout', (req, res) => { app.post('/serverless/function/timeout', (req, res) => {
idleTime = req.body.timeout idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime); console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
}) })
function executor(payload) { async function executor(payload) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment