1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
/* Bundle of all librdkafka handles owned by this module.
 * Created by kafkaCreateWriter()/kafkaCreateReader(); the queues and
 * topics arrays collect handles so they can be released at shutdown. */
typedef struct Kafka {
    rd_kafka_t *writer;        /* producer handle (RD_KAFKA_PRODUCER) */
    rd_kafka_t *reader;        /* consumer handle (RD_KAFKA_CONSUMER) */
    rd_kafka_queue_t **queues; /* admin-result queues, appended via sbufAdd
                                * (presumably a stretchy-buffer macro — TODO confirm) */
    rd_kafka_topic_t **topics; /* topic handles created in kafkaSubscribe */
} Kafka;
header_function
/* Create a Kafka topic through the Admin API and wait (up to 15 s) for the
 * broker's per-topic result.
 *
 * kafka              - must have a valid ->writer producer handle.
 * topic              - topic name to create.
 * num_partitions     - partition count for the new topic.
 * replication_factor - replication factor for the new topic.
 *
 * Returns 0 on success or if the topic already exists; -1 on any failure
 * (bad arguments, timeout, request-level error, or per-topic error).
 *
 * NOTE(review): the result queue is appended to kafka->queues rather than
 * destroyed here, so its lifetime is managed by the caller/shutdown path —
 * confirm something eventually calls rd_kafka_queue_destroy on it. */
int kafkaCreateTopic (Kafka *kafka, Char *topic,
                      Sint num_partitions, Sint replication_factor)
{
    char errstr[256];

    /* Describe the topic to be created; errstr receives the reason on failure. */
    rd_kafka_NewTopic_t *new_topic = rd_kafka_NewTopic_new(topic,
                                                           num_partitions,
                                                           replication_factor,
                                                           errstr, sizeof(errstr));
    if (!new_topic) {
        fprintf(stderr, "Failed to create NewTopic object: %s\n",
                errstr);
        return -1;
    }

    /* Use a temporary queue for the asynchronous Admin result */
    rd_kafka_queue_t *queue = rd_kafka_queue_new(kafka->writer);
    sbufAdd(kafka->queues, queue);

    /* Asynchronously create topic, result will be available on queue */
    rd_kafka_CreateTopics(kafka->writer, &new_topic, 1, NULL, queue);
    /* CreateTopics copies the request, so the NewTopic object can be freed now. */
    rd_kafka_NewTopic_destroy(new_topic);

    /* Wait for result event */
    rd_kafka_event_t *event = rd_kafka_queue_poll(queue, 15*1000);
    if (!event) {
        /* There will eventually be a result, after operation
         * and request timeouts, but in this example we'll only
         * wait 15s to avoid stalling too long when cluster
         * is not available. */
        fprintf(stderr, "No create topics result in 15s\n");
        return -1;
    }

    if (rd_kafka_event_error(event)) {
        /* Request-level failure */
        fprintf(stderr, "Create topics request failed: %s\n",
                rd_kafka_event_error_string(event));
        rd_kafka_event_destroy(event);
        return -1;
    }

    /* Extract the result type from the event. */
    const rd_kafka_CreateTopics_result_t *result = rd_kafka_event_CreateTopics_result(event);
    assert(result); /* Since we're using a dedicated queue we know this is
                     * a CreateTopics result type. */

    /* Extract the per-topic results from the result type.
     * The returned array is owned by `event`; do not free it separately. */
    size_t result_topics_count;
    const rd_kafka_topic_result_t **result_topics = rd_kafka_CreateTopics_result_topics(result,
                                                                                        &result_topics_count);
    assert(result_topics && result_topics_count == 1); /* one topic requested above */

    int return_value = 0;

    if (rd_kafka_topic_result_error(result_topics[0]) ==
        RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) {
        /* Already-existing topics are treated as success (idempotent create). */
        fprintf(stderr, "Topic %s already exists\n",
                rd_kafka_topic_result_name(result_topics[0]));
    } else if (rd_kafka_topic_result_error(result_topics[0])) {
        fprintf(stderr, "Failed to create topic %s: %s\n",
                rd_kafka_topic_result_name(result_topics[0]),
                rd_kafka_topic_result_error_string(result_topics[0]));
        return_value = -1;
    } else {
        fprintf(stderr, "Topic %s successfully created\n",
                rd_kafka_topic_result_name(result_topics[0]));
    }

    rd_kafka_event_destroy(event);

    return return_value;
}
header_function
/* Create the Kafka producer handle, connect it to the given broker
 * address, and pre-create all topics this service uses.
 *
 * kafka   - ->writer is set on success, NULL on failure.
 * address - broker list in librdkafka "host:port[,host:port...]" form.
 *
 * Returns the producer handle, or NULL on any failure (handle creation,
 * broker registration, or topic creation). */
rd_kafka_t* kafkaCreateWriter (Kafka *kafka, Char *address)
{
    char errstr[512] = {0};

    printf("Creating writer conf\n");
    rd_kafka_conf_t *kafka_writer_conf = rd_kafka_conf_new();
    rd_kafka_conf_set_dr_msg_cb(kafka_writer_conf, NULL);

    printf("Creating writer\n");
    kafka->writer = rd_kafka_new(RD_KAFKA_PRODUCER, kafka_writer_conf,
                                 errstr, sizeof(errstr));
    if (!kafka->writer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        /* rd_kafka_new only takes ownership of the conf on success. */
        rd_kafka_conf_destroy(kafka_writer_conf);
        return NULL;
    }

    printf("Adding brokers to writer\n");
    /* rd_kafka_brokers_add returns the number of brokers successfully
     * added; the original ignored this, silently proceeding with no brokers. */
    if (rd_kafka_brokers_add(kafka->writer, address) == 0) {
        fprintf(stderr, "Failed to add any brokers from: %s\n", address);
        rd_kafka_destroy(kafka->writer);
        kafka->writer = NULL; /* don't leave a dangling handle in the struct */
        return NULL;
    }

#define CREATE_TOPIC(s)                                 \
    do {                                                \
        if (kafkaCreateTopic(kafka, s, 1, 1) == -1) {   \
            rd_kafka_destroy(kafka->writer);            \
            kafka->writer = NULL;                       \
            return NULL;                                \
        }                                               \
    } while (0)

    CREATE_TOPIC("REQUEST_DISPATCHER_2_ARBITER"); //
    CREATE_TOPIC("RESPONSE_ARBITER_2_DISPATCHER");
    CREATE_TOPIC("REQUEST_ARBITER_2_GRUNT");
    CREATE_TOPIC("RESPONSE_GRUNT_2_ARBITER"); //
    CREATE_TOPIC("JOIN_GRUNT_2_ARBITER"); //
    CREATE_TOPIC("HEARTBEAT_GRUNT_2_ARBITER"); //

#undef CREATE_TOPIC

    return kafka->writer;
}
header_function
/* Create the Kafka consumer handle, configure its group and offset
 * behaviour, and point it at the given broker address.
 *
 * kafka   - ->reader is set on success, NULL on failure.
 * address - broker list in librdkafka "host:port[,host:port...]" form.
 *
 * Returns the consumer handle, or NULL on any failure. */
rd_kafka_t* kafkaCreateReader (Kafka *kafka, Char *address)
{
    char errstr[512] = {0};

    rd_kafka_conf_t *kafka_reader_conf = rd_kafka_conf_new();

    /* The original passed (NULL, 0) for errstr and ignored the return
     * value, silently swallowing configuration errors; check each call. */
    if ((rd_kafka_conf_set(kafka_reader_conf, "group.id", "cloud-example-c",
                           errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ||
        /* If there is no committed offset for this group, start reading
         * partitions from the beginning. */
        (rd_kafka_conf_set(kafka_reader_conf, "auto.offset.reset", "earliest",
                           errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ||
        /* Disable ERR__PARTITION_EOF when reaching end of partition. */
        (rd_kafka_conf_set(kafka_reader_conf, "enable.partition.eof", "false",
                           errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)) {
        fprintf(stderr, "Failed to configure consumer: %s\n", errstr);
        rd_kafka_conf_destroy(kafka_reader_conf);
        kafka->reader = NULL;
        return NULL;
    }

    kafka->reader = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_reader_conf,
                                 errstr, sizeof(errstr));
    if (!kafka->reader) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        /* rd_kafka_new only takes ownership of the conf on success. */
        rd_kafka_conf_destroy(kafka_reader_conf);
        return NULL;
    }

    if (rd_kafka_brokers_add(kafka->reader, address) == 0) {
        fprintf(stderr, "Failed to add any brokers from: %s\n", address);
        rd_kafka_destroy(kafka->reader);
        kafka->reader = NULL;
        return NULL;
    }

    /* Route all partition/group events to the consumer queue so a single
     * poll loop services everything. */
    rd_kafka_poll_set_consumer(kafka->reader);

    return kafka->reader;
}
header_function
/* Add `topic` to the consumer's partition list and create a topic handle
 * for it, remembering the handle in kafka->topics for later cleanup.
 *
 * kafka  - must have a valid ->reader consumer handle.
 * topics - partition list being accumulated for rd_kafka_assign/subscribe.
 * topic  - topic name; subscribed with RD_KAFKA_PARTITION_UA (unassigned).
 *
 * Returns the topic handle, or NULL if the handle could not be created. */
rd_kafka_topic_t* kafkaSubscribe (Kafka *kafka,
                                  rd_kafka_topic_partition_list_t* topics, Char *topic)
{
    rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);

    rd_kafka_topic_t *topic_result = rd_kafka_topic_new(kafka->reader, topic, NULL);
    /* The original stored and returned this unchecked; rd_kafka_topic_new
     * returns NULL on failure (see rd_kafka_last_error for the cause). */
    if (!topic_result) {
        fprintf(stderr, "Failed to create topic handle for %s: %s\n",
                topic, rd_kafka_err2str(rd_kafka_last_error()));
        return NULL;
    }
    sbufAdd(kafka->topics, topic_result);

    printf("Subscribe to %s\n", topic);

    return topic_result;
}
header_function
/* Produce one message on `topic`, keyed by `user`, with payload `msg`.
 *
 * kafka_writer - producer handle from kafkaCreateWriter().
 * topic        - destination topic name.
 * user         - message key (NUL-terminated).
 * msg          - message value (NUL-terminated).
 *
 * Returns true if the message was enqueued, false on produce failure.
 * Note: enqueueing is asynchronous; this does not confirm broker delivery. */
B32 kafkaWrite (rd_kafka_t *kafka_writer, Char *topic, Char *user, Char *msg)
{
    /* The original passed RD_KAFKA_V_OPAQUE(&delivery_counter) where
     * delivery_counter was a stack local: delivery reports fire after this
     * function returns, so any dr callback would have read a dangling
     * pointer. The counter was never read, so the opaque is dropped. */
    rd_kafka_resp_err_t err = rd_kafka_producev(
        kafka_writer,
        RD_KAFKA_V_TOPIC(topic),
        RD_KAFKA_V_KEY(user, strlen(user)),
        RD_KAFKA_V_VALUE(msg, strlen(msg)),
        /* producev() will make a copy of the message
         * value (the key is always copied), so we
         * can reuse the same json buffer on the
         * next iteration. */
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        RD_KAFKA_V_END);

    if (err) {
        fprintf(stderr, "Produce failed: %s\n",
                rd_kafka_err2str(err));
        return false;
    }

    return true;
}