Commit 128b235b authored by NILANJAN DAW's avatar NILANJAN DAW

Added support for UDP based request dispatch

parent a38a16ae
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
"master_address": "localhost", "master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984", "couchdb_host": "10.129.6.5:5984",
"env": "env.js", "env": "env_udp.js",
"db": { "db": {
"function_meta": "serverless", "function_meta": "serverless",
"metrics": "metrics", "metrics": "metrics",
......
{"id":"192.168.0.105","master_node":"192.168.0.105"} {"id":"192.168.0.106","master_node":"192.168.0.105"}
\ No newline at end of file \ No newline at end of file
...@@ -23,7 +23,10 @@ function runIsolate(local_repository, metadata) { ...@@ -23,7 +23,10 @@ function runIsolate(local_repository, metadata) {
} }
}); });
worker.on('message', resolve); worker.on('message', resolve);
worker.on('error', reject); worker.on('error', (err) => {
logger.error("Isolate failed with error", err)
reject(err)
});
worker.on('exit', (code) => { worker.on('exit', (code) => {
if (code !== 0) if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`)); reject(new Error(`Worker stopped with exit code ${code}`));
......
...@@ -120,6 +120,7 @@ function startWorker(local_repository, producer, metadata) { ...@@ -120,6 +120,7 @@ function startWorker(local_repository, producer, metadata) {
execute.runIsolate(local_repository, metadata) execute.runIsolate(local_repository, metadata)
.catch(err => { .catch(err => {
logger.error("=====================deployment failed========================="); logger.error("=====================deployment failed=========================");
logger.error(err)
producer.send([{ producer.send([{
topic: "deployed", topic: "deployed",
messages: JSON.stringify({ messages: JSON.stringify({
......
...@@ -11,6 +11,8 @@ const sharedMeta = require('./shared_meta') ...@@ -11,6 +11,8 @@ const sharedMeta = require('./shared_meta')
const util = require('util') const util = require('util')
const { createLogger, format, transports } = winston; const { createLogger, format, transports } = winston;
const heap = require('heap') const heap = require('heap')
const dgram = require('dgram');
const udpProxy = dgram.createSocket('udp4');
let db = sharedMeta.db, // queue holding request to be dispatched let db = sharedMeta.db, // queue holding request to be dispatched
...@@ -18,7 +20,8 @@ let db = sharedMeta.db, // queue holding request to be dispatched ...@@ -18,7 +20,8 @@ let db = sharedMeta.db, // queue holding request to be dispatched
functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of functionToResource = sharedMeta.functionToResource, // a function to resource map. Each map contains a minheap of
// resources associated with the function // resources associated with the function
functionBranchTree = sharedMeta.functionBranchTree, // Holds the function path's and related probability distribution functionBranchTree = sharedMeta.functionBranchTree, // Holds the function path's and related probability distribution
timelineQueue = new Map() // a temporary map holding request timestamps to be used for calulcating implicit chain invocation delays timelineQueue = new Map(), // a temporary map holding request timestamps to be used for calulcating implicit chain invocation delays
requestFlightQueue = sharedMeta.requestFlightQueue
let kafka = require('kafka-node'), let kafka = require('kafka-node'),
Producer = kafka.Producer, Producer = kafka.Producer,
...@@ -96,49 +99,61 @@ async function reverseProxy(req, res) { ...@@ -96,49 +99,61 @@ async function reverseProxy(req, res) {
body: req.body, body: req.body,
json: true // Automatically stringifies the body to JSON json: true // Automatically stringifies the body to JSON
}; };
if (req.body.type === "tcp") {
try { try {
let parsedBody = await rp(options) let parsedBody = await rp(options)
let serviceTime = Date.now() - res.timestamp let serviceTime = Date.now() - res.timestamp
res.json(parsedBody) res.json(parsedBody)
forwardTo.open_request_count -= 1 forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare) heap.heapify(functionHeap, compare)
let functionHash = req.params.id let functionHash = req.params.id
let functionData = functionBranchTree.get(functionHash) let functionData = functionBranchTree.get(functionHash)
if (functionData && functionData.req_count % 5 == 0) { if (functionData && functionData.req_count % 5 == 0) {
if (functionData.parent) if (functionData.parent)
viterbi(functionHash, functionData) viterbi(functionHash, functionData)
else { else {
let head = await fetch(implicitChainDB + functionHash, { let head = await fetch(implicitChainDB + functionHash, {
method: "head" method: "head"
})
functionData._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1)
functionData.branches = Array.from(functionData.branches.entries())
let payload = {
method: 'put',
body: JSON.stringify(functionBranchTree.get(functionHash)),
headers: { 'Content-Type': 'application/json' }
}
fetchData(implicitChainDB + functionHash, payload)
.then((updateStatus) => {
console.log(updateStatus);
if (updateStatus.error === undefined)
functionData._rev = updateStatus.rev
}) })
functionData.branches = new Map(functionData.branches)
functionData._rev = head.headers.get("etag").substring(1, head.headers.get("etag").length - 1)
functionData.branches = Array.from(functionData.branches.entries())
let payload = {
method: 'put',
body: JSON.stringify(functionBranchTree.get(functionHash)),
headers: { 'Content-Type': 'application/json' }
}
fetchData(implicitChainDB + functionHash, payload)
.then((updateStatus) => {
console.log(updateStatus);
if (updateStatus.error === undefined)
functionData._rev = updateStatus.rev
})
functionData.branches = new Map(functionData.branches)
}
} }
metrics.collectMetrics({ type: res.start, value: serviceTime, functionHash: req.params.id, runtime })
} }
metrics.collectMetrics({type: res.start, value: serviceTime, functionHash: req.params.id, runtime}) catch (err) {
} res.json(err.message).status(err.statusCode)
catch(err) { forwardTo.open_request_count -= 1
res.json(err.message).status(err.statusCode) heap.heapify(functionHeap, compare)
forwardTo.open_request_count -= 1 logger.error("error" + err)
heap.heapify(functionHeap, compare) }
logger.error("error" + err) } else if (req.body.type === "udp") {
let request_id = makeid(4)
req.body.request_id = request_id
// res.request_id = request_id
requestFlightQueue.set(request_id, res)
let payload = req.body.payload
payload.request_id = request_id
payload = JSON.stringify(payload)
udpProxy.send(payload, 0, payload.length, resource.port, resource.node_id, function (err, bytes) {
logger.info("forwarded request via UDP")
})
} }
} }
...@@ -382,6 +397,22 @@ function logBroadcast(message, resource_id) { ...@@ -382,6 +397,22 @@ function logBroadcast(message, resource_id) {
} }
udpProxy.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
udpProxy.close();
});
udpProxy.on('message', (msg, rinfo) => {
let result = JSON.parse(msg)
let res = requestFlightQueue.get(result.request_id)
res.json(result)
});
udpProxy.on('listening', () => {
const address = udpProxy.address();
console.log(`server listening ${address.address}:${address.port}`);
});
async function fetchData(url, data = null) { async function fetchData(url, data = null) {
let res let res
if (data === undefined || data === null) if (data === undefined || data === null)
...@@ -391,6 +422,8 @@ async function fetchData(url, data = null) { ...@@ -391,6 +422,8 @@ async function fetchData(url, data = null) {
return await res.json() return await res.json()
} }
udpProxy.bind(constants.master_port); // starting UDP server for offloaded endpoints
module.exports = { module.exports = {
makeid, generateExecutor, reverseProxy, makeid, generateExecutor, reverseProxy,
getPort, logger, compare, getPort, logger, compare,
......
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 60, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
let idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
let payload = JSON.parse(msg)
console.log(payload, typeof payload);
executor(payload).then(result => {
result = JSON.stringify(result)
try {
udpProxy.send(result, 0, result.length, "8080", "localhost", function (err, bytes) {
if (err)
console.log(err)
console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
server.bind(port);
setInterval(shouldDie, 1000);
\ No newline at end of file
...@@ -18,7 +18,7 @@ let kafka = require('kafka-node'), ...@@ -18,7 +18,7 @@ let kafka = require('kafka-node'),
]) ])
function getAddress() { function getAddress() {
return Object.keys(workerNodes)[0];
} }
consumer.on('message', function (message) { consumer.on('message', function (message) {
......
...@@ -7,7 +7,8 @@ let db = new Map(), // queue holding request to be dispatched ...@@ -7,7 +7,8 @@ let db = new Map(), // queue holding request to be dispatched
// resources associated with the function // resources associated with the function
workerNodes = new Map(), // list of worker nodes currently known to the DM workerNodes = new Map(), // list of worker nodes currently known to the DM
functionBranchTree = new Map(), // a tree to store function branch predictions functionBranchTree = new Map(), // a tree to store function branch predictions
conditionProbabilityExplicit = new Map() // tree holding conditional probabilities for explicit chains conditionProbabilityExplicit = new Map(), // tree holding conditional probabilities for explicit chains
requestFlightQueue = new Map() // map to store in flight requests
/** /**
* URL to the couchdb database server used to store data * URL to the couchdb database server used to store data
...@@ -27,6 +28,6 @@ explicitChainDB = explicitChainDB + "/" + constants.db.explicit_chain_meta + "/" ...@@ -27,6 +28,6 @@ explicitChainDB = explicitChainDB + "/" + constants.db.explicit_chain_meta + "/"
module.exports = { module.exports = {
db, functionBranchTree, functionToResource, workerNodes, resourceMap, db, functionBranchTree, functionToResource, workerNodes, resourceMap,
conditionProbabilityExplicit, conditionProbabilityExplicit, requestFlightQueue,
metadataDB, metricsDB, implicitChainDB, explicitChainDB metadataDB, metricsDB, implicitChainDB, explicitChainDB
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment