Commit d5be07ce authored by Naman Dixit's avatar Naman Dixit

Fixed all bugs in RM

parent 72a72d4b
......@@ -24,10 +24,12 @@
typedef struct Grunt {
Char *id;
Sint memory;
B32 rejoin_asked;
Sint time_to_die; // in ms
} Grunt;
typedef struct Grunt_Survey {
Grunt **grunt_ptrs;
Char **grunt_ids;
U16 *ports;
U64 milli_passed;
U64 milli_last;
......@@ -37,22 +39,14 @@ typedef struct Grunt_Survey {
typedef struct Command {
enum Command_Kind {
Command_NONE,
Command_REQUEST_DM_2_ARBITER,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_RESPONSE_GRUNT_2_ARBITER,
Command_HEARTBEAT_GRUNT_2_ARBITER,
Command_REJOIN_ARBITER_2_GRUNT,
} kind;
Char *resource_id;
union {
struct {
Sint memory;
} req_d2a;
struct {
Char **grunt_ids;
} res_a2d;
......@@ -62,17 +56,49 @@ typedef struct Command {
} req_a2g;
struct {
Char *id;
U16 port;
} res_g2a;
struct {
Char *id;
Sint memory;
} beat_g2a;
Char *grunt_id;
} rejoin_a2g;
};
} Command;
typedef struct Grunt_Tracker {
Grunt *grunts;
Size *free_list;
Hash_Table map;
} Grunt_Tracker;
internal_function
void gruntTrackBegin (Grunt_Tracker *t, Grunt g)
{
if (t->grunts == NULL) {
t->map = htCreate(0);
sbufAdd(t->grunts, (Grunt){0}); // SInce 0 index out of hash table will be invalid
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->grunts[t->free_list[0]] = g;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->grunts, g);
insertion_index = sbufElemin(t->grunts) - 1;
}
htInsert(&t->map, hashString(g.id), insertion_index);
}
internal_function
void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
{
Size index = htLookup(&t->map, hashString(grunt_id));
sbufAdd(t->free_list, index);
free(t->grunts[index].id);
t->grunts[index] = (Grunt){0};
htRemove(&t->map, index);
}
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wpadded"
......@@ -104,11 +130,9 @@ Sint main (Sint argc, Char *argv[])
signal(SIGINT, signalHandlerSIGINT);
Command *commands = NULL;
Grunt *grunts = NULL;
Hash_Table grunt_map = htCreate(0);
Hash_Table grunt_survey_map = htCreate(0);
Grunt_Tracker gt = {0};
sbufAdd(grunts, (Grunt){0}); // SInce 0 index out of hash table will be invalid
Hash_Table grunt_survey_map = htCreate(0);
Kafka kafka = {0};
......@@ -128,6 +152,7 @@ Sint main (Sint argc, Char *argv[])
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
#undef CREATE_TOPIC
......@@ -161,6 +186,13 @@ Sint main (Sint argc, Char *argv[])
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100);
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
Grunt g = gt.grunts[gt.map.values[j]];
g.time_to_die -= 100;
}
}
if (kafka_message_read != NULL) {
if (kafka_message_read->err) {
/* Consumer error: typically just informational. */
......@@ -179,7 +211,7 @@ Sint main (Sint argc, Char *argv[])
cJSON *root = cJSON_ParseWithOpts(kafka_message_read->payload, &json_error, true);
if (kafka_message_read->rkt == topic_req_dm2a) {
Command c = {.kind = Command_REQUEST_DM_2_ARBITER};
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = strdup(cJSON_GetObjectItem(root, "resource_id")->valuestring);
// TODO(naman): Add any new resource fields here
......@@ -190,11 +222,13 @@ Sint main (Sint argc, Char *argv[])
Char **grunt_ids = NULL;
for (Size j = 0; j < sbufElemin(grunts); j++) {
Grunt g = grunts[j];
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
Grunt g = gt.grunts[gt.map.values[j]];
if (g.memory >= memory) {
c.kind = Command_RESPONSE_ARBITER_2_DM;
sbufAdd(grunt_ids, g.id);
sbufAdd(grunt_ids, strdup(g.id));
}
}
}
......@@ -211,13 +245,31 @@ Sint main (Sint argc, Char *argv[])
}
} else if (kafka_message_read->rkt == topic_join_g2a) {
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
Grunt grunt = {.id = id};
Grunt grunt = {.id = id, .time_to_die = 1000};
logMessage("Join G2A:\tid: %s", id);
if (htLookup(&grunt_map, hashString(id)) == 0) {
sbufAdd(grunts, grunt);
htInsert(&grunt_map, hashString(id), sbufElemin(grunts) - 1);
if (htLookup(&gt.map, hashString(id)) == 0) {
gruntTrackBegin(&gt, grunt);
}
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = cJSON_GetObjectItem(root, "node_id")->valuestring;
U64 index = htLookup(&gt.map, hashString(id));
if (index != 0) { // Prevent any left over message
// TODO(naman): Add any new resource fields here
gt.grunts[index].time_to_die = 1000;
gt.grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Beat G2A:\tid: %s (Memory %d)", id, gt.grunts[index].memory);
} else {
if ((gt.grunts != NULL) && (gt.grunts[index].rejoin_asked != true)) {
gt.grunts[index].rejoin_asked = true;
}
Command c = {.kind = Command_REJOIN_ARBITER_2_GRUNT,
.rejoin_a2g.grunt_id = id};
sbufAdd(commands, c);
logMessage("Beat G2A:\tid: %s (UNNOWN)", id);
}
} else if (kafka_message_read->rkt == topic_res_g2a) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
......@@ -232,20 +284,10 @@ Sint main (Sint argc, Char *argv[])
hashString(resource_id));
if (gs != NULL) { // If it has not been already removed
Grunt *g = &grunts[htLookup(&grunt_map, hashString(node_id))];
sbufAdd(gs->grunt_ptrs, g);
Grunt *g = &gt.grunts[htLookup(&gt.map, hashString(node_id))];
sbufAdd(gs->grunt_ids, strdup(g->id));
}
}
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = cJSON_GetObjectItem(root, "node_id")->valuestring;
logMessage("Beat G2A:\tid: %s", id);
U64 index = htLookup(&grunt_map, hashString(id));
if (index != 0) { // Prevent any left over message
// TODO(naman): Add any new resource fields here
grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
}
} else if (kafka_message_read->rkt == topic_log) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
......@@ -264,6 +306,16 @@ Sint main (Sint argc, Char *argv[])
rd_kafka_message_destroy(kafka_message_read);
}
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
Size index = gt.map.values[j];
Grunt g = gt.grunts[index];
if (g.time_to_die <= 0) {
gruntTrackEnd(&gt, g.id);
}
}
}
for (Size i = 0; i < grunt_survey_map.slot_count; i++) {
if (grunt_survey_map.keys[i] != 0) {
Grunt_Survey *gs = (Grunt_Survey *)grunt_survey_map.values[i];
......@@ -275,13 +327,17 @@ Sint main (Sint argc, Char *argv[])
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = gs->resource_id;
for (Size k = 0; k < sbufElemin(gs->grunt_ptrs); k++) {
sbufAdd(c.res_a2d.grunt_ids, gs->grunt_ptrs[k]->id);
for (Size k = 0; k < sbufElemin(gs->grunt_ids); k++) {
Size index = htLookup(&gt.map, hashString(gs->grunt_ids[k]));
if (index != 0) {
Char *id = gt.grunts[index].id;
sbufAdd(c.res_a2d.grunt_ids, id);
}
}
sbufAdd(commands, c);
free(gs->grunt_ptrs);
sbufDelete(gs->grunt_ids);
free(gs->ports);
htRemove(&grunt_survey_map, hashString(gs->resource_id));
}
......@@ -313,6 +369,11 @@ Sint main (Sint argc, Char *argv[])
}
sbufPrint(output, "]");
sbufPrint(output, "\n}");
} else if (c.kind == Command_REJOIN_ARBITER_2_GRUNT) {
topic = "REJOIN_RM_2_RD";
sbufPrint(output, "{\n\"node_id\": \"%s\"", c.rejoin_a2g.grunt_id);
sbufPrint(output, "\n}");
}
if (output != NULL) {
......@@ -324,6 +385,12 @@ Sint main (Sint argc, Char *argv[])
sbufDelete(output);
free(c.resource_id);
if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) {
free(c.res_a2d.grunt_ids[k]);
}
}
sbufUnsortedRemove(commands, j);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment