/* kafka.h */
/*
 * Creator: Naman Dixit
 * Notice: © Copyright 2020 Naman Dixit
 */

/* Bundle of the librdkafka handles this module manages.
 * queues/topics are stretchy buffers (grown via sbufAdd) that collect the
 * rd_kafka_queue_t / rd_kafka_topic_t handles created by the functions
 * below — presumably so they can be destroyed together at shutdown
 * (teardown code is not visible here; TODO confirm). */
typedef struct Kafka {
    rd_kafka_t *writer;         // producer handle; also used for Admin API calls
    rd_kafka_t *reader;         // consumer handle
    rd_kafka_queue_t **queues;  // sbuf of Admin result queues (see kafkaCreateTopic)
    rd_kafka_topic_t **topics;  // sbuf of topic handles (see kafkaSubscribe)
} Kafka;

header_function
/* Synchronously create `topic` via the Admin API on kafka->writer.
 *
 * num_partitions / replication_factor are passed straight through to
 * rd_kafka_NewTopic_new(). Waits up to 15s for the Admin result.
 *
 * Returns 0 on success (or if the topic already exists), -1 on any
 * failure or timeout. The temporary result queue is appended to
 * kafka->queues, which takes ownership of it. */
int kafkaCreateTopic (Kafka *kafka, Char *topic,
                      Sint num_partitions, Sint replication_factor)
{
    char errstr[256];
    rd_kafka_NewTopic_t *new_topic = rd_kafka_NewTopic_new(topic,
                                                           num_partitions,
                                                           replication_factor,
                                                           errstr, sizeof(errstr));
    if (!new_topic) {
        fprintf(stderr, "Failed to create NewTopic object: %s\n",
                errstr);
        return -1;
    }

    /* Use a temporary queue for the asynchronous Admin result */
    rd_kafka_queue_t *queue = rd_kafka_queue_new(kafka->writer);
    sbufAdd(kafka->queues, queue);

    /* Asynchronously create topic, result will be available on queue */
    rd_kafka_CreateTopics(kafka->writer, &new_topic, 1, NULL, queue);

    rd_kafka_NewTopic_destroy(new_topic);

    /* Wait for result event */
    rd_kafka_event_t *event = rd_kafka_queue_poll(queue, 15*1000);
    if (!event) {
        /* There will eventually be a result, after operation
         * and request timeouts, but we only wait 15s to avoid
         * stalling too long when the cluster is not available. */
        fprintf(stderr, "No create topics result in 15s\n");
        return -1;
    }

    if (rd_kafka_event_error(event)) {
        /* Request-level failure */
        fprintf(stderr, "Create topics request failed: %s\n",
                rd_kafka_event_error_string(event));
        rd_kafka_event_destroy(event);
        return -1;
    }

    /* Extract the result type from the event.
     * These were assert()s before: under NDEBUG they compile away and a
     * NULL here would be dereferenced below, so check at runtime. */
    const rd_kafka_CreateTopics_result_t *result = rd_kafka_event_CreateTopics_result(event);
    if (!result) {
        /* Should be impossible on a dedicated queue, but fail loudly. */
        fprintf(stderr, "Unexpected event type (wanted CreateTopics result)\n");
        rd_kafka_event_destroy(event);
        return -1;
    }

    /* Extract the per-topic results from the result type; we requested
     * exactly one topic, so exactly one result is expected. */
    size_t result_topics_count;
    const rd_kafka_topic_result_t **result_topics =
        rd_kafka_CreateTopics_result_topics(result, &result_topics_count);
    if (!result_topics || result_topics_count != 1) {
        fprintf(stderr, "Expected exactly one topic result, got %zu\n",
                result_topics ? result_topics_count : (size_t)0);
        rd_kafka_event_destroy(event);
        return -1;
    }

    int return_value = 0;

    if (rd_kafka_topic_result_error(result_topics[0]) ==
        RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) {
        /* Already existing is treated as success. */
        fprintf(stderr, "Topic %s already exists\n",
                rd_kafka_topic_result_name(result_topics[0]));
    } else if (rd_kafka_topic_result_error(result_topics[0])) {
        fprintf(stderr, "Failed to create topic %s: %s\n",
                rd_kafka_topic_result_name(result_topics[0]),
                rd_kafka_topic_result_error_string(result_topics[0]));
        return_value = -1;
    } else {
        fprintf(stderr, "Topic %s successfully created\n",
                rd_kafka_topic_result_name(result_topics[0]));
    }

    rd_kafka_event_destroy(event);

    return return_value;
}

header_function
/* Create the producer handle on kafka->writer, connect it to `address`
 * (broker list string), and ensure all topics this service uses exist.
 *
 * Returns the producer handle, or NULL on failure (kafka->writer is
 * reset to NULL on every failure path so no dangling handle is left). */
rd_kafka_t* kafkaCreateWriter (Kafka *kafka, Char *address)
{
    char errstr[512] = {0};

    printf("Creating writer conf\n");
    rd_kafka_conf_t *kafka_writer_conf = rd_kafka_conf_new();
    rd_kafka_conf_set_dr_msg_cb(kafka_writer_conf, NULL);

    printf("Creating writer\n");
    /* rd_kafka_new() takes ownership of the conf on success only. */
    kafka->writer = rd_kafka_new(RD_KAFKA_PRODUCER, kafka_writer_conf,
                                errstr, sizeof(errstr));
    if (!kafka->writer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        rd_kafka_conf_destroy(kafka_writer_conf);
        return NULL;
    }

    printf("Adding brokers to writer\n");
    /* rd_kafka_brokers_add() returns the number of brokers successfully
     * added; the original ignored it and carried on with no brokers. */
    if (rd_kafka_brokers_add(kafka->writer, address) == 0) {
        fprintf(stderr, "Failed to add brokers from %s\n", address);
        rd_kafka_destroy(kafka->writer);
        kafka->writer = NULL;
        return NULL;
    }

#define CREATE_TOPIC(s)                                         \
    do {                                                        \
        if (kafkaCreateTopic(kafka, s, 1, 1) == -1) {           \
            rd_kafka_destroy(kafka->writer);                    \
            kafka->writer = NULL; /* don't leave a dangling handle */ \
            return NULL;                                        \
        }                                                       \
    } while (0)

    CREATE_TOPIC("REQUEST_DISPATCHER_2_ARBITER");
    CREATE_TOPIC("RESPONSE_ARBITER_2_DISPATCHER");
    CREATE_TOPIC("REQUEST_ARBITER_2_GRUNT");
    CREATE_TOPIC("RESPONSE_GRUNT_2_ARBITER");
    CREATE_TOPIC("JOIN_GRUNT_2_ARBITER");
    CREATE_TOPIC("HEARTBEAT_GRUNT_2_ARBITER");
#undef CREATE_TOPIC

    return kafka->writer;
}

header_function
/* Create the consumer handle on kafka->reader with group "cloud-example-c",
 * connect it to `address`, and redirect poll events to the consumer queue.
 *
 * Returns the consumer handle, or NULL on failure (kafka->reader is reset
 * to NULL on the broker-add failure path). */
rd_kafka_t* kafkaCreateReader (Kafka *kafka, Char *address)
{
    char errstr[512] = {0};

    rd_kafka_conf_t *kafka_reader_conf = rd_kafka_conf_new();

    /* The original passed NULL/0 for errstr and ignored the return of
     * rd_kafka_conf_set(), silently dropping invalid properties. */
#define SET_CONF(k, v)                                                      \
    do {                                                                    \
        if (rd_kafka_conf_set(kafka_reader_conf, k, v,                      \
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {\
            fprintf(stderr, "Failed to set %s: %s\n", k, errstr);           \
            rd_kafka_conf_destroy(kafka_reader_conf);                       \
            return NULL;                                                    \
        }                                                                   \
    } while (0)

    SET_CONF("group.id", "cloud-example-c");
    /* If there is no committed offset for this group, start reading
     * partitions from the beginning. */
    SET_CONF("auto.offset.reset", "earliest");
    /* Disable ERR__PARTITION_EOF when reaching end of partition. */
    SET_CONF("enable.partition.eof", "false");
#undef SET_CONF

    /* rd_kafka_new() takes ownership of the conf on success only. */
    kafka->reader = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_reader_conf,
                                 errstr, sizeof(errstr));
    if (!kafka->reader) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        rd_kafka_conf_destroy(kafka_reader_conf);
        return NULL;
    }

    /* Returns the number of brokers successfully added; 0 means none. */
    if (rd_kafka_brokers_add(kafka->reader, address) == 0) {
        fprintf(stderr, "Failed to add brokers from %s\n", address);
        rd_kafka_destroy(kafka->reader);
        kafka->reader = NULL;
        return NULL;
    }

    /* Route all events to the consumer queue so rd_kafka_consumer_poll()
     * serves everything. */
    rd_kafka_poll_set_consumer(kafka->reader);

    return kafka->reader;
}

header_function
/* Add `topic` to the caller's partition list (partition unassigned) and
 * create a topic handle on the consumer, recording it in kafka->topics.
 *
 * NOTE(review): this only builds the partition list; the actual
 * rd_kafka_subscribe()/assign() call presumably happens in the caller —
 * confirm against call sites.
 *
 * Returns the topic handle, or NULL if rd_kafka_topic_new() fails
 * (the original ignored that failure, stored NULL in kafka->topics and
 * printed success anyway). */
rd_kafka_topic_t* kafkaSubscribe (Kafka *kafka,
                                  rd_kafka_topic_partition_list_t* topics, Char *topic)
{
    rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);

    rd_kafka_topic_t *topic_result = rd_kafka_topic_new(kafka->reader, topic, NULL);
    if (!topic_result) {
        fprintf(stderr, "Failed to create topic handle for %s: %s\n",
                topic, rd_kafka_err2str(rd_kafka_last_error()));
        return NULL;
    }

    sbufAdd(kafka->topics, topic_result);
    printf("Subscribe to %s\n", topic);
    return topic_result;
}

header_function
/* Produce one message (key = user, value = msg, both NUL-terminated) to
 * `topic` via the given producer handle.
 *
 * Returns true if the message was queued, false if producev() failed
 * (e.g. queue full); queuing is asynchronous, so true does not mean the
 * broker has acknowledged delivery. */
B32 kafkaWrite (rd_kafka_t *kafka_writer, Char *topic, Char *user, Char *msg)
{
    /* Fix: the original passed the address of a stack-local counter as
     * the per-message opaque.  Production is asynchronous, so any
     * delivery-report callback dereferencing that opaque would read a
     * dead stack frame.  The writer conf sets dr_msg_cb to NULL, so the
     * opaque is unused — pass NULL explicitly. */
    rd_kafka_resp_err_t err = rd_kafka_producev(
        kafka_writer,
        RD_KAFKA_V_TOPIC(topic),
        RD_KAFKA_V_KEY(user, strlen(user)),
        RD_KAFKA_V_VALUE(msg, strlen(msg)),
        /* producev() will make a copy of the message
         * value (the key is always copied), so we
         * can reuse the same buffer on the
         * next iteration. */
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        RD_KAFKA_V_OPAQUE(NULL),
        RD_KAFKA_V_END);
    if (err) {
        fprintf(stderr, "Produce failed: %s\n",
                rd_kafka_err2str(err));
        return false;
    }

    return true;
}