Commit 13692b53 authored by Naman Dixit's avatar Naman Dixit

Worked around first message's latency issues, pulled parameters into preprocessor defines

parent a086a716
......@@ -3,7 +3,14 @@
* Notice: © Copyright 2020 Naman Dixit
*/
#define KAFKA_ADDRESS "10.129.6.5:9092"
#define ARBITER_READ_MESSAGE_GAP_MS 10
#define ARBITER_GRUNT_TIME_TO_DIE_MS 10000
#define ARBITER_DISPATCHER_RESPONSE_WAIT_TIME_MS 100
#define logMessage(s, ...) printf(s "\n", ##__VA_ARGS__)
#define logError(s, ...) fprintf(stderr, s "\n", ##__VA_ARGS__)
#include "nlib/nlib.h"
......@@ -41,6 +48,7 @@ typedef struct Command {
Command_NONE,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_JOIN_ACK_ARBITER_2_GRUNT,
Command_REJOIN_ARBITER_2_GRUNT,
} kind;
......@@ -58,6 +66,10 @@ typedef struct Command {
struct {
Char *grunt_id;
} rejoin_a2g;
struct {
Char *grunt_id;
} join_ack_a2g;
};
} Command;
......@@ -137,7 +149,7 @@ Sint main (Sint argc, Char *argv[])
Kafka kafka = {0};
kafkaCreateWriter(&kafka, "10.129.6.5:9092");
kafkaCreateWriter(&kafka, KAFKA_ADDRESS);
#define CREATE_TOPIC(s) \
do { \
......@@ -153,11 +165,12 @@ Sint main (Sint argc, Char *argv[])
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("JOIN_ACK_RM_2_RD"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
#undef CREATE_TOPIC
kafkaCreateReader(&kafka, "10.129.6.5:9092");
kafkaCreateReader(&kafka, KAFKA_ADDRESS);
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
......@@ -174,10 +187,14 @@ Sint main (Sint argc, Char *argv[])
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader,
kafka_reader_topics);
logMessage("Subscription finished\n");
fflush(stdout);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
if (kafka_reader_topics_err) {
fprintf(stderr, "Subscribe failed: %s\n",
logError("Subscribe failed: %s\n",
rd_kafka_err2str(kafka_reader_topics_err));
rd_kafka_destroy(kafka.reader);
return -1;
......@@ -188,7 +205,8 @@ Sint main (Sint argc, Char *argv[])
U64 time_passed_last = timeMilli();
while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0);
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader,
ARBITER_READ_MESSAGE_GAP_MS);
U64 time_passed_now = timeMilli();
U64 time_passed = time_passed_now - time_passed_last;
......@@ -203,7 +221,7 @@ Sint main (Sint argc, Char *argv[])
if (kafka_message_read != NULL) {
if (kafka_message_read->err) {
/* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n",
logError("Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read));
} else {
/* Proper message */
......@@ -219,20 +237,18 @@ Sint main (Sint argc, Char *argv[])
if ((cJSON_GetObjectItem(root, "timestamp") == NULL) ||
(cJSON_GetObjectItem(root, "timestamp")->valueint) < time_of_launch) {
printf("Ignoring : %s\n", kafka_message_read->payload);
logMessage("Ignoring : %s\n", kafka_message_read->payload);
cJSON_Delete(root);
rd_kafka_message_destroy(kafka_message_read);
continue;
}
if (kafka_message_read->rkt == topic_req_dm2a) {
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = strdup(cJSON_GetObjectItem(root, "resource_id")->valuestring);
// TODO(naman): Add any new resource fields here
Sint memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Request DM2RM:\tid: %s = ([memory] = %d)",
c.resource_id, memory);
......@@ -261,20 +277,24 @@ Sint main (Sint argc, Char *argv[])
}
} else if (kafka_message_read->rkt == topic_join_g2a) {
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
Grunt grunt = {.id = id, .time_to_die = 2000};
Grunt grunt = {.id = strdup(id), .time_to_die = ARBITER_GRUNT_TIME_TO_DIE_MS};
logMessage("Join G2A:\tid: %s", id);
if (htLookup(&gt.map, hashString(id)) == 0) {
gruntTrackBegin(&gt, grunt);
}
Command c = {.kind = Command_JOIN_ACK_ARBITER_2_GRUNT,
.join_ack_a2g.grunt_id = strdup(id)};
sbufAdd(commands, c);
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
U64 index = htLookup(&gt.map, hashString(id));
if (index != 0) { // Prevent any left over message
// TODO(naman): Add any new resource fields here
gt.grunts[index].time_to_die = 2000;
gt.grunts[index].time_to_die = ARBITER_GRUNT_TIME_TO_DIE_MS;
gt.grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Beat G2A:\tid: %s (Memory %d)", id, gt.grunts[index].memory);
......@@ -308,7 +328,7 @@ Sint main (Sint argc, Char *argv[])
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
Char *function_id = cJSON_GetObjectItem(root, "function_id")->valuestring;
printf("%s\n", cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
logMessage("Log: %s\n", cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
unused_variable(node_id);
unused_variable(resource_id);
unused_variable(function_id);
......@@ -327,7 +347,7 @@ Sint main (Sint argc, Char *argv[])
Size index = gt.map.values[j];
Grunt g = gt.grunts[index];
if (g.time_to_die <= 0) {
printf("Deleting grunt: %s\n", g.id);
logMessage("Deleting grunt: %s\n", g.id);
gruntTrackEnd(&gt, g.id);
}
}
......@@ -340,7 +360,7 @@ Sint main (Sint argc, Char *argv[])
U64 milli_new = timeMilli();
gs->milli_passed += milli_new - gs->milli_last;
gs->milli_last = milli_new;
if (gs->milli_passed >= 2000) {
if (gs->milli_passed >= ARBITER_DISPATCHER_RESPONSE_WAIT_TIME_MS) {
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = gs->resource_id;
......@@ -390,6 +410,14 @@ Sint main (Sint argc, Char *argv[])
}
sbufPrint(output, "]");
sbufPrint(output, "\n}");
} else if (c.kind == Command_JOIN_ACK_ARBITER_2_GRUNT) {
topic = "JOIN_ACK_RM_2_RD";
sbufPrint(output, "{\n\"node_id\": \"%s\"", c.join_ack_a2g.grunt_id);
sbufPrint(output, ",\n\"timestamp\": %d", timestamp);
sbufPrint(output, "\n}");
free(c.join_ack_a2g.grunt_id);
} else if (c.kind == Command_REJOIN_ARBITER_2_GRUNT) {
topic = "REJOIN_RM_2_RD";
......
......@@ -3,6 +3,10 @@
* Notice: © Copyright 2020 Naman Dixit
*/
#define KAFKA_ADDRESS "10.129.6.5:9092"
#define GRUNT_READ_MESSAGE_GAP_MS 10
#define GRUNT_HEARTBEAT_TIME_GAP_MS 1000
#include "nlib/nlib.h"
#include <stdio.h>
......@@ -109,7 +113,7 @@ int main(int argc, char** argv)
Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
kafka.writer = kafkaCreateWriter(&kafka, KAFKA_ADDRESS);
#define CREATE_TOPIC(s) \
do { \
......@@ -125,23 +129,34 @@ int main(int argc, char** argv)
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("JOIN_ACK_RM_2_RD"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
kafka.reader = kafkaCreateReader(&kafka, KAFKA_ADDRESS);
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_RM_2_RD");
rd_kafka_topic_t *topic_jac_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"JOIN_ACK_RM_2_RD");
rd_kafka_topic_t *topic_rej_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REJOIN_RM_2_RD");
rd_kafka_topic_t *topic_log = kafkaSubscribe(&kafka, kafka_reader_topics,
"LOG_COMMON");
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader,
kafka_reader_topics);
printf("Subscription finished\n");
fflush(stdout);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
printf("Partition list destroyed\n");
fflush(stdout);
if (kafka_reader_topics_err) {
fprintf(stderr, "Subscribe failed: %s\n",
rd_kafka_err2str(kafka_reader_topics_err));
......@@ -161,6 +176,8 @@ int main(int argc, char** argv)
}
}
B32 join_successful = false;
U64 time_begin = timeMilli();
U64 time_accum = 0;
......@@ -168,7 +185,8 @@ int main(int argc, char** argv)
while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100);
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader,
GRUNT_READ_MESSAGE_GAP_MS);
B32 command_found = false;
Command c = {0};
......@@ -198,7 +216,10 @@ int main(int argc, char** argv)
/* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read));
} else if (kafka_message_read->rkt == topic_req_a2g) {
} else if (kafka_message_read->rkt == topic_jac_a2g) {
join_successful = true;
} else if (join_successful) {
if (kafka_message_read->rkt == topic_req_a2g) {
if (root == NULL) {
// TODO(naman): Error
} else {
......@@ -207,6 +228,11 @@ int main(int argc, char** argv)
c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint;
}
} else if (kafka_message_read->rkt == topic_rej_a2g) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
if (strequal(node_id, node_name)) {
join_successful = false;
Sint timestamp = (Sint)time(0);
Char *rejoin_msg = NULL;
sbufPrint(rejoin_msg, "{\"node_id\": \"%s\"", node_name);
......@@ -216,6 +242,7 @@ int main(int argc, char** argv)
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", rejoin_msg)) {
return -1;
}
}
} else if (kafka_message_read->rkt == topic_log) {
cJSON *msg_type_json = cJSON_GetObjectItem(root, "message_type");
if (msg_type_json == NULL) {
......@@ -245,9 +272,11 @@ int main(int argc, char** argv)
}
}
}
}
rd_kafka_message_destroy(kafka_message_read);
}
if (join_successful) {
int memory = 0;
FILE *meminfo = fopen("/proc/meminfo", "r");
......@@ -300,7 +329,7 @@ int main(int argc, char** argv)
time_begin = time_new;
time_accum += time_passed;
if (time_accum >= 1000) {
if (time_accum >= GRUNT_HEARTBEAT_TIME_GAP_MS) {
time_accum = 0;
Char *output = NULL;
......@@ -318,6 +347,7 @@ int main(int argc, char** argv)
}
}
}
}
for (Size i = 0; i < sbufElemin(kafka.topics); i++) {
rd_kafka_topic_destroy(kafka.topics[i]);
......
......@@ -91,11 +91,13 @@ int main(int argc, char** argv)
printf("Sending to Arbiter:\n%s\n", cJSON_Print(cJSON_Parse(output)));
printf("%ld\n", time(0));
if (output != NULL) {
if (!kafkaWrite(kafka.writer, "REQUEST_DM_2_RM", "rm_test", output)) {
return -1;
}
}
printf("%ld\n", time(0));
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 10);
while (true) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment