Commit 26565f8c authored by nilanjandaw's avatar nilanjandaw

intermediate commit

parent d1b32115
{"id":"10.196.11.241","master_node":"10.129.6.5"}
\ No newline at end of file
{"id":"10.130.150.246","master_node":"10.129.6.5"}
\ No newline at end of file
......@@ -84,7 +84,7 @@ function runContainer(imageName, port) {
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference);
result += data;
workerEvent.emit('start', imageName, port)
workerEvent.emit('start', imageName, port, "container")
resolve(result);
});
......@@ -94,7 +94,7 @@ function runContainer(imageName, port) {
});
process.on('close', (code) => {
workerEvent.emit('end', port);
workerEvent.emit('end', port, "container");
})
}
......@@ -111,7 +111,7 @@ function runContainer(imageName, port) {
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference);
// result += data;
workerEvent.emit('start', imageName, port)
workerEvent.emit('start', imageName, port, "container")
resolve(result);
});
......@@ -121,7 +121,7 @@ function runContainer(imageName, port) {
});
process.on('close', (code) => {
workerEvent.emit('end', port);
workerEvent.emit('end', port, "container");
})
} else {
const process = spawn('docker', ["start", "-a", imageName]);
......@@ -132,7 +132,7 @@ function runContainer(imageName, port) {
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("container run time taken: ", timeDifference);
result += data;
workerEvent.emit('start', imageName, port)
workerEvent.emit('start', imageName, port, "container")
resolve(result);
});
......@@ -142,7 +142,7 @@ function runContainer(imageName, port) {
});
process.on('close', (code) => {
workerEvent.emit('end', port);
workerEvent.emit('end', port, "container");
})
}
})
......
......@@ -10,7 +10,7 @@ const kafka = require('kafka-node')
const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port
let functionToPort = new Map(), usedPort = new Map()
let usedPort = new Map()
let Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
......@@ -56,7 +56,6 @@ libSupport.makeTopic(node_id).then(() => {
function startWorker(local_repository, functionHash,function_id, producer, runtime) {
let port = libSupport.getPort(usedPort)
usedPort.set(port, functionHash)
functionToPort.set(functionHash, port)
if (runtime === "isolate")
execute.runIsolate(local_repository + functionHash).then(result => {
producer.send([{
......@@ -81,17 +80,18 @@ function startWorker(local_repository, functionHash,function_id, producer, runti
}], () => { })
})
else if (runtime === "container")
execute.runContainer(functionHash, port).then(result => {
producer.send(
[{
topic: "response",
messages: JSON.stringify({
status: "success",
result,
function_id
})
}], () => { })
})
execute.runContainer(functionHash, port)
// .then(result => {
// producer.send(
// [{
// topic: "response",
// messages: JSON.stringify({
// status: "success",
// result,
// function_id
// })
// }], () => { })
// })
else {
producer.send(
[{
......@@ -113,15 +113,24 @@ function heartbeat() {
})
}
execute.workerEvent.on("start", (port, functionHash) => {
execute.workerEvent.on("start", (functionHash, port, runtime) => {
console.log("started function Port: ", port, functionHash);
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, port, runtime, node_id })
}], () => { })
})
execute.workerEvent.on('end', (port) => {
let functionHash = usedPort.get(port)
usedPort.delete(port)
functionToPort.delete(functionHash)
producer.send(
[{
topic: "removeWorker",
messages: JSON.stringify({ functionHash, port, runtime, node_id })
}], () => { })
console.log("Ending worker for function", functionHash, usedPort, functionToPort);
......
......@@ -12,6 +12,8 @@ const mqtt = require('mqtt')
const app = express()
const libSupport = require('./lib')
let functionToPort = new Map()
let kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
......@@ -23,7 +25,7 @@ let kafka = require('kafka-node'),
Consumer = kafka.Consumer,
consumer = new Consumer(client,
[
{ topic: 'response', partition: 0, offset: 0}, { topic: 'heartbeat' }
{ topic: 'response', partition: 0, offset: 0}, { topic: 'heartbeat' }, {topic: "deployed"}
],
[
{ autoCommit: true }
......@@ -141,13 +143,19 @@ function deployContainer(path, imageName) {
}
app.post('/serverless/execute/:id', (req, res) => {
let runtime = req.body.runtime
if (functionToPort.has(req.params.id + runtime)) {
requestQueue.push({
req, res
})
let forwardTo = functionToPort.get(req.params.id + runtime)
console.log("resource found", forwardTo);
res.redirect(307, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`);
console.log("sending request to:", `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`);
} else {
requestQueue.push({ req, res })
if (requestQueue.length >= WINDOW_SIZE)
dispatch()
}
})
......@@ -158,6 +166,7 @@ function dispatch() {
console.log(req.body)
let runtime = req.body.runtime
let functionHash = req.params.id
let function_id = libSupport.makeid(20)
console.log("Dispatching function with Id", function_id, runtime);
let node_id = getAddress()
......@@ -216,6 +225,7 @@ consumer.on('message', function (message) {
message = JSON.parse(message)
console.log(message);
let res = db.get(message.function_id)
if (res != null)
res.json({
"status": "success",
"reply": message.result
......@@ -228,6 +238,15 @@ consumer.on('message', function (message) {
workerNodes.push(message.address)
console.log(workerNodes);
}
} else if (topic == "deployed") {
let res = db.get(message.function_id)
message = JSON.parse(message)
console.log(message);
// res.redirect(307, `http://${forwardTo.node_id}:${forwardTo.port}/serverless/function/execute`);
functionToPort.set(message.functionHash + message.runtime, {port: parseInt(message.port),
node_id: message.node_id})
console.log(functionToPort);
}
});
......
......@@ -5,7 +5,7 @@ const app = express()
app.use(bodyParser.urlencoded({ extended: true }))
app.use(bodyParser.json())
let port = 5000, lastRequest = Date.now()
app.post('/serverless/function/execute', (req, res) => {
app.post('/serverless/function/execute/:id', (req, res) => {
let payload = req.body
lastRequest = Date.now()
executor(payload).then((result) => {
......
......@@ -62,25 +62,32 @@ The DM is divided into two submodules the **Dispatcher** and the **Dispatch Daem
- Run the Master and Worker server as `npm start` or `node index.js`
### Internal Communication Interfaces
Internally DM uses Apache Kafka for interaction between the Dispatcher and the Dispatch Agents, while the messages are in JSON format.
Every Dispatch Agent listens on a topic which is its own UID (Currently the primary IP Address), the Dispatcher listens on the topics *"response"* and *"heartbeat"*.
- **Request Message:** When a request is received at the Dispatcher, it directs the Dispatch Agent to start a worker environment. A message is sent via the chose Worker's ID topic. \
Format:
```javascript
{ type: "execute",
function_id: "onetime unique ID",
runtime: "isolation runtime",
functionHash: "hash of the function to be run" }
```
- **Response Message:** In response, the worker executes the function, pulling resources from the central repository as required and sends a response. \
Format:
```javascript
{ status: 'success',
result: 'result of the execution',
function_id: 'onetime unique ID' }
```
- **Heartbeat Message:** The Dispatch Daemons also publish a periodic Heartbeat message back to the Dispatcher as a liveness test.\
Format:
```javascript
{ address: 'UID of the worker' }
```
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment