Commit 5a17b288 authored by Naman Dixit's avatar Naman Dixit

Added timestamps, started implementing the instrumentation dumper

parent b849dcc9
...@@ -55,17 +55,8 @@ typedef struct Thread_Manager_Command { ...@@ -55,17 +55,8 @@ typedef struct Thread_Manager_Command {
Thread_Manager_Command_DOCKER_DESTROY, Thread_Manager_Command_DOCKER_DESTROY,
} kind; } kind;
Char *id; Char *entity_id;
Char *resource_id;
union {
struct {
Sint placeholder;
} docker_create;
struct {
Sint placeholder;
} docker_destroy;
};
} Thread_Manager_Command; } Thread_Manager_Command;
typedef struct JSON_Print_Command { typedef struct JSON_Print_Command {
...@@ -158,17 +149,23 @@ int main(int argc, char** argv) ...@@ -158,17 +149,23 @@ int main(int argc, char** argv)
return -1; return -1;
} }
Char *join_msg = NULL; {
sbufPrint(join_msg, "{\"node_id\": \"%s\"", node_name); Sint timestamp = (Sint)time(0);
sbufPrint(join_msg, "\n}\n"); Char *join_msg = NULL;
sbufPrint(join_msg, "{\"node_id\": \"%s\"", node_name);
sbufPrint(join_msg, ",\n\"timestamp\": %d\n", timestamp);
sbufPrint(join_msg, "\n}\n");
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) { if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) {
return -1; return -1;
}
} }
U64 time_begin = timeMilli(); U64 time_begin = timeMilli();
U64 time_accum = 0; U64 time_accum = 0;
Sint time_of_launch = (Sint)time(0);
while (global_keep_running) { while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready // NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100); rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100);
...@@ -177,23 +174,31 @@ int main(int argc, char** argv) ...@@ -177,23 +174,31 @@ int main(int argc, char** argv)
Command c = {0}; Command c = {0};
if (kafka_message_read != NULL) { if (kafka_message_read != NULL) {
fprintf(stderr,
"Received message on %s [%d] "
"at offset %"PRId64": \n%s\n",
rd_kafka_topic_name(kafka_message_read->rkt),
(int)kafka_message_read->partition, kafka_message_read->offset,
cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
char *buffer = (char *)kafka_message_read->payload;
const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true);
if ((cJSON_GetObjectItem(root, "timestamp") == NULL) ||
(cJSON_GetObjectItem(root, "timestamp")->valueint) < time_of_launch) {
printf("Ignoring : %s\n", buffer);
cJSON_Delete(root);
rd_kafka_message_destroy(kafka_message_read);
continue;
}
if (kafka_message_read->err) { if (kafka_message_read->err) {
/* Consumer error: typically just informational. */ /* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n", fprintf(stderr, "Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read)); rd_kafka_message_errstr(kafka_message_read));
} else if (kafka_message_read->rkt == topic_req_a2g) { } else if (kafka_message_read->rkt == topic_req_a2g) {
fprintf(stderr,
"Received message on %s [%d] "
"at offset %"PRId64": \n%s\n",
rd_kafka_topic_name(kafka_message_read->rkt),
(int)kafka_message_read->partition, kafka_message_read->offset,
cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
char *buffer = (char *)kafka_message_read->payload;
const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true);
if (root == NULL) { if (root == NULL) {
// TODO(naman): Error // TODO(naman): Error
} else { } else {
...@@ -202,37 +207,43 @@ int main(int argc, char** argv) ...@@ -202,37 +207,43 @@ int main(int argc, char** argv)
c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint; c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint;
} }
} else if (kafka_message_read->rkt == topic_rej_a2g) { } else if (kafka_message_read->rkt == topic_rej_a2g) {
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) { Sint timestamp = (Sint)time(0);
Char *rejoin_msg = NULL;
sbufPrint(rejoin_msg, "{\"node_id\": \"%s\"", node_name);
sbufPrint(rejoin_msg, ",\n\"timestamp\": %d\n", timestamp);
sbufPrint(rejoin_msg, "\n}\n");
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", rejoin_msg)) {
return -1; return -1;
} }
} else if (kafka_message_read->rkt == topic_log) { } else if (kafka_message_read->rkt == topic_log) {
fprintf(stderr, cJSON *msg_type_json = cJSON_GetObjectItem(root, "message_type");
"Received message on %s [%d] " if (msg_type_json == NULL) {
"at offset %"PRId64": \n%s\n", if (strequal(msg_type_json->valuestring, "deployment_launch")) {
rd_kafka_topic_name(kafka_message_read->rkt), Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
(int)kafka_message_read->partition, kafka_message_read->offset, if (strequal(node_id, node_name)) {
cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload))); Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
Char *entity_id = cJSON_GetObjectItem(root, "entity_id")->valuestring;
char *buffer = (char *)kafka_message_read->payload; Char *entity_type = cJSON_GetObjectItem(root, "entity_type")->valuestring;
const Char *json_error = NULL; Thread_Manager_Command tmc = {.entity_id = strdup(entity_id),
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true); .resource_id = strdup(resource_id)};
B32 add_command = false;
if (root == NULL) {
// TODO(naman): Error if (strequal(entity_type, "docker")) {
} else { tmc.kind = Thread_Manager_Command_DOCKER_CREATE;
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring; add_command = true;
if (strequal(node_id, node_name)) { }
// FIXME(naman): Fix this placeholder
Thread_Manager_Command tmc = {0}; if (add_command) {
tmCommandEnqueue(tmc); tmCommandEnqueue(tmc);
/* "resource_id": "logical-entity-id", */ } else {
/* "function_id": "unique-function-id", */ free(tmc.entity_id);
/* "timestamp" : "iso-8601-timestamp", */ free(tmc.resource_id);
/* "reason": "deployment"/"termination", */ }
/* "status": true/false // Only valid if reason==deployment; */ }
} }
} }
} }
rd_kafka_message_destroy(kafka_message_read); rd_kafka_message_destroy(kafka_message_read);
} }
...@@ -252,8 +263,11 @@ int main(int argc, char** argv) ...@@ -252,8 +263,11 @@ int main(int argc, char** argv)
if (command_found) { if (command_found) {
Char *output = NULL; Char *output = NULL;
Sint timestamp = (Sint)time(0);
sbufPrint(output, "{\n\"node_id\": \"%s\"", node_name); sbufPrint(output, "{\n\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"resource_id\": \"%s\"", c.txn_id); sbufPrint(output, ",\n\"resource_id\": \"%s\"", c.txn_id);
sbufPrint(output, ",\n\"timestamp\": %d\n", timestamp);
if (memory >= c.res.memory) { if (memory >= c.res.memory) {
sbufPrint(output, ",\n\"success\": true\n"); sbufPrint(output, ",\n\"success\": true\n");
...@@ -290,8 +304,10 @@ int main(int argc, char** argv) ...@@ -290,8 +304,10 @@ int main(int argc, char** argv)
time_accum = 0; time_accum = 0;
Char *output = NULL; Char *output = NULL;
Sint timestamp = (Sint)time(0);
sbufPrint(output, "{\"node_id\": \"%s\"", node_name); sbufPrint(output, "{\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"timestamp\": %d\n", timestamp);
sbufPrint(output, ",\n\"memory\": %d", memory); sbufPrint(output, ",\n\"memory\": %d", memory);
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment