Commit b876af62 authored by Naman Dixit's avatar Naman Dixit

Temporary commit before meeting

parent 30bda1e2
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
#if !defined(COMMAND_H_INCLUDE_GUARD)
#define COMMAND_SYSTEM(req_type_name, req_var_name, req_member_type, \
res_type_name, res_var_name, res_member_type) \
\
typedef struct req_type_name { \
req_member_type member; \
struct req_type_name *next; \
} req_type_name; \
\
global_variable req_type_name *global_##req_var_name##_first; \
global_variable req_type_name *global_##req_var_name##_last; \
global_variable req_type_name *global_##req_var_name##_divider; \
global_variable pthread_mutex_t global_##req_var_name##_lock = PTHREAD_MUTEX_INITIALIZER; \
global_variable pthread_cond_t global_##req_var_name##_cond_var = PTHREAD_COND_INITIALIZER; \
\
internal_function \
void req_var_name##Init (void) \
{ \
global_##req_var_name##_first = calloc(1, sizeof(*global_##req_var_name##_first)); \
global_##req_var_name##_last = global_##req_var_name##_first; \
global_##req_var_name##_divider = global_##req_var_name##_first; \
\
return; \
} \
\
internal_function \
void req_var_name##Enqueue (req_member_type c) \
{ \
global_##req_var_name##_last->next = calloc(1, sizeof(*(global_##req_var_name##_last->next))); \
global_##req_var_name##_last->next->member = c; \
global_##req_var_name##_last = global_##req_var_name##_last->next; \
\
while (global_##req_var_name##_first != global_##req_var_name##_divider) { \
req_type_name *temp = global_##req_var_name##_first; \
global_##req_var_name##_first = global_##req_var_name##_first->next; \
free(temp); \
} \
\
pthread_cond_signal(&global_##req_var_name##_cond_var); \
\
return; \
} \
\
internal_function \
void req_var_name##Dequeue (req_member_type *c, _Atomic B64 *keep_alive) \
{ \
pthread_mutex_lock(&global_##req_var_name##_lock); \
\
while (global_##req_var_name##_divider == global_##req_var_name##_last) { \
if ((keep_alive != NULL) && (atomic_load(keep_alive) == false)) { \
*c = (req_member_type){0}; \
pthread_cond_signal(&global_##req_var_name##_cond_var); \
pthread_mutex_unlock(&global_##req_var_name##_lock); \
return; \
} \
pthread_cond_wait(&global_##req_var_name##_cond_var, &global_##req_var_name##_lock); \
} \
\
*c = global_##req_var_name##_divider->next->member; \
global_##req_var_name##_divider = global_##req_var_name##_divider->next; \
\
pthread_mutex_unlock(&global_##req_var_name##_lock); \
} \
\
\
\
\
typedef struct res_type_name { \
res_member_type member; \
struct res_type_name *next; \
} res_type_name; \
\
global_variable res_type_name *global_##res_var_name##_first; \
global_variable res_type_name *global_##res_var_name##_last; \
global_variable res_type_name *global_##res_var_name##_divider; \
global_variable pthread_mutex_t global_##res_var_name##_lock = PTHREAD_MUTEX_INITIALIZER; \
\
internal_function \
void res_var_name##Init (void) \
{ \
global_##res_var_name##_first = calloc(1, sizeof(*global_##res_var_name##_first)); \
global_##res_var_name##_last = global_##res_var_name##_first; \
global_##res_var_name##_divider = global_##res_var_name##_first; \
} \
\
internal_function \
void res_var_name##Enqueue (res_member_type c) \
{ \
pthread_mutex_lock(&global_##res_var_name##_lock); \
\
global_##res_var_name##_last->next = calloc(1, sizeof(*(global_##res_var_name##_last->next))); \
global_##res_var_name##_last->next->member = c; \
global_##res_var_name##_last = global_##res_var_name##_last->next; \
\
while (global_##res_var_name##_first != global_##res_var_name##_divider) { \
res_type_name *temp = global_##res_var_name##_first; \
global_##res_var_name##_first = global_##res_var_name##_first->next; \
free(temp); \
} \
\
pthread_mutex_unlock(&global_##res_var_name##_lock); \
\
return; \
} \
\
internal_function \
B32 res_var_name##Dequeue (res_member_type *c) \
{ \
if (global_##res_var_name##_divider != global_##res_var_name##_last) { \
*c = global_##res_var_name##_divider->next->member; \
global_##res_var_name##_divider = global_##res_var_name##_divider->next; \
return true; \
} \
\
return false; \
}
#define COMMAND_H_INCLUDE_GUARD
#endif
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <assert.h> #include <assert.h>
#include <signal.h> #include <signal.h>
#include <pthread.h>
#include <librdkafka/rdkafka.h> #include <librdkafka/rdkafka.h>
# if defined(COMPILER_CLANG) # if defined(COMPILER_CLANG)
...@@ -45,7 +46,50 @@ typedef struct Command { ...@@ -45,7 +46,50 @@ typedef struct Command {
#include "kafka.h" #include "kafka.h"
#include "time.c" #include "time.c"
#include "command.h"
typedef struct Thread_Manager_Command {
enum Thread_Manager_Command_Kind {
Thread_Manager_Command_NONE,
Thread_Manager_Command_DOCKER_CREATE,
Thread_Manager_Command_DOCKER_DESTROY,
} kind;
Char *id;
union {
struct {
Sint placeholder;
} docker_create;
struct {
Sint placeholder;
} docker_destroy;
};
} Thread_Manager_Command;
typedef struct JSON_Print_Command {
Char *msg;
Char *topic;
} JSON_Print_Command;
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wextra-semi"
# endif
COMMAND_SYSTEM(Thread_Manager_Request, tmCommand, Thread_Manager_Command,
JSON_Print_Request, instrumentCommand, JSON_Print_Command);
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
global_variable volatile sig_atomic_t global_keep_running = 1; global_variable volatile sig_atomic_t global_keep_running = 1;
global_variable Char *node_name;
#include "instrument_docker.c"
#include "thread_manager.c"
internal_function internal_function
void signalHandlerSIGINT (int _) void signalHandlerSIGINT (int _)
...@@ -56,8 +100,6 @@ void signalHandlerSIGINT (int _) ...@@ -56,8 +100,6 @@ void signalHandlerSIGINT (int _)
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
Char *node_name = NULL;
if (argc > 1) { if (argc > 1) {
node_name = argv[1]; node_name = argv[1];
} else { } else {
...@@ -68,6 +110,9 @@ int main(int argc, char** argv) ...@@ -68,6 +110,9 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandlerSIGINT); signal(SIGINT, signalHandlerSIGINT);
tmCommandInit();
instrumentCommandInit();
Kafka kafka = {0}; Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092"); kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
...@@ -86,6 +131,7 @@ int main(int argc, char** argv) ...@@ -86,6 +131,7 @@ int main(int argc, char** argv)
CREATE_TOPIC("RESPONSE_RD_2_RM"); // CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); // CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); // CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); // CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092"); kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
...@@ -94,7 +140,10 @@ int main(int argc, char** argv) ...@@ -94,7 +140,10 @@ int main(int argc, char** argv)
rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics, rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_RM_2_RD"); "REQUEST_RM_2_RD");
unused_variable(topic_req_a2g); rd_kafka_topic_t *topic_rej_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REJOIN_RM_2_RD");
rd_kafka_topic_t *topic_log = kafkaSubscribe(&kafka, kafka_reader_topics,
"LOG_COMMON");
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics); rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics); rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
...@@ -129,7 +178,7 @@ int main(int argc, char** argv) ...@@ -129,7 +178,7 @@ int main(int argc, char** argv)
/* Consumer error: typically just informational. */ /* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n", fprintf(stderr, "Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read)); rd_kafka_message_errstr(kafka_message_read));
} else { } else if (kafka_message_read->rkt == topic_req_a2g) {
fprintf(stderr, fprintf(stderr,
"Received message on %s [%d] " "Received message on %s [%d] "
"at offset %"PRId64": \n%s\n", "at offset %"PRId64": \n%s\n",
...@@ -149,6 +198,36 @@ int main(int argc, char** argv) ...@@ -149,6 +198,36 @@ int main(int argc, char** argv)
c.txn_id = cJSON_GetObjectItem(root, "resource_id")->valuestring; c.txn_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint; c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint;
} }
} else if (kafka_message_read->rkt == topic_rej_a2g) {
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) {
return -1;
}
} else if (kafka_message_read->rkt == topic_log) {
fprintf(stderr,
"Received message on %s [%d] "
"at offset %"PRId64": \n%s\n",
rd_kafka_topic_name(kafka_message_read->rkt),
(int)kafka_message_read->partition, kafka_message_read->offset,
cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
char *buffer = (char *)kafka_message_read->payload;
const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true);
if (root == NULL) {
// TODO(naman): Error
} else {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
if (streq(node_id, node_name)) {
// FIXME(naman): Fix this placeholder
/* "resource_id": "logical-entity-id", */
/* "function_id": "unique-function-id", */
/* "timestamp" : "iso-8601-timestamp", */
/* "reason": "deployment"/"termination", */
/* "status": true/false // Only valid if reason==deployment; */
}
}
} }
rd_kafka_message_destroy(kafka_message_read); rd_kafka_message_destroy(kafka_message_read);
} }
......
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
internal_function
noreturn
void* dockerProcessLoop (void *arg)
{
unused_variable(arg);
pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
while (true) {
// TODO(naman): Get data
Char *json = NULL;
Char *output = NULL;
sbufPrint(output, "{\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"type\": \"%s\"", "docker");
sbufPrint(output, ",\n\"data\": %s", json ? json : "{}");
sbufPrint(output, "\n}\n");
JSON_Print_Command jpc = {.msg = output,
.topic = "LOG_CHANNEL"};
U64 time_before = timeMilli();
instrumentCommandEnqueue(jpc);
U64 time_after = timeMilli();
if ((time_after - time_before) < 1000) {
sleep((Uint)(time_after - time_before));
}
}
}
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Thread {
pthread_t thread;
Char *id;
} Thread;
typedef struct Thread_Tracker {
Thread *threads;
Size *free_list;
Hash_Table map;
} Thread_Tracker;
internal_function
void threadTrackBegin (Thread_Tracker *t, Thread th)
{
if (t->threads == NULL) {
t->map = htCreate(0);
sbufAdd(t->threads, (Thread){0}); // SInce 0 index out of hash table will be invalid
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->threads[t->free_list[0]] = th;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->threads, th);
insertion_index = sbufElemin(t->threads) - 1;
}
htInsert(&t->map, hashString(th.id), insertion_index);
}
internal_function
void threadTrackEnd (Thread_Tracker *t, Char *thread_id)
{
Size index = htLookup(&t->map, hashString(thread_id));
sbufAdd(t->free_list, index);
free(t->threads[index].id);
t->threads[index] = (Thread){0};
htRemove(&t->map, index);
}
internal_function
void* tmProcessLoop (void *arg)
{
unused_variable(arg);
Thread_Tracker tt = {0};
while (true) {
Thread_Manager_Command command = {0};
tmCommandDequeue(&command, NULL);
switch (command.kind) {
case Thread_Manager_Command_DOCKER_CREATE: {
pthread_t thread;
pthread_create(&thread, NULL, &dockerProcessLoop, NULL);
threadTrackBegin(&tt, (Thread){.id = command.id, .thread = thread});
} break;
case Thread_Manager_Command_DOCKER_DESTROY: {
Size index = htLookup(&tt.map, hashString(command.id));
pthread_t thread = tt.threads[index].thread;
pthread_cancel(thread);
pthread_join(thread, NULL);
threadTrackEnd(&tt, command.id);
} break;
case Thread_Manager_Command_NONE: {
} break;
}
}
}
...@@ -96,20 +96,25 @@ int main(int argc, char** argv) ...@@ -96,20 +96,25 @@ int main(int argc, char** argv)
} }
} }
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0); rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 10);
while (true) { while (true) {
if (kafka_message_read != NULL) { if (kafka_message_read != NULL) {
const Char *json_error = NULL; const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts((char *)kafka_message_read->payload, &json_error, true); cJSON *root = cJSON_ParseWithOpts((char *)kafka_message_read->payload, &json_error, true);
Sint id_now = atoi(cJSON_GetObjectItem(root, "id")->valuestring); Sint id_now = 0;
if (cJSON_GetObjectItem(root, "resource_id") == NULL) {
goto skip_message;
}
id_now = atoi(cJSON_GetObjectItem(root, "resource_id")->valuestring);
if (id_now == id) { if (id_now == id) {
break; break;
} else { } else {
skip_message:
printf("Found a cranky old message: %d\n", id_now); printf("Found a cranky old message: %d\n", id_now);
rd_kafka_message_destroy(kafka_message_read); rd_kafka_message_destroy(kafka_message_read);
} }
} }
kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 0); kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 10);
} }
if (kafka_message_read != NULL) { if (kafka_message_read != NULL) {
...@@ -143,5 +148,15 @@ int main(int argc, char** argv) ...@@ -143,5 +148,15 @@ int main(int argc, char** argv)
rd_kafka_message_destroy(kafka_message_read); rd_kafka_message_destroy(kafka_message_read);
} }
for (Size i = 0; i < sbufElemin(kafka.topics); i++) {
rd_kafka_topic_destroy(kafka.topics[i]);
}
rd_kafka_consumer_close(kafka.reader);
rd_kafka_destroy(kafka.reader);
for (Size i = 0; i < sbufElemin(kafka.queues); i++) {
rd_kafka_queue_destroy(kafka.queues[i]);
}
rd_kafka_destroy(kafka.writer);
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment