Commit 407c256c authored by nilanjandaw's avatar nilanjandaw

Merge branch 'master' of https://git.cse.iitb.ac.in/synerg/xanadu

parents cc255a08 e8fdecac
...@@ -4,6 +4,6 @@ node_modules ...@@ -4,6 +4,6 @@ node_modules
package-lock.json package-lock.json
firecracker* firecracker*
secrets.json secrets.json
resource_manager/bin/** resource_system/bin/**
resource_manager/version.linux resource_system/version.linux
grunt grunt
\ No newline at end of file
[submodule "resource_manager/src/common/cJSON"] [submodule "resource_manager/src/common/cJSON"]
path = resource_manager/src/common/cJSON path = resource_system/src/common/cJSON
url = https://github.com/DaveGamble/cJSON url = https://github.com/DaveGamble/cJSON
[submodule "resource_manager/src/common/nlib"] [submodule "resource_manager/src/common/nlib"]
path = resource_manager/src/common/nlib path = resource_system/src/common/nlib
url = https://github.com/namandixit/nlib url = https://github.com/namandixit/nlib
...@@ -23,6 +23,7 @@ The Dispatch Manager (DM) sends a request to the Resource Manager (RM), detailin ...@@ -23,6 +23,7 @@ The Dispatch Manager (DM) sends a request to the Resource Manager (RM), detailin
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"memory": 1024, // in MiB "memory": 1024, // in MiB
... // Any other resources ... // Any other resources
} }
...@@ -33,6 +34,7 @@ Format: ...@@ -33,6 +34,7 @@ Format:
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
// "port": 2343 --- NOT IMPLEMENTED YET // "port": 2343 --- NOT IMPLEMENTED YET
"grunts": ["a", "b", ...] // List of machine IDs "grunts": ["a", "b", ...] // List of machine IDs
} }
...@@ -41,9 +43,10 @@ Format: ...@@ -41,9 +43,10 @@ Format:
Once the runtime entity has been launched (or the launch has failed), the Executor sends back a status message on the `LOG_COMMON` topic. Once the runtime entity has been launched (or the launch has failed), the Executor sends back a status message on the `LOG_COMMON` topic.
```javascript ```javascript
{ {
"node_id" "node_id" : "uique-machine-id",
"resource_id" "resource_id": "logical-entity-id",
"function_id" "function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp",
"reason": "deployment"/"termination" "reason": "deployment"/"termination"
"status": true/false // Only valid if reason==deployment "status": true/false // Only valid if reason==deployment
} }
...@@ -53,25 +56,28 @@ Instrumentation data is also sent on the `LOG_COMMON` topic. This data is sent f ...@@ -53,25 +56,28 @@ Instrumentation data is also sent on the `LOG_COMMON` topic. This data is sent f
and whoever needs the data is allowed to read it. Each message is required to have atleast three fields: `node_id`, `resource_id` and `function_id`. and whoever needs the data is allowed to read it. Each message is required to have atleast three fields: `node_id`, `resource_id` and `function_id`.
```javascript ```javascript
{ // Example message from Executor { // Example message from Executor
"node_id" "node_id" : "uique-machine-id",
"resource_id" "resource_id": "logical-entity-id",
"function_id" "function_id": "unique-function-id",
"cpu" "timestamp" : "iso-8601-timestamp",
"memory" "cpu" : 343, // in MHz
"network" "memory": 534, // in MiB
"network": 234 // in KBps
} }
{ // Example message from reverse proxy { // Example message from reverse proxy
"node_id" "node_id" : "uique-machine-id",
"resource_id" "resource_id": "logical-entity-id",
"function_id" "function_id": "unique-function-id",
"average_fn_time" "timestamp" : "iso-8601-timestamp",
"average_fn_time" : 23 // in ms
} }
{ // Example message from dispatch manager { // Example message from dispatch manager
"node_id" "node_id" : "uique-machine-id",
"resource_id" "resource_id": "logical-entity-id",
"function_id" "function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp",
"coldstart_time" "coldstart_time"
} }
``` ```
...@@ -206,6 +212,7 @@ Upon being launched, each Resource Daemon (RD) sends a JOIN message to the RM on ...@@ -206,6 +212,7 @@ Upon being launched, each Resource Daemon (RD) sends a JOIN message to the RM on
```javascript ```javascript
{ {
"node_id": "unique-machine-id", "node_id": "unique-machine-id",
"timestamp" : "iso-8601-timestamp"
} }
``` ```
...@@ -215,16 +222,30 @@ resources being tracked by RDs on each machine. This data is cached by the RM. ...@@ -215,16 +222,30 @@ resources being tracked by RDs on each machine. This data is cached by the RM.
```javascript ```javascript
{ {
"node_id": "unique-machine-id", "node_id": "unique-machine-id",
"timestamp" : "iso-8601-timestamp",
"memory": 1024, // in MiB "memory": 1024, // in MiB
... // Any other resources ... // Any other resources
} }
``` ```
If the RM recieves a heartbeat from an RD which has not joined before (due to either the RM dying and some older messages being stuck in Kafka),
it sends a rejoin command to the RD on topic `REJOIN_RM_2_RD`.
```javascript
{
"node_id": "unique-machine-id",
"timestamp" : "iso-8601-timestamp"
}
```
Also, if the RM doesn't recieve heartbeats from some RD for some amount of time, it assumes that the RD is dead and frees its resources.
If the RD sends a beat after this, the rejoin message is sent.
The RM, upon recieving the request from the DM, checks its local cache to find a suitable machine. If it finds some, it sends a message back to the The RM, upon recieving the request from the DM, checks its local cache to find a suitable machine. If it finds some, it sends a message back to the
DM on topic `RESPONSE_RM_2_DM`. DM on topic `RESPONSE_RM_2_DM`.
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
// "port": 2343 --- NOT IMPLEMENTED YET // "port": 2343 --- NOT IMPLEMENTED YET
"nodes": ["a", "b", ...] // List of unique machine IDs "nodes": ["a", "b", ...] // List of unique machine IDs
} }
...@@ -236,6 +257,7 @@ Format: ...@@ -236,6 +257,7 @@ Format:
```javascript ```javascript
{ {
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"memory": 1024, // in MiB "memory": 1024, // in MiB
... // Any other resources ... // Any other resources
} }
...@@ -246,6 +268,7 @@ The RDs recieve this message and send back whether on not they satisfy the const ...@@ -246,6 +268,7 @@ The RDs recieve this message and send back whether on not they satisfy the const
{ {
"node_id": "unique-machine-id", "node_id": "unique-machine-id",
"resource_id": "unique-transaction-id", "resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"success" : 0/1 // 0 = fail, 1 = success "success" : 0/1 // 0 = fail, 1 = success
} }
``` ```
......
Dependency: Clang compiler
Run "build.linux" script to compile the project.
...@@ -250,7 +250,7 @@ Sint main (Sint argc, Char *argv[]) ...@@ -250,7 +250,7 @@ Sint main (Sint argc, Char *argv[])
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring; Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring; Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
Char *function_id = cJSON_GetObjectItem(root, "function_id")->valuestring; Char *function_id = cJSON_GetObjectItem(root, "function_id")->valuestring;
printf("%s\n", cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
unused_variable(node_id); unused_variable(node_id);
unused_variable(resource_id); unused_variable(resource_id);
unused_variable(function_id); unused_variable(function_id);
...@@ -295,15 +295,15 @@ Sint main (Sint argc, Char *argv[]) ...@@ -295,15 +295,15 @@ Sint main (Sint argc, Char *argv[])
Char *topic = NULL; Char *topic = NULL;
if (c.kind == Command_REQUEST_ARBITER_2_GRUNT) { if (c.kind == Command_REQUEST_ARBITER_2_GRUNT) {
topic = "REQUEST_ARBITER_2_GRUNT"; topic = "REQUEST_RM_2_RD";
sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id); sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"memory\": %d\n", c.req_a2g.memory); sbufPrint(output, ",\n\"memory\": %d\n", c.req_a2g.memory);
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
} else if (c.kind == Command_RESPONSE_ARBITER_2_DM) { } else if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
topic = "RESPONSE_ARBITER_2_DISPATCHER"; topic = "RESPONSE_RM_2_DM";
sbufPrint(output, "{\n\"id\": \"%s\"", c.resource_id); sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"nodes\": ["); sbufPrint(output, ",\n\"nodes\": [");
for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) { for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) {
sbufPrint(output, "\"%s\"", c.res_a2d.grunt_ids[k]); sbufPrint(output, "\"%s\"", c.res_a2d.grunt_ids[k]);
...@@ -321,6 +321,7 @@ Sint main (Sint argc, Char *argv[]) ...@@ -321,6 +321,7 @@ Sint main (Sint argc, Char *argv[])
return -1; return -1;
} }
} }
sbufDelete(output); sbufDelete(output);
free(c.resource_id); free(c.resource_id);
sbufUnsortedRemove(commands, j); sbufUnsortedRemove(commands, j);
......
File moved
File moved
...@@ -48,11 +48,28 @@ int main(int argc, char** argv) ...@@ -48,11 +48,28 @@ int main(int argc, char** argv)
Kafka kafka = {0}; Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092"); kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(&kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka.writer); \
return -1; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DM_2_RM"); //
CREATE_TOPIC("RESPONSE_RM_2_DM");
CREATE_TOPIC("REQUEST_RM_2_RD");
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092"); kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
kafkaSubscribe(&kafka, kafka_reader_topics, "RESPONSE_ARBITER_2_DISPATCHER"); kafkaSubscribe(&kafka, kafka_reader_topics, "RESPONSE_RM_2_DM");
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics); rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics); rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
...@@ -67,14 +84,14 @@ int main(int argc, char** argv) ...@@ -67,14 +84,14 @@ int main(int argc, char** argv)
Char *output = NULL; Char *output = NULL;
Sint id = (Sint)time(NULL); Sint id = (Sint)time(NULL);
sbufPrint(output, "{\n\"id\": \"%d\"", id); sbufPrint(output, "{\n\"resource_id\": \"%d\"", id);
sbufPrint(output, ",\n\"memory\": %d", memory_required); sbufPrint(output, ",\n\"memory\": %d", memory_required);
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
printf("Sending to Arbiter:\n%s\n", cJSON_Print(cJSON_Parse(output))); printf("Sending to Arbiter:\n%s\n", cJSON_Print(cJSON_Parse(output)));
if (output != NULL) { if (output != NULL) {
if (!kafkaWrite(kafka.writer, "REQUEST_DISPATCHER_2_ARBITER", "rm_test", output)) { if (!kafkaWrite(kafka.writer, "REQUEST_DM_2_RM", "rm_test", output)) {
return -1; return -1;
} }
} }
...@@ -116,7 +133,7 @@ int main(int argc, char** argv) ...@@ -116,7 +133,7 @@ int main(int argc, char** argv)
if (root == NULL) { if (root == NULL) {
// TODO(naman): Error // TODO(naman): Error
} else { } else {
cJSON *array = cJSON_GetObjectItem(root, "id"); cJSON *array = cJSON_GetObjectItem(root, "resource_id");
cJSON *elem = NULL; cJSON *elem = NULL;
cJSON_ArrayForEach(elem, array) { cJSON_ArrayForEach(elem, array) {
printf("%s\n", elem->valuestring); printf("%s\n", elem->valuestring);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment