Commit 190809a4 authored by Nilanjan Daw's avatar Nilanjan Daw

Modified viterbi for better support

parent f725ed87
...@@ -51,13 +51,13 @@ function makeid(length) { ...@@ -51,13 +51,13 @@ function makeid(length) {
* @param {string Function Hash value} functionHash * @param {string Function Hash value} functionHash
*/ */
function generateExecutor(functionPath, functionHash) { function generateExecutor(functionPath, functionHash) {
input = fs.readFileSync('./repository/worker_env/env.js') let input = fs.readFileSync('./repository/worker_env/env.js')
functionFile = fs.readFileSync(functionPath + functionHash) let functionFile = fs.readFileSync(functionPath + functionHash)
searchSize = "(resolve, reject) => {".length let searchSize = "(resolve, reject) => {".length
insertIndex = input.indexOf("(resolve, reject) => {") + searchSize let insertIndex = input.indexOf("(resolve, reject) => {") + searchSize
output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex) let output = input.slice(0, insertIndex) + functionFile + input.slice(insertIndex)
let hash = crypto.createHash('md5').update(output).digest("hex"); let hash = crypto.createHash('md5').update(output).digest("hex");
console.log(hash); console.log(hash);
...@@ -71,7 +71,7 @@ function generateExecutor(functionPath, functionHash) { ...@@ -71,7 +71,7 @@ function generateExecutor(functionPath, functionHash) {
* @param {JSON} req the user request to be forwarded to the worker * @param {JSON} req the user request to be forwarded to the worker
* @param {JSON} res Object to use to return the response to the user * @param {JSON} res Object to use to return the response to the user
*/ */
function reverseProxy(req, res) { async function reverseProxy(req, res) {
if (req.headers['x-chain-type'] !== 'explicit') if (req.headers['x-chain-type'] !== 'explicit')
branchChainPredictor(req) branchChainPredictor(req)
let runtime = req.body.runtime let runtime = req.body.runtime
...@@ -97,9 +97,8 @@ function reverseProxy(req, res) { ...@@ -97,9 +97,8 @@ function reverseProxy(req, res) {
json: true // Automatically stringifies the body to JSON json: true // Automatically stringifies the body to JSON
}; };
try {
rp(options) let parsedBody = await rp(options)
.then(function (parsedBody) {
let serviceTime = Date.now() - res.timestamp let serviceTime = Date.now() - res.timestamp
res.json(parsedBody) res.json(parsedBody)
...@@ -112,6 +111,11 @@ function reverseProxy(req, res) { ...@@ -112,6 +111,11 @@ function reverseProxy(req, res) {
if (functionData.parent) if (functionData.parent)
viterbi(functionHash, functionData) viterbi(functionHash, functionData)
else { else {
let head = await fetch(implicitChainDB + functionHash, {
method: "head"
})
functionData._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1)
functionData.branches = Array.from(functionData.branches.entries()) functionData.branches = Array.from(functionData.branches.entries())
let payload = { let payload = {
method: 'put', method: 'put',
...@@ -129,13 +133,13 @@ function reverseProxy(req, res) { ...@@ -129,13 +133,13 @@ function reverseProxy(req, res) {
} }
} }
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime}) metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime})
}) }
.catch(function (err) { catch(err) {
res.json(err.message).status(err.statusCode) res.json(err.message).status(err.statusCode)
forwardTo.open_request_count -= 1 forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
logger.error("error" + err) logger.error("error" + err)
}); }
} }
function getPort(usedPort) { function getPort(usedPort) {
...@@ -194,7 +198,7 @@ async function branchChainPredictor(req) { ...@@ -194,7 +198,7 @@ async function branchChainPredictor(req) {
} }
if (functionBranchTree.has(req.params.id) && functionBranchTree.get(req.params.id).branches.size > 0) { if (functionBranchTree.has(req.params.id) && functionBranchTree.get(req.params.id).branches.size > 0) {
console.log(timelineQueue.has(req.params.id), timelineQueue.get(req.params.id)); // console.log(timelineQueue.has(req.params.id), timelineQueue.get(req.params.id));
if (!timelineQueue.has(req.params.id)) { if (!timelineQueue.has(req.params.id)) {
timelineQueue.set(req.params.id, []) timelineQueue.set(req.params.id, [])
...@@ -222,7 +226,7 @@ async function branchChainPredictor(req) { ...@@ -222,7 +226,7 @@ async function branchChainPredictor(req) {
} else { } else {
let resource_id = req.headers['x-resource-id'] let resource_id = req.headers['x-resource-id']
let resource = resourceMap.get(resource_id) let resource = resourceMap.get(resource_id)
let forwardBranch = req.params.id, callDelay let forwardBranch = req.params.id, callDelay = 0
if (timelineQueue.has(resource.functionHash)) { if (timelineQueue.has(resource.functionHash)) {
let sourceTimestamp = timelineQueue.get(resource.functionHash).shift() let sourceTimestamp = timelineQueue.get(resource.functionHash).shift()
...@@ -246,6 +250,8 @@ async function branchChainPredictor(req) { ...@@ -246,6 +250,8 @@ async function branchChainPredictor(req) {
if (branchInfo.branches.has(forwardBranch)) { if (branchInfo.branches.has(forwardBranch)) {
callDelay = constants.metrics.alpha * branchInfo.branches.get(forwardBranch)[1] callDelay = constants.metrics.alpha * branchInfo.branches.get(forwardBranch)[1]
+ callDelay * (1 - constants.metrics.alpha) + callDelay * (1 - constants.metrics.alpha)
console.log("call delay", callDelay);
let branchProb = branchInfo.branches.get(forwardBranch)[0] let branchProb = branchInfo.branches.get(forwardBranch)[0]
branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0) branchProb = (branchProb * (branchInfo.req_count - 1) + 1.0)
branchInfo.branches.set(forwardBranch, [branchProb, callDelay]) branchInfo.branches.set(forwardBranch, [branchProb, callDelay])
...@@ -326,6 +332,12 @@ async function viterbi(node, metadata) { ...@@ -326,6 +332,12 @@ async function viterbi(node, metadata) {
if (path.length > 1) { if (path.length > 1) {
metadata.mle_path = path metadata.mle_path = path
metadata.branches = Array.from(metadata.branches.entries()) metadata.branches = Array.from(metadata.branches.entries())
let head = await fetch(implicitChainDB + node, {
method: "head"
})
metadata._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1)
let payload = { let payload = {
method: 'put', method: 'put',
body: JSON.stringify(functionBranchTree.get(node)), body: JSON.stringify(functionBranchTree.get(node)),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment