Commit 985f4ec2 authored by Naman Dixit's avatar Naman Dixit

Fixed a few bugs

parent 051a9cc2
...@@ -250,7 +250,7 @@ Sint main (Sint argc, Char *argv[]) ...@@ -250,7 +250,7 @@ Sint main (Sint argc, Char *argv[])
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring; Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring; Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
Char *function_id = cJSON_GetObjectItem(root, "function_id")->valuestring; Char *function_id = cJSON_GetObjectItem(root, "function_id")->valuestring;
printf("%s\n", cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
unused_variable(node_id); unused_variable(node_id);
unused_variable(resource_id); unused_variable(resource_id);
unused_variable(function_id); unused_variable(function_id);
...@@ -295,15 +295,15 @@ Sint main (Sint argc, Char *argv[]) ...@@ -295,15 +295,15 @@ Sint main (Sint argc, Char *argv[])
Char *topic = NULL; Char *topic = NULL;
if (c.kind == Command_REQUEST_ARBITER_2_GRUNT) { if (c.kind == Command_REQUEST_ARBITER_2_GRUNT) {
topic = "REQUEST_ARBITER_2_GRUNT"; topic = "REQUEST_RM_2_RD";
sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id); sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"memory\": %d\n", c.req_a2g.memory); sbufPrint(output, ",\n\"memory\": %d\n", c.req_a2g.memory);
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
} else if (c.kind == Command_RESPONSE_ARBITER_2_DM) { } else if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
topic = "RESPONSE_ARBITER_2_DISPATCHER"; topic = "RESPONSE_RM_2_DM";
sbufPrint(output, "{\n\"id\": \"%s\"", c.resource_id); sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"nodes\": ["); sbufPrint(output, ",\n\"nodes\": [");
for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) { for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) {
sbufPrint(output, "\"%s\"", c.res_a2d.grunt_ids[k]); sbufPrint(output, "\"%s\"", c.res_a2d.grunt_ids[k]);
...@@ -321,6 +321,7 @@ Sint main (Sint argc, Char *argv[]) ...@@ -321,6 +321,7 @@ Sint main (Sint argc, Char *argv[])
return -1; return -1;
} }
} }
sbufDelete(output); sbufDelete(output);
free(c.resource_id); free(c.resource_id);
sbufUnsortedRemove(commands, j); sbufUnsortedRemove(commands, j);
......
...@@ -48,11 +48,28 @@ int main(int argc, char** argv) ...@@ -48,11 +48,28 @@ int main(int argc, char** argv)
Kafka kafka = {0}; Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092"); kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(&kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka.writer); \
return -1; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DM_2_RM"); //
CREATE_TOPIC("RESPONSE_RM_2_DM");
CREATE_TOPIC("REQUEST_RM_2_RD");
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092"); kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
kafkaSubscribe(&kafka, kafka_reader_topics, "RESPONSE_ARBITER_2_DISPATCHER"); kafkaSubscribe(&kafka, kafka_reader_topics, "RESPONSE_RM_2_DM");
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics); rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics); rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
...@@ -67,14 +84,14 @@ int main(int argc, char** argv) ...@@ -67,14 +84,14 @@ int main(int argc, char** argv)
Char *output = NULL; Char *output = NULL;
Sint id = (Sint)time(NULL); Sint id = (Sint)time(NULL);
sbufPrint(output, "{\n\"id\": \"%d\"", id); sbufPrint(output, "{\n\"resource_id\": \"%d\"", id);
sbufPrint(output, ",\n\"memory\": %d", memory_required); sbufPrint(output, ",\n\"memory\": %d", memory_required);
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
printf("Sending to Arbiter:\n%s\n", cJSON_Print(cJSON_Parse(output))); printf("Sending to Arbiter:\n%s\n", cJSON_Print(cJSON_Parse(output)));
if (output != NULL) { if (output != NULL) {
if (!kafkaWrite(kafka.writer, "REQUEST_DISPATCHER_2_ARBITER", "rm_test", output)) { if (!kafkaWrite(kafka.writer, "REQUEST_DM_2_RM", "rm_test", output)) {
return -1; return -1;
} }
} }
...@@ -116,7 +133,7 @@ int main(int argc, char** argv) ...@@ -116,7 +133,7 @@ int main(int argc, char** argv)
if (root == NULL) { if (root == NULL) {
// TODO(naman): Error // TODO(naman): Error
} else { } else {
cJSON *array = cJSON_GetObjectItem(root, "id"); cJSON *array = cJSON_GetObjectItem(root, "resource_id");
cJSON *elem = NULL; cJSON *elem = NULL;
cJSON_ArrayForEach(elem, array) { cJSON_ArrayForEach(elem, array) {
printf("%s\n", elem->valuestring); printf("%s\n", elem->valuestring);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment