Commit 10c76714 authored by Naman Dixit's avatar Naman Dixit

Added Kafka support

parent 64418d8e
...@@ -38,8 +38,7 @@ fi ...@@ -38,8 +38,7 @@ fi
# For Address Sanitizer: -fsanitize=address -fno-omit-frame-pointer # For Address Sanitizer: -fsanitize=address -fno-omit-frame-pointer
# Memory Sanitizer : -fsanitize=memory -fno-optimize-sibling-calls -fno-omit-frame-pointer -fsanitize-memory-track-origins # Memory Sanitizer : -fsanitize=memory -fno-optimize-sibling-calls -fno-omit-frame-pointer -fsanitize-memory-track-origins
ArbiterCompilerFlags="-iquote /code/include -iquote ${ProjectRoot}/src \ ArbiterCompilerFlags="-iquote ${ProjectRoot}/src/common \
-iquote ${ProjectRoot}/src/common \
-g3 -O0 -fno-strict-aliasing -fwrapv -msse2 \ -g3 -O0 -fno-strict-aliasing -fwrapv -msse2 \
" "
ArbiterLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \ ArbiterLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \
...@@ -47,10 +46,10 @@ ArbiterLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \ ...@@ -47,10 +46,10 @@ ArbiterLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \
-D_POSIX_C_SOURCE=200809L -D_DEFAULT_SOURCE" -D_POSIX_C_SOURCE=200809L -D_DEFAULT_SOURCE"
ArbiterWarningFlags="-Weverything -Wpedantic -pedantic-errors -Werror \ ArbiterWarningFlags="-Weverything -Wpedantic -pedantic-errors -Werror \
-Wno-c++98-compat -Wno-gnu-statement-expression \ -Wno-c++98-compat -Wno-gnu-statement-expression \
-Wno-bad-function-cast -Wno-unused-function \ -Wno-bad-function-cast -Wno-used-but-marked-unused \
-Wno-padded " -Wno-padded -Wno-gnu-zero-variadic-macro-arguments "
ArbiterLinkerFlags="-o ${ArbiterTargetPath} \ ArbiterLinkerFlags="-o ${ArbiterTargetPath} \
-static-libgcc -lm -pthread \ -static-libgcc -lm -pthread -lrdkafka \
-Wl,-rpath=\${ORIGIN} -Wl,-z,origin -Wl,--enable-new-dtags" -Wl,-rpath=\${ORIGIN} -Wl,-z,origin -Wl,--enable-new-dtags"
${Compiler} ${ArbiterCompilerFlags} ${ArbiterLanguageFlags} ${ArbiterWarningFlags} \ ${Compiler} ${ArbiterCompilerFlags} ${ArbiterLanguageFlags} ${ArbiterWarningFlags} \
...@@ -70,8 +69,7 @@ fi ...@@ -70,8 +69,7 @@ fi
# For Address Sanitizer: -fsanitize=address -fno-omit-frame-pointer # For Address Sanitizer: -fsanitize=address -fno-omit-frame-pointer
# Memory Sanitizer : -fsanitize=memory -fno-optimize-sibling-calls -fno-omit-frame-pointer -fsanitize-memory-track-origins # Memory Sanitizer : -fsanitize=memory -fno-optimize-sibling-calls -fno-omit-frame-pointer -fsanitize-memory-track-origins
GruntCompilerFlags="-iquote /code/include -iquote ${ProjectRoot}/src \ GruntCompilerFlags="-iquote ${ProjectRoot}/src/common \
-iquote ${ProjectRoot}/src/common \
-g3 -O0 -fno-strict-aliasing -fwrapv -msse2 \ -g3 -O0 -fno-strict-aliasing -fwrapv -msse2 \
" "
GruntLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \ GruntLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \
...@@ -79,10 +77,10 @@ GruntLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \ ...@@ -79,10 +77,10 @@ GruntLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \
-D_POSIX_C_SOURCE=200809L -D_DEFAULT_SOURCE" -D_POSIX_C_SOURCE=200809L -D_DEFAULT_SOURCE"
GruntWarningFlags="-Weverything -Wpedantic -pedantic-errors -Werror \ GruntWarningFlags="-Weverything -Wpedantic -pedantic-errors -Werror \
-Wno-c++98-compat -Wno-gnu-statement-expression \ -Wno-c++98-compat -Wno-gnu-statement-expression \
-Wno-bad-function-cast -Wno-unused-function \ -Wno-bad-function-cast -Wno-used-but-marked-unused \
-Wno-padded " -Wno-padded "
GruntLinkerFlags="-o ${GruntTargetPath} \ GruntLinkerFlags="-o ${GruntTargetPath} \
-static-libgcc -lm -pthread \ -static-libgcc -lm -pthread -lrdkafka \
-Wl,-rpath=\${ORIGIN} -Wl,-z,origin -Wl,--enable-new-dtags" -Wl,-rpath=\${ORIGIN} -Wl,-z,origin -Wl,--enable-new-dtags"
${Compiler} ${GruntCompilerFlags} ${GruntLanguageFlags} ${GruntWarningFlags} \ ${Compiler} ${GruntCompilerFlags} ${GruntLanguageFlags} ${GruntWarningFlags} \
...@@ -111,10 +109,10 @@ TestLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \ ...@@ -111,10 +109,10 @@ TestLanguageFlags="--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -DBUILD_DEBUG \
-D_POSIX_C_SOURCE=200809L -D_DEFAULT_SOURCE" -D_POSIX_C_SOURCE=200809L -D_DEFAULT_SOURCE"
TestWarningFlags="-Weverything -Wpedantic -pedantic-errors -Werror \ TestWarningFlags="-Weverything -Wpedantic -pedantic-errors -Werror \
-Wno-c++98-compat -Wno-gnu-statement-expression \ -Wno-c++98-compat -Wno-gnu-statement-expression \
-Wno-bad-function-cast -Wno-unused-function \ -Wno-bad-function-cast -Wno-used-but-marked-unused \
-Wno-padded " -Wno-padded "
TestLinkerFlags="-o ${TestTargetPath} \ TestLinkerFlags="-o ${TestTargetPath} \
-static-libgcc -lm -pthread \ -static-libgcc -lm -pthread -lrdkafka \
-Wl,-rpath=\${ORIGIN} -Wl,-z,origin -Wl,--enable-new-dtags" -Wl,-rpath=\${ORIGIN} -Wl,-z,origin -Wl,--enable-new-dtags"
${Compiler} ${TestCompilerFlags} ${TestLanguageFlags} ${TestWarningFlags} \ ${Compiler} ${TestCompilerFlags} ${TestLanguageFlags} ${TestWarningFlags} \
......
This diff is collapsed.
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef enum Socket_Kind {
Socket_Kind_NONE,
Socket_Kind_INTERNAL,
Socket_Kind_EXTERNAL,
} Socket_Kind;
internal_function
Sint socketCreateListener (Char *port)
{
printf("Openiing socket on port %s\n", port);
// NOTE(naman): Create a socket for IPv4 and TCP.
Sint sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd < 0) {
perror("ERROR opening socket");
exit(-1);
}
// NOTE(naman): This helps avoid spurious EADDRINUSE when the previous instance of this
// server died.
int opt = 1;
if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror("setsockopt");
exit(-1);
}
// NOTE(naman): Get actual internet address to bind to using IPv4 and TCP,
// and listening passively
struct addrinfo hints = {.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM,
.ai_flags = AI_PASSIVE};
struct addrinfo *addrinfo = NULL;
Sint s = getaddrinfo(NULL, port, &hints, &addrinfo);
if (s != 0) {
fprintf(stderr, "Error: getaddrinfo: %s\n", gai_strerror(s));
exit(-1);
}
// NOTE(naman): Assign an address to the socket
if (bind(sock_fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
perror("bind()");
exit(-1);
}
// NOTE(naman): Start listening for incoming connections
if (listen(sock_fd, MAX_SOCKET_CONNECTIONS_REQUEST) != 0) {
perror("listen()");
exit(-1);
}
// NOTE(naman): Set the socket as non-blocking
int flags = fcntl(sock_fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl F_GETFL");
exit(-1);
}
if (fcntl(sock_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL O_NONBLOCK");
exit(-1);
}
printf("Log: Waiting for connection on port %s...\n", port);
return sock_fd;
}
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Kafka {
rd_kafka_t *writer;
rd_kafka_t *reader;
rd_kafka_queue_t **queues;
rd_kafka_topic_t **topics;
} Kafka;
header_function
int kafkaCreateTopic (Kafka *kafka, Char *topic,
Sint num_partitions, Sint replication_factor)
{
char errstr[256];
rd_kafka_NewTopic_t *new_topic = rd_kafka_NewTopic_new(topic,
num_partitions,
replication_factor,
errstr, sizeof(errstr));
if (!new_topic) {
fprintf(stderr, "Failed to create NewTopic object: %s\n",
errstr);
return -1;
}
/* Use a temporary queue for the asynchronous Admin result */
rd_kafka_queue_t *queue = rd_kafka_queue_new(kafka->writer);
sbufAdd(kafka->queues, queue);
/* Asynchronously create topic, result will be available on queue */
rd_kafka_CreateTopics(kafka->writer, &new_topic, 1, NULL, queue);
rd_kafka_NewTopic_destroy(new_topic);
/* Wait for result event */
rd_kafka_event_t *event = rd_kafka_queue_poll(queue, 15*1000);
if (!event) {
/* There will eventually be a result, after operation
* and request timeouts, but in this example we'll only
* wait 15s to avoid stalling too long when cluster
* is not available. */
fprintf(stderr, "No create topics result in 15s\n");
return -1;
}
if (rd_kafka_event_error(event)) {
/* Request-level failure */
fprintf(stderr, "Create topics request failed: %s\n",
rd_kafka_event_error_string(event));
rd_kafka_event_destroy(event);
return -1;
}
/* Extract the result type from the event. */
const rd_kafka_CreateTopics_result_t *result = rd_kafka_event_CreateTopics_result(event);
assert(result); /* Since we're using a dedicated queue we know this is
* a CreateTopics result type. */
/* Extract the per-topic results from the result type. */
size_t result_topics_count;
const rd_kafka_topic_result_t **result_topics = rd_kafka_CreateTopics_result_topics(result,
&result_topics_count);
assert(result_topics && result_topics_count == 1);
int return_value = 0;
if (rd_kafka_topic_result_error(result_topics[0]) ==
RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) {
fprintf(stderr, "Topic %s already exists\n",
rd_kafka_topic_result_name(result_topics[0]));
} else if (rd_kafka_topic_result_error(result_topics[0])) {
fprintf(stderr, "Failed to create topic %s: %s\n",
rd_kafka_topic_result_name(result_topics[0]),
rd_kafka_topic_result_error_string(result_topics[0]));
return_value = -1;
} else {
fprintf(stderr, "Topic %s successfully created\n",
rd_kafka_topic_result_name(result_topics[0]));
}
rd_kafka_event_destroy(event);
return return_value;
}
header_function
rd_kafka_t* kafkaCreateWriter (Kafka *kafka, Char *address)
{
char errstr[512] = {0};
printf("Creating writer conf\n");
rd_kafka_conf_t *kafka_writer_conf = rd_kafka_conf_new();
rd_kafka_conf_set_dr_msg_cb(kafka_writer_conf, NULL);
printf("Creating writer\n");
kafka->writer = rd_kafka_new(RD_KAFKA_PRODUCER, kafka_writer_conf,
errstr, sizeof(errstr));
if (!kafka->writer) {
fprintf(stderr, "Failed to create producer: %s\n", errstr);
rd_kafka_conf_destroy(kafka_writer_conf);
return NULL;
}
printf("Ading brokers to writer\n");
rd_kafka_brokers_add(kafka->writer, address);
#define CREATE_TOPIC(s) \
do { \
if (kafkaCreateTopic(kafka, s, 1, 1) == -1) { \
rd_kafka_destroy(kafka->writer); \
return NULL; \
} \
} while (0)
CREATE_TOPIC("REQUEST_DISPATCHER_2_ARBITER"); //
CREATE_TOPIC("RESPONSE_ARBITER_2_DISPATCHER");
CREATE_TOPIC("REQUEST_ARBITER_2_GRUNT");
CREATE_TOPIC("RESPONSE_GRUNT_2_ARBITER"); //
CREATE_TOPIC("JOIN_GRUNT_2_ARBITER"); //
CREATE_TOPIC("HEARTBEAT_GRUNT_2_ARBITER"); //
#undef CREATE_TOPIC
return kafka->writer;
}
header_function
rd_kafka_t* kafkaCreateReader (Kafka *kafka, Char *address)
{
char errstr[512] = {0};
rd_kafka_conf_t *kafka_reader_conf = rd_kafka_conf_new();
rd_kafka_conf_set(kafka_reader_conf, "group.id", "cloud-example-c", NULL, 0);
/* If there is no committed offset for this group, start reading
* partitions from the beginning. */
rd_kafka_conf_set(kafka_reader_conf, "auto.offset.reset", "earliest", NULL, 0);
/* Disable ERR__PARTITION_EOF when reaching end of partition. */
rd_kafka_conf_set(kafka_reader_conf, "enable.partition.eof", "false", NULL, 0);
kafka->reader = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_reader_conf,
errstr, sizeof(errstr));
if (!kafka->reader) {
fprintf(stderr, "Failed to create consumer: %s\n", errstr);
rd_kafka_conf_destroy(kafka_reader_conf);
return NULL;
}
rd_kafka_brokers_add(kafka->reader, address);
rd_kafka_poll_set_consumer(kafka->reader);
return kafka->reader;
}
header_function
rd_kafka_topic_t* kafkaSubscribe (Kafka *kafka,
rd_kafka_topic_partition_list_t* topics, Char *topic)
{
rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
rd_kafka_topic_t *topic_result = rd_kafka_topic_new(kafka->reader, topic, NULL);
sbufAdd(kafka->topics, topic_result);
printf("Subscribe to %s\n", topic);
return topic_result;
}
header_function
B32 kafkaWrite (rd_kafka_t *kafka_writer, Char *topic, Char *user, Char *msg)
{
int delivery_counter = 0;
rd_kafka_resp_err_t err = rd_kafka_producev(
kafka_writer,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_KEY(user, strlen(user)),
RD_KAFKA_V_VALUE(msg, strlen(msg)),
/* producev() will make a copy of the message
* value (the key is always copied), so we
* can reuse the same json buffer on the
* next iteration. */
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_OPAQUE(&delivery_counter),
RD_KAFKA_V_END);
if (err) {
fprintf(stderr, "Produce failed: %s\n",
rd_kafka_err2str(err));
return false;
}
return true;
}
This diff is collapsed.
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
internal_function
void socketWrite (Char *output, Size output_len,
int sock_fd)
{
ssize_t nsent = 0;
Size output_cursor = 0;
while (true) {
nsent = write(sock_fd, output + output_cursor, output_len - output_cursor);
if (nsent == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
exit(-1);
}
} else if ((Size)nsent < output_len) {
output_cursor += (Size)nsent;
} else {
break;
}
}
}
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
#include <netdb.h> #include <netdb.h>
#include <unistd.h> #include <unistd.h>
#include <ctype.h> #include <ctype.h>
#include <assert.h>
#include <signal.h>
#include <librdkafka/rdkafka.h>
# if defined(COMPILER_CLANG) # if defined(COMPILER_CLANG)
# pragma clang diagnostic push # pragma clang diagnostic push
...@@ -30,6 +34,7 @@ ...@@ -30,6 +34,7 @@
# pragma clang diagnostic pop # pragma clang diagnostic pop
# endif # endif
#include "kafka.h"
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
...@@ -40,93 +45,70 @@ int main(int argc, char** argv) ...@@ -40,93 +45,70 @@ int main(int argc, char** argv)
Sint memory_required = (Sint)strtol(argv[1], NULL, 10); Sint memory_required = (Sint)strtol(argv[1], NULL, 10);
Char *port = "9526"; Kafka kafka = {0};
Sint sock_fd = socket(AF_INET, SOCK_STREAM, 0); kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
struct addrinfo hints = {.ai_family = AF_INET, rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
.ai_socktype = SOCK_STREAM};
struct addrinfo *result = NULL;
Sint s = getaddrinfo(NULL, port, &hints, &result);
if (s != 0) {
fprintf(stderr, "Error: getaddrinfo: %s\n", gai_strerror(s));
exit(-1);
}
while (connect(sock_fd, result->ai_addr, result->ai_addrlen) == -1) { kafkaSubscribe(&kafka, kafka_reader_topics, "RESPONSE_ARBITER_2_DISPATCHER");
fprintf(stderr, "Error: Couldn't connect on port %s, trying again in one second...\n", port);
sleep(1);
}
printf("Log: Starting communication with server on port %s...\n", port); rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
if (kafka_reader_topics_err) {
fprintf(stderr, "Subscribe failed: %s\n",
rd_kafka_err2str(kafka_reader_topics_err));
rd_kafka_destroy(kafka.reader);
return -1;
}
Char *output = NULL; Char *output = NULL;
sbufPrint(output, " "); Sint id = (Sint)time(NULL);
sbufPrint(output, "{\n\"id\": %d", (Sint)time(NULL)); sbufPrint(output, "{\n\"id\": \"%d\"", id);
sbufPrint(output, ",\n\"memory\": %d", memory_required); sbufPrint(output, ",\n\"memory\": %d", memory_required);
sbufPrint(output, "\n}\n"); sbufPrint(output, "\n}\n");
Size output_len = strlen(output);
printf("Sending to Arbiter:\n%s\n",
cJSON_Print(cJSON_Parse(output + 4)));
#if defined(ENDIAN_LITTLE) printf("Sending to Arbiter:\n%s\n", cJSON_Print(cJSON_Parse(output)));
U32 json_len = (U32)output_len - 4;
U32 json_len_be = swap_endian(json_len);
output[0] = ((Char*)&json_len_be)[0];
output[1] = ((Char*)&json_len_be)[1];
output[2] = ((Char*)&json_len_be)[2];
output[3] = ((Char*)&json_len_be)[3];
#endif
write(sock_fd, output, output_len); if (output != NULL) {
if (!kafkaWrite(kafka.writer, "REQUEST_DISPATCHER_2_ARBITER", "rm_test", output)) {
{ return -1;
cJSON *array = NULL; }
}
B32 initialized = false;
Size buffer_len = 0;
Size buffer_cap = MiB(1);
U32 buffer_expected_len = 0;
Char *buffer = calloc(buffer_cap, sizeof(*buffer));
Char size_bytes[4] = {0};
Size size_bytes_count = 0;
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0);
while (true) { while (true) {
if (initialized == false) { if (kafka_message_read != NULL) {
long len = read(sock_fd, const Char *json_error = NULL;
(Char*)size_bytes + size_bytes_count, cJSON *root = cJSON_ParseWithOpts((char *)kafka_message_read->payload, &json_error, true);
4 - size_bytes_count); Sint id_now = atoi(cJSON_GetObjectItem(root, "id")->valuestring);
if (id_now == id) {
if (len == 0) { break;
perror("read() returned zero"); } else {
exit(-1); printf("Found a cranky old message: %d\n", id_now);
rd_kafka_message_destroy(kafka_message_read);
} }
}
size_bytes_count += (Size)len; kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0);
if (size_bytes_count == 4) {
initialized = true;
buffer_expected_len = (U32)((size_bytes[3] << 0U) |
(size_bytes[2] << 8U) |
(size_bytes[1] << 16U) |
(size_bytes[0] << 24U));
} }
continue; if (kafka_message_read != NULL) {
if (kafka_message_read->err) {
/* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read));
} else { } else {
long len = read(sock_fd, fprintf(stderr,
buffer + buffer_len, "Received message on %s [%d] "
buffer_expected_len - buffer_len); "at offset %"PRId64": \n%s\n",
rd_kafka_topic_name(kafka_message_read->rkt),
buffer_len += (Size)len; (int)kafka_message_read->partition, kafka_message_read->offset,
cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
if (buffer_expected_len == buffer_len) { char *buffer = (char *)kafka_message_read->payload;
printf("Recieved: Final Response:\n%s\n",
cJSON_Print(cJSON_Parse(buffer)));
const Char *json_error = NULL; const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true); cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true);
...@@ -134,14 +116,14 @@ int main(int argc, char** argv) ...@@ -134,14 +116,14 @@ int main(int argc, char** argv)
if (root == NULL) { if (root == NULL) {
// TODO(naman): Error // TODO(naman): Error
} else { } else {
array = cJSON_GetObjectItem(root, "ip"); cJSON *array = cJSON_GetObjectItem(root, "id");
} cJSON *elem = NULL;
cJSON_ArrayForEach(elem, array) {
free(buffer); printf("%s\n", elem->valuestring);
break;
} }
} }
} }
rd_kafka_message_destroy(kafka_message_read);
} }
return 0; return 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment