Commit b2c7f892 authored by Nilanjan Daw's avatar Nilanjan Daw

Merge remote-tracking branch 'origin/master'

parents ca680d11 626ea0c0
......@@ -31,14 +31,14 @@ typedef struct Grunt_Survey {
U16 *ports;
U64 milli_passed;
U64 milli_last;
Char *txn_id;
Char *resource_id;
} Grunt_Survey;
typedef struct Command {
enum Command_Kind {
Command_NONE,
Command_REQUEST_DISPATCHER_2_ARBITER,
Command_RESPONSE_ARBITER_2_DISPATCHER,
Command_REQUEST_DM_2_ARBITER,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_RESPONSE_GRUNT_2_ARBITER,
......@@ -46,7 +46,7 @@ typedef struct Command {
Command_HEARTBEAT_GRUNT_2_ARBITER,
} kind;
Char *txn_id;
Char *resource_id;
union {
struct {
......@@ -113,18 +113,38 @@ Sint main (Sint argc, Char *argv[])
Kafka kafka = {0};
kafkaCreateWriter(&kafka, "10.129.6.5:9092");
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(&kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka.writer); \
return -1; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DM_2_RM"); //
CREATE_TOPIC("RESPONSE_RM_2_DM");
CREATE_TOPIC("REQUEST_RM_2_RD");
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("LOG_COMMON"); //
#undef CREATE_TOPIC
kafkaCreateReader(&kafka, "10.129.6.5:9092");
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_t *topic_req_d2a = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_DISPATCHER_2_ARBITER");
rd_kafka_topic_t *topic_req_dm2a = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_DM_2_RM");
rd_kafka_topic_t *topic_join_g2a = kafkaSubscribe(&kafka, kafka_reader_topics,
"JOIN_GRUNT_2_ARBITER");
"JOIN_RD_2_RM");
rd_kafka_topic_t *topic_res_g2a = kafkaSubscribe(&kafka, kafka_reader_topics,
"RESPONSE_GRUNT_2_ARBITER");
"RESPONSE_RD_2_RM");
rd_kafka_topic_t *topic_beat_g2a = kafkaSubscribe(&kafka, kafka_reader_topics,
"HEARTBEAT_GRUNT_2_ARBITER");
"HEARTBEAT_RD_2_RM");
rd_kafka_topic_t *topic_log = kafkaSubscribe(&kafka, kafka_reader_topics,
"LOG_COMMON");
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader,
kafka_reader_topics);
......@@ -158,20 +178,22 @@ Sint main (Sint argc, Char *argv[])
const char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(kafka_message_read->payload, &json_error, true);
if (kafka_message_read->rkt == topic_req_d2a) {
Command c = {.kind = Command_REQUEST_ARBITER_2_GRUNT};
c.txn_id = cJSON_GetObjectItem(root, "id")->valuestring;
if (kafka_message_read->rkt == topic_req_dm2a) {
Command c = {.kind = Command_REQUEST_DM_2_ARBITER};
c.resource_id = strdup(cJSON_GetObjectItem(root, "resource_id")->valuestring);
// TODO(naman): Add any new resource fields here
Sint memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Request D2A:\tid: %s = ([memory] = %d)", c.txn_id, memory);
logMessage("Request DM2RM:\tid: %s = ([memory] = %d)",
c.resource_id, memory);
Char **grunt_ids = NULL;
for (Size j = 0; j < sbufElemin(grunts); j++) {
Grunt g = grunts[j];
if (g.memory >= memory) {
c.kind = Command_RESPONSE_ARBITER_2_DISPATCHER;
c.kind = Command_RESPONSE_ARBITER_2_DM;
sbufAdd(grunt_ids, g.id);
}
}
......@@ -179,16 +201,16 @@ Sint main (Sint argc, Char *argv[])
if (c.kind == Command_REQUEST_ARBITER_2_GRUNT) {
c.req_a2g.memory = memory;
Grunt_Survey *gs = calloc(1, sizeof(*gs));
htInsert(&grunt_survey_map, hashString(c.txn_id), (Uptr)gs);
htInsert(&grunt_survey_map, hashString(c.resource_id), (Uptr)gs);
gs->milli_last = timeMilli();
gs->txn_id = c.txn_id;
} else if (c.kind == Command_RESPONSE_ARBITER_2_DISPATCHER) {
gs->resource_id = c.resource_id;
} else if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
c.res_a2d.grunt_ids = grunt_ids;
sbufAdd(commands, c);
}
} else if (kafka_message_read->rkt == topic_join_g2a) {
Char *id = cJSON_GetObjectItem(root, "id")->valuestring;
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
Grunt grunt = {.id = id};
logMessage("Join G2A:\tid: %s", id);
......@@ -198,32 +220,45 @@ Sint main (Sint argc, Char *argv[])
htInsert(&grunt_map, hashString(id), sbufElemin(grunts) - 1);
}
} else if (kafka_message_read->rkt == topic_res_g2a) {
Char *id = cJSON_GetObjectItem(root, "id")->valuestring;
B32 success = (B32)(cJSON_GetObjectItem(root, "success")->valueint);
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
B32 success = (B32)cJSON_IsTrue(cJSON_GetObjectItem(root, "success"));
logMessage("Response G2A:\tid: %s = %s", id, success ? "succeded" : "failed");
logMessage("Response G2A:\tid: %s (%s) = %s",
resource_id, node_id, success ? "succeded" : "failed");
if (success) {
Grunt_Survey *gs = (Grunt_Survey *)htLookup(&grunt_survey_map,
hashString(id));
hashString(resource_id));
if (gs != NULL) { // If it has not been already removed
Grunt *g = &grunts[htLookup(&grunt_map, hashString(id))];
Grunt *g = &grunts[htLookup(&grunt_map, hashString(node_id))];
sbufAdd(gs->grunt_ptrs, g);
}
}
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = cJSON_GetObjectItem(root, "id")->valuestring;
Char *id = cJSON_GetObjectItem(root, "node_id")->valuestring;
logMessage("Beat G2A:\tid: %s", id);
U64 index = htLookup(&grunt_map, hashString(id));
if (index != 0) { // Prevent any left over message
// TODO(naman): Add any new resource fields here
grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
}
} else {
} else if (kafka_message_read->rkt == topic_log) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
Char *function_id = cJSON_GetObjectItem(root, "function_id")->valuestring;
unused_variable(node_id);
unused_variable(resource_id);
unused_variable(function_id);
} else {
// TODO(naman): Error
}
cJSON_Delete(root);
}
rd_kafka_message_destroy(kafka_message_read);
......@@ -237,16 +272,18 @@ Sint main (Sint argc, Char *argv[])
gs->milli_passed += milli_new - gs->milli_last;
gs->milli_last = milli_new;
if (gs->milli_passed >= 1000) {
htRemove(&grunt_survey_map, hashString(gs->txn_id));
Command c = {.kind = Command_RESPONSE_ARBITER_2_DISPATCHER};
c.txn_id = gs->txn_id;
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = gs->resource_id;
for (Size k = 0; k < sbufElemin(gs->grunt_ptrs); k++) {
sbufAdd(c.res_a2d.grunt_ids, gs->grunt_ptrs[k]->id);
}
sbufAdd(commands, c);
free(gs->grunt_ptrs);
free(gs->ports);
htRemove(&grunt_survey_map, hashString(gs->resource_id));
}
}
}
......@@ -260,14 +297,14 @@ Sint main (Sint argc, Char *argv[])
if (c.kind == Command_REQUEST_ARBITER_2_GRUNT) {
topic = "REQUEST_ARBITER_2_GRUNT";
sbufPrint(output, "{\n\"id\": \"%s\"", c.txn_id);
sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"memory\": %d\n", c.req_a2g.memory);
sbufPrint(output, "\n}\n");
} else if (c.kind == Command_RESPONSE_ARBITER_2_DISPATCHER) {
} else if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
topic = "RESPONSE_ARBITER_2_DISPATCHER";
sbufPrint(output, "{\n\"id\": \"%s\"", c.txn_id);
sbufPrint(output, ",\n\"grunts\": [");
sbufPrint(output, "{\n\"id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"nodes\": [");
for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) {
sbufPrint(output, "\"%s\"", c.res_a2d.grunt_ids[k]);
if (k < sbufElemin(c.res_a2d.grunt_ids) - 1) {
......@@ -280,12 +317,13 @@ Sint main (Sint argc, Char *argv[])
if (output != NULL) {
printf("Sending to %s\n%s\n", topic, output);
if (!kafkaWrite(kafka.writer, topic, "rm_arbiter", output)) {
if (!kafkaWrite(kafka.writer, topic, "resource_manager", output)) {
return -1;
}
}
sbufDelete(output);
sbufDelete(commands);
free(c.resource_id);
sbufUnsortedRemove(commands, j);
}
}
......
......@@ -106,22 +106,6 @@ rd_kafka_t* kafkaCreateWriter (Kafka *kafka, Char *address)
printf("Ading brokers to writer\n");
rd_kafka_brokers_add(kafka->writer, address);
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka->writer); \
return NULL; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DISPATCHER_2_ARBITER"); //
CREATE_TOPIC("RESPONSE_ARBITER_2_DISPATCHER");
CREATE_TOPIC("REQUEST_ARBITER_2_GRUNT");
CREATE_TOPIC("RESPONSE_GRUNT_2_ARBITER"); //
CREATE_TOPIC("JOIN_GRUNT_2_ARBITER"); //
CREATE_TOPIC("HEARTBEAT_GRUNT_2_ARBITER"); //
#undef CREATE_TOPIC
return kafka->writer;
}
......
......@@ -56,20 +56,44 @@ void signalHandlerSIGINT (int _)
int main(int argc, char** argv)
{
unused_variable(argc);
unused_variable(argv);
Char *node_name = NULL;
if (argc > 1) {
node_name = argv[1];
} else {
Char hostname[1024] = {0};
gethostname(hostname, 1023);
sbufPrint(node_name, "%s", hostname);
}
signal(SIGINT, signalHandlerSIGINT);
Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(&kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka.writer); \
return -1; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DM_2_RM"); //
CREATE_TOPIC("RESPONSE_RM_2_DM");
CREATE_TOPIC("REQUEST_RM_2_RD");
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_ARBITER_2_GRUNT");
"REQUEST_RM_2_RD");
unused_variable(topic_req_a2g);
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
......@@ -83,10 +107,10 @@ int main(int argc, char** argv)
}
Char *join_msg = NULL;
sbufPrint(join_msg, "{\"id\": \"my-machine\"");
sbufPrint(join_msg, "{\"node_id\": \"%s\"", node_name);
sbufPrint(join_msg, "\n}\n");
if (!kafkaWrite(kafka.writer, "JOIN_GRUNT_2_ARBITER", "rm_grunt", join_msg)) {
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) {
return -1;
}
......@@ -122,7 +146,7 @@ int main(int argc, char** argv)
// TODO(naman): Error
} else {
command_found = true;
c.txn_id = cJSON_GetObjectItem(root, "id")->valuestring;
c.txn_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint;
}
}
......@@ -144,22 +168,25 @@ int main(int argc, char** argv)
if (command_found) {
Char *output = NULL;
sbufPrint(output, "{\n\"id\": \"%s\"", c.txn_id);
sbufPrint(output, "{\n\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"resource_id\": \"%s\"", c.txn_id);
if (memory >= c.res.memory) {
sbufPrint(output, ",\n\"success\": %d\n", 1);
sbufPrint(output, ",\n\"success\": true\n");
// TODO(naman): Add port
// sbufPrint(output, ",\n\"port\": %d\n", port);
} else {
sbufPrint(output, ",\n\"success\": %d\n", 0);
sbufPrint(output, ",\n\"success\": false\n");
}
sbufPrint(output, "\n}\n");
if (!kafkaWrite(kafka.writer, "RESPONSE_GRUNT_2_ARBITER", "rm_grunt", output)) {
if (!kafkaWrite(kafka.writer, "RESPONSE_RD_2_RM", "resource_daemon", output)) {
return -1;
}
} else { // Send a heartbeat message if it is time to do so
}
{ // Send a heartbeat message if it is time to do so
U64 time_new = timeMilli();
U64 time_passed = time_new - time_begin;
time_begin = time_new;
......@@ -170,12 +197,12 @@ int main(int argc, char** argv)
Char *output = NULL;
sbufPrint(output, "{\"id\": \"my-machine\"");
sbufPrint(output, "{\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"memory\": %d", memory);
sbufPrint(output, "\n}\n");
if (!kafkaWrite(kafka.writer, "HEARTBEAT_GRUNT_2_ARBITER", "rm_grunt", output)) {
if (!kafkaWrite(kafka.writer, "HEARTBEAT_RD_2_RM", "resource_daemon", output)) {
return -1;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment