Commit 7404203a authored by Naman Dixit's avatar Naman Dixit

Pulled config into ini file, rearranged some code

parent 13692b53
[Kafka]
Address = 10.129.6.5:9092
[Arbiter]
MessageReadGap = 10
GruntTimeToDie = 10000
GruntResponseWaitTime = 100
[Section]
MessageReadGap = 10
HeartbeatGap = 1000
\ No newline at end of file
......@@ -3,12 +3,6 @@
* Notice: © Copyright 2020 Naman Dixit
*/
#define KAFKA_ADDRESS "10.129.6.5:9092"
#define ARBITER_READ_MESSAGE_GAP_MS 10
#define ARBITER_GRUNT_TIME_TO_DIE_MS 10000
#define ARBITER_DISPATCHER_RESPONSE_WAIT_TIME_MS 100
#define logMessage(s, ...) printf(s "\n", ##__VA_ARGS__)
#define logError(s, ...) fprintf(stderr, s "\n", ##__VA_ARGS__)
......@@ -28,89 +22,11 @@
#include <signal.h>
#include <librdkafka/rdkafka.h>
typedef struct Grunt {
Char *id;
Sint memory;
B32 rejoin_asked;
Sint time_to_die; // in ms
} Grunt;
typedef struct Grunt_Survey {
Char **grunt_ids;
U16 *ports;
U64 milli_passed;
U64 milli_last;
Char *resource_id;
} Grunt_Survey;
typedef struct Command {
enum Command_Kind {
Command_NONE,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_JOIN_ACK_ARBITER_2_GRUNT,
Command_REJOIN_ARBITER_2_GRUNT,
} kind;
Char *resource_id;
union {
struct {
Char **grunt_ids;
} res_a2d;
struct {
Sint memory;
} req_a2g;
struct {
Char *grunt_id;
} rejoin_a2g;
struct {
Char *grunt_id;
} join_ack_a2g;
};
} Command;
typedef struct Grunt_Tracker {
Grunt *grunts;
Size *free_list;
Hash_Table map;
} Grunt_Tracker;
internal_function
void gruntTrackBegin (Grunt_Tracker *t, Grunt g)
{
if (t->grunts == NULL) {
t->map = htCreate(0);
sbufAdd(t->grunts, (Grunt){0}); // SInce 0 index out of hash table will be invalid
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->grunts[t->free_list[0]] = g;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->grunts, g);
insertion_index = sbufElemin(t->grunts) - 1;
}
htInsert(&t->map, hashString(g.id), insertion_index);
}
internal_function
void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
{
U64 hash = hashString(grunt_id);
Size index = htLookup(&t->map, hash);
sbufAdd(t->free_list, index);
free(t->grunts[index].id);
t->grunts[index] = (Grunt){0};
htRemove(&t->map, hash);
}
#include "kafka.h"
#include "time.c"
#include "conf.c"
#include "command.c"
#include "grunt_manager.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
......@@ -123,8 +39,16 @@ void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
# pragma clang diagnostic pop
# endif
#include "kafka.h"
#include "time.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wreserved-id-macro"
# pragma clang diagnostic ignored "-Wcast-qual"
# endif
#include "inih/ini.h"
#include "inih/ini.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
global_variable volatile sig_atomic_t global_keep_running = 1;
......@@ -142,6 +66,20 @@ Sint main (Sint argc, Char *argv[])
signal(SIGINT, signalHandlerSIGINT);
Configuration conf = {0};
{ // Default config values
conf.kafka_address = "10.129.6.5:9092";
conf.message_read_gap = 10;
conf.grunt_time_to_die = 10000;
conf.grunt_response_wait_time = 100;
}
if (ini_parse("config.ini", confCallback, &conf) < 0) {
printf("Can't load 'config.ini'\n");
return -1;
}
Command *commands = NULL;
Grunt_Tracker gt = {0};
......@@ -149,7 +87,7 @@ Sint main (Sint argc, Char *argv[])
Kafka kafka = {0};
kafkaCreateWriter(&kafka, KAFKA_ADDRESS);
kafkaCreateWriter(&kafka, conf.kafka_address);
#define CREATE_TOPIC(s) \
do { \
......@@ -170,7 +108,7 @@ Sint main (Sint argc, Char *argv[])
CREATE_TOPIC("LOG_COMMON"); //
#undef CREATE_TOPIC
kafkaCreateReader(&kafka, KAFKA_ADDRESS);
kafkaCreateReader(&kafka, conf.kafka_address);
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
......@@ -206,7 +144,7 @@ Sint main (Sint argc, Char *argv[])
while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader,
ARBITER_READ_MESSAGE_GAP_MS);
conf.message_read_gap);
U64 time_passed_now = timeMilli();
U64 time_passed = time_passed_now - time_passed_last;
......@@ -277,7 +215,7 @@ Sint main (Sint argc, Char *argv[])
}
} else if (kafka_message_read->rkt == topic_join_g2a) {
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
Grunt grunt = {.id = strdup(id), .time_to_die = ARBITER_GRUNT_TIME_TO_DIE_MS};
Grunt grunt = {.id = strdup(id), .time_to_die = conf.grunt_time_to_die};
logMessage("Join G2A:\tid: %s", id);
......@@ -294,7 +232,7 @@ Sint main (Sint argc, Char *argv[])
U64 index = htLookup(&gt.map, hashString(id));
if (index != 0) { // Prevent any left over message
// TODO(naman): Add any new resource fields here
gt.grunts[index].time_to_die = ARBITER_GRUNT_TIME_TO_DIE_MS;
gt.grunts[index].time_to_die = conf.grunt_time_to_die;
gt.grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Beat G2A:\tid: %s (Memory %d)", id, gt.grunts[index].memory);
......@@ -360,7 +298,7 @@ Sint main (Sint argc, Char *argv[])
U64 milli_new = timeMilli();
gs->milli_passed += milli_new - gs->milli_last;
gs->milli_last = milli_new;
if (gs->milli_passed >= ARBITER_DISPATCHER_RESPONSE_WAIT_TIME_MS) {
if (gs->milli_passed >= conf.grunt_response_wait_time) {
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = gs->resource_id;
......
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Command {
enum Command_Kind {
Command_NONE,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_JOIN_ACK_ARBITER_2_GRUNT,
Command_REJOIN_ARBITER_2_GRUNT,
} kind;
Char *resource_id;
union {
struct {
Char **grunt_ids;
} res_a2d;
struct {
Sint memory;
} req_a2g;
struct {
Char *grunt_id;
} rejoin_a2g;
struct {
Char *grunt_id;
} join_ack_a2g;
};
} Command;
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Configuration {
Char *kafka_address;
U64 grunt_response_wait_time;
Sint message_read_gap;
Sint grunt_time_to_die;
} Configuration;
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wincompatible-pointer-types-discards-qualifiers"
# endif
internal_function
Sint confCallback (void* user, const Char *section,
const Char *name, const Char *value)
{
Configuration* conf = (Configuration*)user;
if (strequal(section, "Arbiter")) {
if (strequal(name, "MessageReadGap")) {
conf->message_read_gap = atoi(value);
} else if (strequal(name, "GruntTimeToDie")) {
conf->grunt_time_to_die = atoi(value);
} else if (strequal(name, "GruntResponseWaitTime")) {
conf->grunt_response_wait_time = strtoul(value, NULL, 10);
} else {
return 0; /* unknown section/name, error */
}
} else if (strequal(section, "Kafka")) {
if (strequal(name, "Address")) {
conf->kafka_address = strdup(value);
} else {
return 0; /* unknown section/name, error */
}
}
return 1;
}
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Grunt {
Char *id;
Sint memory;
B32 rejoin_asked;
Sint time_to_die; // in ms
} Grunt;
typedef struct Grunt_Survey {
Char **grunt_ids;
U16 *ports;
U64 milli_passed;
U64 milli_last;
Char *resource_id;
} Grunt_Survey;
typedef struct Grunt_Tracker {
Grunt *grunts;
Size *free_list;
Hash_Table map;
} Grunt_Tracker;
internal_function
void gruntTrackBegin (Grunt_Tracker *t, Grunt g)
{
if (t->grunts == NULL) {
t->map = htCreate(0);
sbufAdd(t->grunts, (Grunt){0}); // SInce 0 index out of hash table will be invalid
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->grunts[t->free_list[0]] = g;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->grunts, g);
insertion_index = sbufElemin(t->grunts) - 1;
}
htInsert(&t->map, hashString(g.id), insertion_index);
}
internal_function
void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
{
U64 hash = hashString(grunt_id);
Size index = htLookup(&t->map, hash);
sbufAdd(t->free_list, index);
free(t->grunts[index].id);
t->grunts[index] = (Grunt){0};
htRemove(&t->map, hash);
}
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Configuration {
Char *kafka_address;
U64 heartbeat_gap;
Sint message_read_gap;
} Configuration;
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wincompatible-pointer-types-discards-qualifiers"
# endif
internal_function
Sint confCallback (void* user, const Char *section,
const Char *name, const Char *value)
{
Configuration* conf = (Configuration*)user;
if (strequal(section, "Grunt")) {
if (strequal(name, "MessageReadGap")) {
conf->message_read_gap = atoi(value);
} else if (strequal(name, "HeartbeatGap")) {
conf->heartbeat_gap = strtoul(value, NULL, 10);
} else {
return 0; /* unknown section/name, error */
}
} else if (strequal(section, "Kafka")) {
if (strequal(name, "Address")) {
conf->kafka_address = strdup(value);
} else {
return 0; /* unknown section/name, error */
}
}
return 1;
}
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
......@@ -3,10 +3,6 @@
* Notice: © Copyright 2020 Naman Dixit
*/
#define KAFKA_ADDRESS "10.129.6.5:9092"
#define GRUNT_READ_MESSAGE_GAP_MS 10
#define GRUNT_HEARTBEAT_TIME_GAP_MS 1000
#include "nlib/nlib.h"
#include <stdio.h>
......@@ -38,6 +34,17 @@
# pragma clang diagnostic pop
# endif
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wreserved-id-macro"
# pragma clang diagnostic ignored "-Wcast-qual"
# endif
#include "inih/ini.h"
#include "inih/ini.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
typedef struct Resources {
Sint memory;
} Resources;
......@@ -49,6 +56,7 @@ typedef struct Command {
#include "kafka.h"
#include "time.c"
#include "conf.c"
#include "command.h"
......@@ -105,6 +113,19 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandlerSIGINT);
Configuration conf = {0};
{ // Default config values
conf.kafka_address = "10.129.6.5:9092";
conf.message_read_gap = 10;
conf.heartbeat_gap = 1000;
}
if (ini_parse("config.ini", confCallback, &conf) < 0) {
printf("Can't load 'config.ini'\n");
return -1;
}
tmCommandInit();
instrumentCommandInit();
......@@ -113,7 +134,7 @@ int main(int argc, char** argv)
Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, KAFKA_ADDRESS);
kafka.writer = kafkaCreateWriter(&kafka, conf.kafka_address);
#define CREATE_TOPIC(s) \
do { \
......@@ -133,7 +154,7 @@ int main(int argc, char** argv)
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, KAFKA_ADDRESS);
kafka.reader = kafkaCreateReader(&kafka, conf.kafka_address);
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
......@@ -186,7 +207,7 @@ int main(int argc, char** argv)
while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader,
GRUNT_READ_MESSAGE_GAP_MS);
conf.message_read_gap);
B32 command_found = false;
Command c = {0};
......@@ -329,7 +350,7 @@ int main(int argc, char** argv)
time_begin = time_new;
time_accum += time_passed;
if (time_accum >= GRUNT_HEARTBEAT_TIME_GAP_MS) {
if (time_accum >= conf.heartbeat_gap) {
time_accum = 0;
Char *output = NULL;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment