Commit b849dcc9 authored by Naman Dixit's avatar Naman Dixit

Added timestamp to RM, fixed a few bugs

parent e5c9d0a6
......@@ -182,14 +182,16 @@ Sint main (Sint argc, Char *argv[])
return -1;
}
Sint time_of_launch = (Sint)time(0);
while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100);
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0);
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
Grunt g = gt.grunts[gt.map.values[j]];
g.time_to_die -= 100;
g.time_to_die -= 10;
}
}
......@@ -210,6 +212,15 @@ Sint main (Sint argc, Char *argv[])
const char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(kafka_message_read->payload, &json_error, true);
if ((cJSON_GetObjectItem(root, "timestamp") == NULL) ||
(cJSON_GetObjectItem(root, "timestamp")->valueint) < time_of_launch) {
printf("Ignoring : %s\n", kafka_message_read->payload);
cJSON_Delete(root);
rd_kafka_message_destroy(kafka_message_read);
continue;
}
if (kafka_message_read->rkt == topic_req_dm2a) {
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = strdup(cJSON_GetObjectItem(root, "resource_id")->valuestring);
......@@ -253,7 +264,7 @@ Sint main (Sint argc, Char *argv[])
gruntTrackBegin(&gt, grunt);
}
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
U64 index = htLookup(&gt.map, hashString(id));
if (index != 0) { // Prevent any left over message
......@@ -269,7 +280,7 @@ Sint main (Sint argc, Char *argv[])
Command c = {.kind = Command_REJOIN_ARBITER_2_GRUNT,
.rejoin_a2g.grunt_id = id};
sbufAdd(commands, c);
logMessage("Beat G2A:\tid: %s (UNNOWN)", id);
logMessage("Beat G2A:\tid: %s (UNNOWN)", c.rejoin_a2g.grunt_id);
}
} else if (kafka_message_read->rkt == topic_res_g2a) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
......@@ -347,6 +358,8 @@ Sint main (Sint argc, Char *argv[])
for (Size j = 0; j < sbufElemin(commands); j++) {
Command c = commands[j];
Sint timestamp = (Sint)time(0);
Char *output = NULL;
Char *topic = NULL;
......@@ -355,11 +368,13 @@ Sint main (Sint argc, Char *argv[])
sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"memory\": %d\n", c.req_a2g.memory);
sbufPrint(output, ",\n\"timestamp\": %d\n", timestamp);
sbufPrint(output, "\n}\n");
} else if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
topic = "RESPONSE_RM_2_DM";
sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"timestamp\": %d\n", timestamp);
sbufPrint(output, ",\n\"nodes\": [");
for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) {
sbufPrint(output, "\"%s\"", c.res_a2d.grunt_ids[k]);
......@@ -373,7 +388,10 @@ Sint main (Sint argc, Char *argv[])
topic = "REJOIN_RM_2_RD";
sbufPrint(output, "{\n\"node_id\": \"%s\"", c.rejoin_a2g.grunt_id);
sbufPrint(output, ",\n\"timestamp\": %d\n", timestamp);
sbufPrint(output, "\n}");
free(c.rejoin_a2g.grunt_id);
}
if (output != NULL) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment