You need to sign in or sign up before continuing.
Commit 41213d30 authored by Naman Dixit's avatar Naman Dixit

Changed RS to reflect new nomenclature and JSON schema

parent e2983232
This diff is collapsed.
...@@ -106,22 +106,6 @@ rd_kafka_t* kafkaCreateWriter (Kafka *kafka, Char *address) ...@@ -106,22 +106,6 @@ rd_kafka_t* kafkaCreateWriter (Kafka *kafka, Char *address)
printf("Ading brokers to writer\n"); printf("Ading brokers to writer\n");
rd_kafka_brokers_add(kafka->writer, address); rd_kafka_brokers_add(kafka->writer, address);
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka->writer); \
return NULL; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DISPATCHER_2_ARBITER"); //
CREATE_TOPIC("RESPONSE_ARBITER_2_DISPATCHER");
CREATE_TOPIC("REQUEST_ARBITER_2_GRUNT");
CREATE_TOPIC("RESPONSE_GRUNT_2_ARBITER"); //
CREATE_TOPIC("JOIN_GRUNT_2_ARBITER"); //
CREATE_TOPIC("HEARTBEAT_GRUNT_2_ARBITER"); //
#undef CREATE_TOPIC
return kafka->writer; return kafka->writer;
} }
......
...@@ -56,20 +56,44 @@ void signalHandlerSIGINT (int _) ...@@ -56,20 +56,44 @@ void signalHandlerSIGINT (int _)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
unused_variable(argc); Char *node_name = NULL;
unused_variable(argv);
if (argc > 1) {
node_name = argv[1];
} else {
Char hostname[1024] = {0};
gethostname(hostname, 1023);
sbufPrint(node_name, "%s", hostname);
}
signal(SIGINT, signalHandlerSIGINT); signal(SIGINT, signalHandlerSIGINT);
Kafka kafka = {0}; Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092"); kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(&kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka.writer); \
return -1; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DM_2_RM"); //
CREATE_TOPIC("RESPONSE_RM_2_DM");
CREATE_TOPIC("REQUEST_RM_2_RD");
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092"); kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics, rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_ARBITER_2_GRUNT"); "REQUEST_RM_2_RD");
unused_variable(topic_req_a2g); unused_variable(topic_req_a2g);
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics); rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
...@@ -83,10 +107,10 @@ int main(int argc, char** argv) ...@@ -83,10 +107,10 @@ int main(int argc, char** argv)
} }
Char *join_msg = NULL; Char *join_msg = NULL;
sbufPrint(join_msg, "{\"id\": \"my-machine\""); sbufPrint(join_msg, "{\"node_id\": \"%s\"", node_name);
sbufPrint(join_msg, "\n}\n"); sbufPrint(join_msg, "\n}\n");
if (!kafkaWrite(kafka.writer, "JOIN_GRUNT_2_ARBITER", "rm_grunt", join_msg)) { if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) {
return -1; return -1;
} }
...@@ -122,7 +146,7 @@ int main(int argc, char** argv) ...@@ -122,7 +146,7 @@ int main(int argc, char** argv)
// TODO(naman): Error // TODO(naman): Error
} else { } else {
command_found = true; command_found = true;
c.txn_id = cJSON_GetObjectItem(root, "id")->valuestring; c.txn_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint; c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint;
} }
} }
...@@ -144,22 +168,25 @@ int main(int argc, char** argv) ...@@ -144,22 +168,25 @@ int main(int argc, char** argv)
if (command_found) { if (command_found) {
Char *output = NULL; Char *output = NULL;
sbufPrint(output, "{\n\"id\": \"%s\"", c.txn_id); sbufPrint(output, "{\n\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"resource_id\": \"%s\"", c.txn_id);
if (memory >= c.res.memory) { if (memory >= c.res.memory) {
sbufPrint(output, ",\n\"success\": %d\n", 1); sbufPrint(output, ",\n\"success\": true\n");
// TODO(naman): Add port // TODO(naman): Add port
// sbufPrint(output, ",\n\"port\": %d\n", port); // sbufPrint(output, ",\n\"port\": %d\n", port);
} else { } else {
sbufPrint(output, ",\n\"success\": %d\n", 0); sbufPrint(output, ",\n\"success\": false\n");
} }
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
if (!kafkaWrite(kafka.writer, "RESPONSE_GRUNT_2_ARBITER", "rm_grunt", output)) { if (!kafkaWrite(kafka.writer, "RESPONSE_RD_2_RM", "resource_daemon", output)) {
return -1; return -1;
} }
} else { // Send a heartbeat message if it is time to do so }
{ // Send a heartbeat message if it is time to do so
U64 time_new = timeMilli(); U64 time_new = timeMilli();
U64 time_passed = time_new - time_begin; U64 time_passed = time_new - time_begin;
time_begin = time_new; time_begin = time_new;
...@@ -170,12 +197,12 @@ int main(int argc, char** argv) ...@@ -170,12 +197,12 @@ int main(int argc, char** argv)
Char *output = NULL; Char *output = NULL;
sbufPrint(output, "{\"id\": \"my-machine\""); sbufPrint(output, "{\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"memory\": %d", memory); sbufPrint(output, ",\n\"memory\": %d", memory);
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
if (!kafkaWrite(kafka.writer, "HEARTBEAT_GRUNT_2_ARBITER", "rm_grunt", output)) { if (!kafkaWrite(kafka.writer, "HEARTBEAT_RD_2_RM", "resource_daemon", output)) {
return -1; return -1;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment