Commit e5c9d0a6 authored by Nilanjan Daw

Merge remote-tracking branch 'origin/master'

parents bb3f9eb1 24db4557
......@@ -6,4 +6,3 @@ firecracker*
secrets.json
resource_system/bin/**
resource_system/version.linux
grunt
......@@ -35,8 +35,9 @@ Format:
{
"resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
// "port": 2343 --- NOT IMPLEMENTED YET
"grunts": ["a", "b", ...] // List of machine IDs
"grunts": [
{ node_id: some unique ID, port: port address}, ...
] // List of machine IDs
}
```
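A filled-in response under the new schema might look like this (the ids and port values are illustrative):

```
{
    "resource_id": "txn-0001",
    "timestamp" : "2020-03-21T10:41:00Z",
    "grunts": [
        { "node_id": "worker-1", "port": 2343 },
        { "node_id": "worker-2", "port": 2344 }
    ]
}
```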
......
......@@ -24,10 +24,12 @@
typedef struct Grunt {
Char *id;
Sint memory;
B32 rejoin_asked;
Sint time_to_die; // in ms
} Grunt;
typedef struct Grunt_Survey {
Grunt **grunt_ptrs;
Char **grunt_ids;
U16 *ports;
U64 milli_passed;
U64 milli_last;
......@@ -37,22 +39,14 @@ typedef struct Grunt_Survey {
typedef struct Command {
enum Command_Kind {
Command_NONE,
Command_REQUEST_DM_2_ARBITER,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_RESPONSE_GRUNT_2_ARBITER,
Command_HEARTBEAT_GRUNT_2_ARBITER,
Command_REJOIN_ARBITER_2_GRUNT,
} kind;
Char *resource_id;
union {
struct {
Sint memory;
} req_d2a;
struct {
Char **grunt_ids;
} res_a2d;
......@@ -62,17 +56,49 @@ typedef struct Command {
} req_a2g;
struct {
Char *id;
U16 port;
} res_g2a;
struct {
Char *id;
Sint memory;
} beat_g2a;
Char *grunt_id;
} rejoin_a2g;
};
} Command;
typedef struct Grunt_Tracker {
Grunt *grunts;
Size *free_list;
Hash_Table map;
} Grunt_Tracker;
internal_function
void gruntTrackBegin (Grunt_Tracker *t, Grunt g)
{
if (t->grunts == NULL) {
t->map = htCreate(0);
sbufAdd(t->grunts, (Grunt){0}); // Since a 0 index returned by the hash table means "not found", slot 0 stays a sentinel
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->grunts[t->free_list[0]] = g;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->grunts, g);
insertion_index = sbufElemin(t->grunts) - 1;
}
htInsert(&t->map, hashString(g.id), insertion_index);
}
internal_function
void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
{
Size index = htLookup(&t->map, hashString(grunt_id));
if (index == 0) return; // Unknown id; slot 0 is the sentinel
htRemove(&t->map, hashString(grunt_id)); // Remove by key, before freeing the id that grunt_id may alias
sbufAdd(t->free_list, index);
free(t->grunts[index].id);
t->grunts[index] = (Grunt){0};
}
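A minimal sketch of the intended tracker lifecycle, assuming the `sbuf`/`ht` helpers behave as used above (the node id is illustrative):

```c
Grunt_Tracker t = {0};
gruntTrackBegin(&t, (Grunt){.id = strdup("worker-1"), .time_to_die = 1000});

Size idx = htLookup(&t.map, hashString("worker-1"));
if (idx != 0) { // 0 means "not tracked"
    t.grunts[idx].memory = 512;
}

gruntTrackEnd(&t, "worker-1"); // frees the id and recycles the slot via the free list
```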
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wpadded"
......@@ -104,11 +130,9 @@ Sint main (Sint argc, Char *argv[])
signal(SIGINT, signalHandlerSIGINT);
Command *commands = NULL;
Grunt *grunts = NULL;
Hash_Table grunt_map = htCreate(0);
Hash_Table grunt_survey_map = htCreate(0);
Grunt_Tracker gt = {0};
sbufAdd(grunts, (Grunt){0}); // Since a 0 index out of the hash table will be invalid
Hash_Table grunt_survey_map = htCreate(0);
Kafka kafka = {0};
......@@ -128,6 +152,7 @@ Sint main (Sint argc, Char *argv[])
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
#undef CREATE_TOPIC
......@@ -161,6 +186,13 @@ Sint main (Sint argc, Char *argv[])
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100);
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
// NOTE: take a pointer; decrementing a local copy would be lost
Grunt *g = &gt.grunts[gt.map.values[j]];
g->time_to_die -= 100; // matches the 100 ms consumer poll timeout above
}
}
if (kafka_message_read != NULL) {
if (kafka_message_read->err) {
/* Consumer error: typically just informational. */
......@@ -179,7 +211,7 @@ Sint main (Sint argc, Char *argv[])
cJSON *root = cJSON_ParseWithOpts(kafka_message_read->payload, &json_error, true);
if (kafka_message_read->rkt == topic_req_dm2a) {
Command c = {.kind = Command_REQUEST_DM_2_ARBITER};
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = strdup(cJSON_GetObjectItem(root, "resource_id")->valuestring);
// TODO(naman): Add any new resource fields here
......@@ -190,11 +222,13 @@ Sint main (Sint argc, Char *argv[])
Char **grunt_ids = NULL;
for (Size j = 0; j < sbufElemin(grunts); j++) {
Grunt g = grunts[j];
if (g.memory >= memory) {
c.kind = Command_RESPONSE_ARBITER_2_DM;
sbufAdd(grunt_ids, g.id);
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
Grunt g = gt.grunts[gt.map.values[j]];
if (g.memory >= memory) {
c.kind = Command_RESPONSE_ARBITER_2_DM;
sbufAdd(grunt_ids, strdup(g.id));
}
}
}
......@@ -211,13 +245,31 @@ Sint main (Sint argc, Char *argv[])
}
} else if (kafka_message_read->rkt == topic_join_g2a) {
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
Grunt grunt = {.id = id};
Grunt grunt = {.id = id, .time_to_die = 1000};
logMessage("Join G2A:\tid: %s", id);
if (htLookup(&grunt_map, hashString(id)) == 0) {
sbufAdd(grunts, grunt);
htInsert(&grunt_map, hashString(id), sbufElemin(grunts) - 1);
if (htLookup(&gt.map, hashString(id)) == 0) {
gruntTrackBegin(&gt, grunt);
}
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = cJSON_GetObjectItem(root, "node_id")->valuestring;
U64 index = htLookup(&gt.map, hashString(id));
if (index != 0) { // Skip left-over heartbeats from grunts that are no longer tracked
// TODO(naman): Add any new resource fields here
gt.grunts[index].time_to_die = 1000;
gt.grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Beat G2A:\tid: %s (Memory %d)", id, gt.grunts[index].memory);
} else {
// index is 0 here, so this reuses the sentinel slot's rejoin_asked
// flag to avoid enqueueing a rejoin command on every stray heartbeat
if ((gt.grunts != NULL) && (gt.grunts[index].rejoin_asked != true)) {
gt.grunts[index].rejoin_asked = true;
Command c = {.kind = Command_REJOIN_ARBITER_2_GRUNT,
.rejoin_a2g.grunt_id = strdup(id)}; // strdup: the JSON payload may be freed before the command is flushed
sbufAdd(commands, c);
}
logMessage("Beat G2A:\tid: %s (UNKNOWN)", id);
}
} else if (kafka_message_read->rkt == topic_res_g2a) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
......@@ -232,20 +284,10 @@ Sint main (Sint argc, Char *argv[])
hashString(resource_id));
if (gs != NULL) { // If it has not been already removed
Grunt *g = &grunts[htLookup(&grunt_map, hashString(node_id))];
sbufAdd(gs->grunt_ptrs, g);
Grunt *g = &gt.grunts[htLookup(&gt.map, hashString(node_id))];
sbufAdd(gs->grunt_ids, strdup(g->id));
}
}
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = cJSON_GetObjectItem(root, "node_id")->valuestring;
logMessage("Beat G2A:\tid: %s", id);
U64 index = htLookup(&grunt_map, hashString(id));
if (index != 0) { // Prevent any left over message
// TODO(naman): Add any new resource fields here
grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
}
} else if (kafka_message_read->rkt == topic_log) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
......@@ -264,6 +306,16 @@ Sint main (Sint argc, Char *argv[])
rd_kafka_message_destroy(kafka_message_read);
}
// NOTE: collect the dead ids first, since gruntTrackEnd() mutates the
// hash table being iterated over
Char **dead_ids = NULL;
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
Grunt g = gt.grunts[gt.map.values[j]];
if (g.time_to_die <= 0) {
sbufAdd(dead_ids, g.id);
}
}
}
for (Size j = 0; j < sbufElemin(dead_ids); j++) {
gruntTrackEnd(&gt, dead_ids[j]);
}
sbufDelete(dead_ids);
for (Size i = 0; i < grunt_survey_map.slot_count; i++) {
if (grunt_survey_map.keys[i] != 0) {
Grunt_Survey *gs = (Grunt_Survey *)grunt_survey_map.values[i];
......@@ -275,13 +327,17 @@ Sint main (Sint argc, Char *argv[])
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = gs->resource_id;
for (Size k = 0; k < sbufElemin(gs->grunt_ptrs); k++) {
sbufAdd(c.res_a2d.grunt_ids, gs->grunt_ptrs[k]->id);
for (Size k = 0; k < sbufElemin(gs->grunt_ids); k++) {
Size index = htLookup(&gt.map, hashString(gs->grunt_ids[k]));
if (index != 0) {
Char *id = gt.grunts[index].id;
sbufAdd(c.res_a2d.grunt_ids, id);
}
}
sbufAdd(commands, c);
free(gs->grunt_ptrs);
sbufDelete(gs->grunt_ids);
free(gs->ports);
htRemove(&grunt_survey_map, hashString(gs->resource_id));
}
......@@ -313,6 +369,11 @@ Sint main (Sint argc, Char *argv[])
}
sbufPrint(output, "]");
sbufPrint(output, "\n}");
} else if (c.kind == Command_REJOIN_ARBITER_2_GRUNT) {
topic = "REJOIN_RM_2_RD";
sbufPrint(output, "{\n\"node_id\": \"%s\"", c.rejoin_a2g.grunt_id);
sbufPrint(output, "\n}");
}
if (output != NULL) {
......@@ -324,6 +385,12 @@ Sint main (Sint argc, Char *argv[])
sbufDelete(output);
free(c.resource_id);
if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) {
free(c.res_a2d.grunt_ids[k]);
}
}
sbufUnsortedRemove(commands, j);
}
}
......
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
#if !defined(COMMAND_H_INCLUDE_GUARD)
#define COMMAND_SYSTEM(req_type_name, req_var_name, req_member_type, \
res_type_name, res_var_name, res_member_type) \
\
typedef struct req_type_name { \
req_member_type member; \
struct req_type_name *next; \
} req_type_name; \
\
global_variable req_type_name *global_##req_var_name##_first; \
global_variable req_type_name *global_##req_var_name##_last; \
global_variable req_type_name *global_##req_var_name##_divider; \
global_variable pthread_mutex_t global_##req_var_name##_lock = PTHREAD_MUTEX_INITIALIZER; \
global_variable pthread_cond_t global_##req_var_name##_cond_var = PTHREAD_COND_INITIALIZER; \
\
internal_function \
void req_var_name##Init (void) \
{ \
global_##req_var_name##_first = calloc(1, sizeof(*global_##req_var_name##_first)); \
global_##req_var_name##_last = global_##req_var_name##_first; \
global_##req_var_name##_divider = global_##req_var_name##_first; \
\
return; \
} \
\
internal_function \
void req_var_name##Enqueue (req_member_type c) \
{ \
global_##req_var_name##_last->next = calloc(1, sizeof(*(global_##req_var_name##_last->next))); \
global_##req_var_name##_last->next->member = c; \
global_##req_var_name##_last = global_##req_var_name##_last->next; \
\
while (global_##req_var_name##_first != global_##req_var_name##_divider) { \
req_type_name *temp = global_##req_var_name##_first; \
global_##req_var_name##_first = global_##req_var_name##_first->next; \
free(temp); \
} \
\
pthread_cond_signal(&global_##req_var_name##_cond_var); \
\
return; \
} \
\
internal_function \
void req_var_name##Dequeue (req_member_type *c, _Atomic B64 *keep_alive) \
{ \
pthread_mutex_lock(&global_##req_var_name##_lock); \
\
while (global_##req_var_name##_divider == global_##req_var_name##_last) { \
if ((keep_alive != NULL) && (atomic_load(keep_alive) == false)) { \
*c = (req_member_type){0}; \
pthread_cond_signal(&global_##req_var_name##_cond_var); \
pthread_mutex_unlock(&global_##req_var_name##_lock); \
return; \
} \
pthread_cond_wait(&global_##req_var_name##_cond_var, &global_##req_var_name##_lock); \
} \
\
*c = global_##req_var_name##_divider->next->member; \
global_##req_var_name##_divider = global_##req_var_name##_divider->next; \
\
pthread_mutex_unlock(&global_##req_var_name##_lock); \
} \
\
\
\
\
typedef struct res_type_name { \
res_member_type member; \
struct res_type_name *next; \
} res_type_name; \
\
global_variable res_type_name *global_##res_var_name##_first; \
global_variable res_type_name *global_##res_var_name##_last; \
global_variable res_type_name *global_##res_var_name##_divider; \
global_variable pthread_mutex_t global_##res_var_name##_lock = PTHREAD_MUTEX_INITIALIZER; \
\
internal_function \
void res_var_name##Init (void) \
{ \
global_##res_var_name##_first = calloc(1, sizeof(*global_##res_var_name##_first)); \
global_##res_var_name##_last = global_##res_var_name##_first; \
global_##res_var_name##_divider = global_##res_var_name##_first; \
} \
\
internal_function \
void res_var_name##Enqueue (res_member_type c) \
{ \
pthread_mutex_lock(&global_##res_var_name##_lock); \
\
global_##res_var_name##_last->next = calloc(1, sizeof(*(global_##res_var_name##_last->next))); \
global_##res_var_name##_last->next->member = c; \
global_##res_var_name##_last = global_##res_var_name##_last->next; \
\
while (global_##res_var_name##_first != global_##res_var_name##_divider) { \
res_type_name *temp = global_##res_var_name##_first; \
global_##res_var_name##_first = global_##res_var_name##_first->next; \
free(temp); \
} \
\
pthread_mutex_unlock(&global_##res_var_name##_lock); \
\
return; \
} \
\
internal_function \
B32 res_var_name##Dequeue (res_member_type *c) \
{ \
if (global_##res_var_name##_divider != global_##res_var_name##_last) { \
*c = global_##res_var_name##_divider->next->member; \
global_##res_var_name##_divider = global_##res_var_name##_divider->next; \
return true; \
} \
\
return false; \
}
#define COMMAND_H_INCLUDE_GUARD
#endif
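For orientation, the macro takes a (type name, function prefix, element type) triple for the request queue and another for the response queue; a hypothetical instantiation (the `Ping`/`Pong` types are illustrative, not from this codebase) would be:

```c
typedef struct { Sint value; } Ping;
typedef struct { Sint value; } Pong;

COMMAND_SYSTEM(Ping_Request, pingCommand, Ping,
               Pong_Response, pongResult, Pong);

/* Producer thread: pingCommandInit(); pingCommandEnqueue((Ping){.value = 42}); */
/* Consumer thread: Ping p; pingCommandDequeue(&p, &keep_alive);                */
```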
......@@ -20,6 +20,7 @@
#include <sys/time.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include <librdkafka/rdkafka.h>
# if defined(COMPILER_CLANG)
......@@ -45,7 +46,50 @@ typedef struct Command {
#include "kafka.h"
#include "time.c"
#include "command.h"
typedef struct Thread_Manager_Command {
enum Thread_Manager_Command_Kind {
Thread_Manager_Command_NONE,
Thread_Manager_Command_DOCKER_CREATE,
Thread_Manager_Command_DOCKER_DESTROY,
} kind;
Char *id;
union {
struct {
Sint placeholder;
} docker_create;
struct {
Sint placeholder;
} docker_destroy;
};
} Thread_Manager_Command;
typedef struct JSON_Print_Command {
Char *msg;
Char *topic;
} JSON_Print_Command;
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wextra-semi"
# endif
COMMAND_SYSTEM(Thread_Manager_Request, tmCommand, Thread_Manager_Command,
JSON_Print_Request, instrumentCommand, JSON_Print_Command);
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
global_variable volatile sig_atomic_t global_keep_running = 1;
global_variable Char *node_name;
#include "instrument_docker.c"
#include "thread_manager.c"
internal_function
void signalHandlerSIGINT (int _)
......@@ -56,8 +100,6 @@ void signalHandlerSIGINT (int _)
int main(int argc, char** argv)
{
Char *node_name = NULL;
if (argc > 1) {
node_name = argv[1];
} else {
......@@ -68,6 +110,12 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandlerSIGINT);
tmCommandInit();
instrumentCommandInit();
pthread_t thread_manager;
pthread_create(&thread_manager, NULL, &tmProcessLoop, NULL);
Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
......@@ -86,6 +134,7 @@ int main(int argc, char** argv)
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
......@@ -94,7 +143,10 @@ int main(int argc, char** argv)
rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_RM_2_RD");
unused_variable(topic_req_a2g);
rd_kafka_topic_t *topic_rej_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REJOIN_RM_2_RD");
rd_kafka_topic_t *topic_log = kafkaSubscribe(&kafka, kafka_reader_topics,
"LOG_COMMON");
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
......@@ -129,7 +181,7 @@ int main(int argc, char** argv)
/* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read));
} else {
} else if (kafka_message_read->rkt == topic_req_a2g) {
fprintf(stderr,
"Received message on %s [%d] "
"at offset %"PRId64": \n%s\n",
......@@ -149,6 +201,38 @@ int main(int argc, char** argv)
c.txn_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint;
}
} else if (kafka_message_read->rkt == topic_rej_a2g) {
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) {
return -1;
}
} else if (kafka_message_read->rkt == topic_log) {
fprintf(stderr,
"Received message on %s [%d] "
"at offset %"PRId64": \n%s\n",
rd_kafka_topic_name(kafka_message_read->rkt),
(int)kafka_message_read->partition, kafka_message_read->offset,
cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
char *buffer = (char *)kafka_message_read->payload;
const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true);
if (root == NULL) {
// TODO(naman): Error
} else {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
if (strequal(node_id, node_name)) {
// FIXME(naman): Fix this placeholder
Thread_Manager_Command tmc = {0};
tmCommandEnqueue(tmc);
/* "resource_id": "logical-entity-id", */
/* "function_id": "unique-function-id", */
/* "timestamp" : "iso-8601-timestamp", */
/* "reason": "deployment"/"termination", */
/* "status": true/false // Only valid if reason==deployment; */
}
}
}
rd_kafka_message_destroy(kafka_message_read);
}
......@@ -186,6 +270,16 @@ int main(int argc, char** argv)
}
}
{
JSON_Print_Command command = {0};
while (instrumentCommandDequeue(&command)) {
// TODO(naman): Enable this after proper testing
/* if (!kafkaWrite(kafka.writer, command.topic, "resource_daemon", command.msg)) { */
/* return -1; */
/* } */
}
}
{ // Send a heartbeat message if it is time to do so
U64 time_new = timeMilli();
U64 time_passed = time_new - time_begin;
......
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
internal_function
noreturn
void* dockerProcessLoop (void *arg)
{
unused_variable(arg);
pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
while (true) {
// TODO(naman): Get data
Char *json = NULL;
Char *output = NULL;
sbufPrint(output, "{\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"type\": \"%s\"", "docker");
sbufPrint(output, ",\n\"data\": %s", json ? json : "{}");
sbufPrint(output, "\n}\n");
JSON_Print_Command jpc = {.msg = output,
.topic = "LOG_COMMON"}; // NOTE: was "LOG_CHANNEL", but main() only ever creates LOG_COMMON
U64 time_before = timeMilli();
instrumentCommandEnqueue(jpc);
U64 time_after = timeMilli();
if ((time_after - time_before) < 1000) {
// Sleep out the rest of the 1 s period; sleep() takes seconds, so use usleep() with the remaining milliseconds
usleep((useconds_t)((1000 - (time_after - time_before)) * 1000));
}
}
}
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Thread {
pthread_t thread;
Char *id;
} Thread;
typedef struct Thread_Tracker {
Thread *threads;
Size *free_list;
Hash_Table map;
} Thread_Tracker;
internal_function
void threadTrackBegin (Thread_Tracker *t, Thread th)
{
if (t->threads == NULL) {
t->map = htCreate(0);
sbufAdd(t->threads, (Thread){0}); // Since a 0 index returned by the hash table means "not found", slot 0 stays a sentinel
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->threads[t->free_list[0]] = th;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->threads, th);
insertion_index = sbufElemin(t->threads) - 1;
}
htInsert(&t->map, hashString(th.id), insertion_index);
}
internal_function
void threadTrackEnd (Thread_Tracker *t, Char *thread_id)
{
Size index = htLookup(&t->map, hashString(thread_id));
if (index == 0) return; // Unknown id; slot 0 is the sentinel
htRemove(&t->map, hashString(thread_id)); // Remove by key, before freeing the id that thread_id may alias
sbufAdd(t->free_list, index);
free(t->threads[index].id);
t->threads[index] = (Thread){0};
}
internal_function
void* tmProcessLoop (void *arg)
{
unused_variable(arg);
Thread_Tracker tt = {0};
while (true) {
Thread_Manager_Command command = {0};
tmCommandDequeue(&command, NULL);
switch (command.kind) {
case Thread_Manager_Command_DOCKER_CREATE: {
pthread_t thread;
pthread_create(&thread, NULL, &dockerProcessLoop, NULL);
threadTrackBegin(&tt, (Thread){.id = command.id, .thread = thread});
} break;
case Thread_Manager_Command_DOCKER_DESTROY: {
Size index = htLookup(&tt.map, hashString(command.id));
pthread_t thread = tt.threads[index].thread;
pthread_cancel(thread);
pthread_join(thread, NULL);
threadTrackEnd(&tt, command.id);
} break;
case Thread_Manager_Command_NONE: {
} break;
}
}
}
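A sketch of how the main thread is presumably meant to drive this loop (the container id is illustrative):

```c
// Start an instrumentation thread for a container...
tmCommandEnqueue((Thread_Manager_Command){
    .kind = Thread_Manager_Command_DOCKER_CREATE,
    .id = strdup("container-1"), // ownership passes to the thread tracker
});

// ...and later cancel and reap it.
tmCommandEnqueue((Thread_Manager_Command){
    .kind = Thread_Manager_Command_DOCKER_DESTROY,
    .id = "container-1", // only used as a lookup key here
});
```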
......@@ -96,20 +96,25 @@ int main(int argc, char** argv)
}
}
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0);
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 10);
while (true) {
if (kafka_message_read != NULL) {
const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts((char *)kafka_message_read->payload, &json_error, true);
Sint id_now = atoi(cJSON_GetObjectItem(root, "id")->valuestring);
Sint id_now = 0;
if (cJSON_GetObjectItem(root, "resource_id") == NULL) {
goto skip_message;
}
id_now = atoi(cJSON_GetObjectItem(root, "resource_id")->valuestring);
if (id_now == id) {
break;
} else {
skip_message:
printf("Found a cranky old message: %d\n", id_now);
rd_kafka_message_destroy(kafka_message_read);
}
}
kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0);
kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 10);
}
if (kafka_message_read != NULL) {
......@@ -143,5 +148,15 @@ int main(int argc, char** argv)
rd_kafka_message_destroy(kafka_message_read);
}
for (Size i = 0; i < sbufElemin(kafka.topics); i++) {
rd_kafka_topic_destroy(kafka.topics[i]);
}
rd_kafka_consumer_close(kafka.reader);
rd_kafka_destroy(kafka.reader);
for (Size i = 0; i < sbufElemin(kafka.queues); i++) {
rd_kafka_queue_destroy(kafka.queues[i]);
}
rd_kafka_destroy(kafka.writer);
return 0;
}