Commit e349248d authored by Nilanjan Daw's avatar Nilanjan Daw

Merge remote-tracking branch 'origin/master'

parents 936b0965 8172c839
......@@ -4,3 +4,6 @@
[submodule "resource_manager/src/common/nlib"]
path = resource_system/src/common/nlib
url = https://github.com/namandixit/nlib
[submodule "resource_system/src/common/inih"]
path = resource_system/src/common/inih
url = https://github.com/benhoyt/inih
[Kafka]
Address = 10.129.6.5:9092
[Arbiter]
MessageReadGap = 10
GruntTimeToDie = 10000
GruntResponseWaitTime = 100
[Grunt]
MessageReadGap = 10
HeartbeatGap = 1000
\ No newline at end of file
File deleted
......@@ -23,7 +23,7 @@ The Dispatch Manager (DM) sends a request to the Resource Manager (RM), detailin
```javascript
{
"resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"memory": 1024, // in MiB
... // Any other resources
}
......@@ -34,7 +34,7 @@ Format:
```javascript
{
"resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"grunts": [
{ node_id: some unique ID, port: port address}, ...
] // List of machine IDs
......@@ -44,10 +44,13 @@ Format:
Once the runtime entity has been launched (or the launch has failed), the Executor sends back a status message on the `LOG_COMMON` topic.
```javascript
{
"node_id" : "uique-machine-id",
"message_type" : "deployment_launch",
"node_id" : "unique-machine-id",
"entity_id" : "handle for the actual container/VM/etc.",
"entity_type" : "docker/libvirt/etc.",
"resource_id": "logical-entity-id",
"function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"reason": "deployment"/"termination"
"status": true/false // Only valid if reason==deployment
}
......@@ -57,28 +60,31 @@ Instrumentation data is also sent on the `LOG_COMMON` topic. This data is sent f
and whoever needs the data is allowed to read it. Each message is required to have atleast three fields: `node_id`, `resource_id` and `function_id`.
```javascript
{ // Example message from Executor
"node_id" : "uique-machine-id",
"message_type" : "instrumentation",
"node_id" : "unique-machine-id",
"resource_id": "logical-entity-id",
"function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"cpu" : 343, // in MHz
"memory": 534, // in MiB
"network": 234 // in KBps
}
{ // Example message from reverse proxy
"node_id" : "uique-machine-id",
"message_type" : "instrumentation",
"node_id" : "unique-machine-id",
"resource_id": "logical-entity-id",
"function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"average_fn_time" : 23 // in ms
}
{ // Example message from dispatch manager
"node_id" : "uique-machine-id",
"message_type" : "instrumentation",
"node_id" : "unique-machine-id",
"resource_id": "logical-entity-id",
"function_id": "unique-function-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"coldstart_time"
}
```
......@@ -223,7 +229,7 @@ resources being tracked by RDs on each machine. This data is cached by the RM.
```javascript
{
"node_id": "unique-machine-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"memory": 1024, // in MiB
... // Any other resources
}
......@@ -246,7 +252,7 @@ DM on topic `RESPONSE_RM_2_DM`.
```javascript
{
"resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
// "port": 2343 --- NOT IMPLEMENTED YET
"nodes": ["a", "b", ...] // List of unique machine IDs
}
......@@ -258,7 +264,7 @@ Format:
```javascript
{
"resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"memory": 1024, // in MiB
... // Any other resources
}
......@@ -269,7 +275,7 @@ The RDs recieve this message and send back whether on not they satisfy the const
{
"node_id": "unique-machine-id",
"resource_id": "unique-transaction-id",
"timestamp" : "iso-8601-timestamp",
"timestamp" : "time(2) compatible timestamp",
"success" : 0/1 // 0 = fail, 1 = success
}
```
......
......@@ -4,6 +4,7 @@
*/
#define logMessage(s, ...) printf(s "\n", ##__VA_ARGS__)
#define logError(s, ...) fprintf(stderr, s "\n", ##__VA_ARGS__)
#include "nlib/nlib.h"
......@@ -21,83 +22,11 @@
#include <signal.h>
#include <librdkafka/rdkafka.h>
typedef struct Grunt {
Char *id;
Sint memory;
B32 rejoin_asked;
Sint time_to_die; // in ms
} Grunt;
typedef struct Grunt_Survey {
Char **grunt_ids;
U16 *ports;
U64 milli_passed;
U64 milli_last;
Char *resource_id;
} Grunt_Survey;
typedef struct Command {
enum Command_Kind {
Command_NONE,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_REJOIN_ARBITER_2_GRUNT,
} kind;
Char *resource_id;
union {
struct {
Char **grunt_ids;
} res_a2d;
struct {
Sint memory;
} req_a2g;
struct {
Char *grunt_id;
} rejoin_a2g;
};
} Command;
typedef struct Grunt_Tracker {
Grunt *grunts;
Size *free_list;
Hash_Table map;
} Grunt_Tracker;
internal_function
void gruntTrackBegin (Grunt_Tracker *t, Grunt g)
{
if (t->grunts == NULL) {
t->map = htCreate(0);
sbufAdd(t->grunts, (Grunt){0}); // SInce 0 index out of hash table will be invalid
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->grunts[t->free_list[0]] = g;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->grunts, g);
insertion_index = sbufElemin(t->grunts) - 1;
}
htInsert(&t->map, hashString(g.id), insertion_index);
}
internal_function
void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
{
Size index = htLookup(&t->map, hashString(grunt_id));
sbufAdd(t->free_list, index);
free(t->grunts[index].id);
t->grunts[index] = (Grunt){0};
htRemove(&t->map, index);
}
#include "kafka.h"
#include "time.c"
#include "conf.c"
#include "command.c"
#include "grunt_manager.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
......@@ -110,8 +39,16 @@ void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
# pragma clang diagnostic pop
# endif
#include "kafka.h"
#include "time.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wreserved-id-macro"
# pragma clang diagnostic ignored "-Wcast-qual"
# endif
#include "inih/ini.h"
#include "inih/ini.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
global_variable volatile sig_atomic_t global_keep_running = 1;
......@@ -129,6 +66,20 @@ Sint main (Sint argc, Char *argv[])
signal(SIGINT, signalHandlerSIGINT);
Configuration conf = {0};
{ // Default config values
conf.kafka_address = "10.129.6.5:9092";
conf.message_read_gap = 10;
conf.grunt_time_to_die = 10000;
conf.grunt_response_wait_time = 100;
}
if (ini_parse("config.ini", confCallback, &conf) < 0) {
printf("Can't load 'config.ini'\n");
return -1;
}
Command *commands = NULL;
Grunt_Tracker gt = {0};
......@@ -136,7 +87,7 @@ Sint main (Sint argc, Char *argv[])
Kafka kafka = {0};
kafkaCreateWriter(&kafka, "10.129.6.5:9092");
kafkaCreateWriter(&kafka, conf.kafka_address);
#define CREATE_TOPIC(s) \
do { \
......@@ -152,11 +103,12 @@ Sint main (Sint argc, Char *argv[])
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("JOIN_ACK_RM_2_RD"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
#undef CREATE_TOPIC
kafkaCreateReader(&kafka, "10.129.6.5:9092");
kafkaCreateReader(&kafka, conf.kafka_address);
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
......@@ -173,30 +125,41 @@ Sint main (Sint argc, Char *argv[])
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader,
kafka_reader_topics);
logMessage("Subscription finished\n");
fflush(stdout);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
if (kafka_reader_topics_err) {
fprintf(stderr, "Subscribe failed: %s\n",
logError("Subscribe failed: %s\n",
rd_kafka_err2str(kafka_reader_topics_err));
rd_kafka_destroy(kafka.reader);
return -1;
}
Sint time_of_launch = (Sint)time(0);
U64 time_passed_last = timeMilli();
while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100);
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader,
conf.message_read_gap);
U64 time_passed_now = timeMilli();
U64 time_passed = time_passed_now - time_passed_last;
time_passed_last = time_passed_now;
for (Size j = 0; j < gt.map.slot_count; j++) {
if (gt.map.values[j] != 0) {
Grunt g = gt.grunts[gt.map.values[j]];
g.time_to_die -= 100;
if (gt.map.keys[j] != 0) {
gt.grunts[gt.map.values[j]].time_to_die -= time_passed;
}
}
if (kafka_message_read != NULL) {
if (kafka_message_read->err) {
/* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n",
logError("Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read));
} else {
/* Proper message */
......@@ -210,13 +173,20 @@ Sint main (Sint argc, Char *argv[])
const char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(kafka_message_read->payload, &json_error, true);
if ((cJSON_GetObjectItem(root, "timestamp") == NULL) ||
(cJSON_GetObjectItem(root, "timestamp")->valueint) < time_of_launch) {
logMessage("Ignoring : %s\n", kafka_message_read->payload);
cJSON_Delete(root);
rd_kafka_message_destroy(kafka_message_read);
continue;
}
if (kafka_message_read->rkt == topic_req_dm2a) {
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = strdup(cJSON_GetObjectItem(root, "resource_id")->valuestring);
// TODO(naman): Add any new resource fields here
Sint memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Request DM2RM:\tid: %s = ([memory] = %d)",
c.resource_id, memory);
......@@ -245,20 +215,24 @@ Sint main (Sint argc, Char *argv[])
}
} else if (kafka_message_read->rkt == topic_join_g2a) {
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
Grunt grunt = {.id = id, .time_to_die = 1000};
Grunt grunt = {.id = strdup(id), .time_to_die = conf.grunt_time_to_die};
logMessage("Join G2A:\tid: %s", id);
if (htLookup(&gt.map, hashString(id)) == 0) {
gruntTrackBegin(&gt, grunt);
}
Command c = {.kind = Command_JOIN_ACK_ARBITER_2_GRUNT,
.join_ack_a2g.grunt_id = strdup(id)};
sbufAdd(commands, c);
} else if (kafka_message_read->rkt == topic_beat_g2a) {
Char *id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *id = strdup(cJSON_GetObjectItem(root, "node_id")->valuestring);
U64 index = htLookup(&gt.map, hashString(id));
if (index != 0) { // Prevent any left over message
// TODO(naman): Add any new resource fields here
gt.grunts[index].time_to_die = 1000;
gt.grunts[index].time_to_die = conf.grunt_time_to_die;
gt.grunts[index].memory = cJSON_GetObjectItem(root, "memory")->valueint;
logMessage("Beat G2A:\tid: %s (Memory %d)", id, gt.grunts[index].memory);
......@@ -269,7 +243,7 @@ Sint main (Sint argc, Char *argv[])
Command c = {.kind = Command_REJOIN_ARBITER_2_GRUNT,
.rejoin_a2g.grunt_id = id};
sbufAdd(commands, c);
logMessage("Beat G2A:\tid: %s (UNNOWN)", id);
logMessage("Beat G2A:\tid: %s (UNNOWN)", c.rejoin_a2g.grunt_id);
}
} else if (kafka_message_read->rkt == topic_res_g2a) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
......@@ -292,7 +266,7 @@ Sint main (Sint argc, Char *argv[])
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
Char *function_id = cJSON_GetObjectItem(root, "function_id")->valuestring;
printf("%s\n", cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
logMessage("Log: %s\n", cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
unused_variable(node_id);
unused_variable(resource_id);
unused_variable(function_id);
......@@ -311,6 +285,7 @@ Sint main (Sint argc, Char *argv[])
Size index = gt.map.values[j];
Grunt g = gt.grunts[index];
if (g.time_to_die <= 0) {
logMessage("Deleting grunt: %s\n", g.id);
gruntTrackEnd(&gt, g.id);
}
}
......@@ -323,7 +298,7 @@ Sint main (Sint argc, Char *argv[])
U64 milli_new = timeMilli();
gs->milli_passed += milli_new - gs->milli_last;
gs->milli_last = milli_new;
if (gs->milli_passed >= 1000) {
if (gs->milli_passed >= conf.grunt_response_wait_time) {
Command c = {.kind = Command_RESPONSE_ARBITER_2_DM};
c.resource_id = gs->resource_id;
......@@ -347,6 +322,8 @@ Sint main (Sint argc, Char *argv[])
for (Size j = 0; j < sbufElemin(commands); j++) {
Command c = commands[j];
Sint timestamp = (Sint)time(0);
Char *output = NULL;
Char *topic = NULL;
......@@ -354,12 +331,14 @@ Sint main (Sint argc, Char *argv[])
topic = "REQUEST_RM_2_RD";
sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"memory\": %d\n", c.req_a2g.memory);
sbufPrint(output, ",\n\"memory\": %d", c.req_a2g.memory);
sbufPrint(output, ",\n\"timestamp\": %d", timestamp);
sbufPrint(output, "\n}\n");
} else if (c.kind == Command_RESPONSE_ARBITER_2_DM) {
topic = "RESPONSE_RM_2_DM";
sbufPrint(output, "{\n\"resource_id\": \"%s\"", c.resource_id);
sbufPrint(output, ",\n\"timestamp\": %d", timestamp);
sbufPrint(output, ",\n\"nodes\": [");
for (Size k = 0; k < sbufElemin(c.res_a2d.grunt_ids); k++) {
sbufPrint(output, "\"%s\"", c.res_a2d.grunt_ids[k]);
......@@ -369,11 +348,22 @@ Sint main (Sint argc, Char *argv[])
}
sbufPrint(output, "]");
sbufPrint(output, "\n}");
} else if (c.kind == Command_JOIN_ACK_ARBITER_2_GRUNT) {
topic = "JOIN_ACK_RM_2_RD";
sbufPrint(output, "{\n\"node_id\": \"%s\"", c.join_ack_a2g.grunt_id);
sbufPrint(output, ",\n\"timestamp\": %d", timestamp);
sbufPrint(output, "\n}");
free(c.join_ack_a2g.grunt_id);
} else if (c.kind == Command_REJOIN_ARBITER_2_GRUNT) {
topic = "REJOIN_RM_2_RD";
sbufPrint(output, "{\n\"node_id\": \"%s\"", c.rejoin_a2g.grunt_id);
sbufPrint(output, ",\n\"timestamp\": %d", timestamp);
sbufPrint(output, "\n}");
free(c.rejoin_a2g.grunt_id);
}
if (output != NULL) {
......
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Command {
enum Command_Kind {
Command_NONE,
Command_RESPONSE_ARBITER_2_DM,
Command_REQUEST_ARBITER_2_GRUNT,
Command_JOIN_ACK_ARBITER_2_GRUNT,
Command_REJOIN_ARBITER_2_GRUNT,
} kind;
Char *resource_id;
union {
struct {
Char **grunt_ids;
} res_a2d;
struct {
Sint memory;
} req_a2g;
struct {
Char *grunt_id;
} rejoin_a2g;
struct {
Char *grunt_id;
} join_ack_a2g;
};
} Command;
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Configuration {
Char *kafka_address;
U64 grunt_response_wait_time;
Sint message_read_gap;
Sint grunt_time_to_die;
} Configuration;
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wincompatible-pointer-types-discards-qualifiers"
# endif
internal_function
Sint confCallback (void* user, const Char *section,
const Char *name, const Char *value)
{
Configuration* conf = (Configuration*)user;
if (strequal(section, "Arbiter")) {
if (strequal(name, "MessageReadGap")) {
conf->message_read_gap = atoi(value);
} else if (strequal(name, "GruntTimeToDie")) {
conf->grunt_time_to_die = atoi(value);
} else if (strequal(name, "GruntResponseWaitTime")) {
conf->grunt_response_wait_time = strtoul(value, NULL, 10);
} else {
return 0; /* unknown section/name, error */
}
} else if (strequal(section, "Kafka")) {
if (strequal(name, "Address")) {
conf->kafka_address = strdup(value);
} else {
return 0; /* unknown section/name, error */
}
}
return 1;
}
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Grunt {
Char *id;
Sint memory;
B32 rejoin_asked;
Sint time_to_die; // in ms
} Grunt;
typedef struct Grunt_Survey {
Char **grunt_ids;
U16 *ports;
U64 milli_passed;
U64 milli_last;
Char *resource_id;
} Grunt_Survey;
typedef struct Grunt_Tracker {
Grunt *grunts;
Size *free_list;
Hash_Table map;
} Grunt_Tracker;
internal_function
void gruntTrackBegin (Grunt_Tracker *t, Grunt g)
{
if (t->grunts == NULL) {
t->map = htCreate(0);
sbufAdd(t->grunts, (Grunt){0}); // SInce 0 index out of hash table will be invalid
}
Size insertion_index = 0;
if (sbufElemin(t->free_list) > 0) {
t->grunts[t->free_list[0]] = g;
insertion_index = t->free_list[0];
sbufUnsortedRemove(t->free_list, 0);
} else {
sbufAdd(t->grunts, g);
insertion_index = sbufElemin(t->grunts) - 1;
}
htInsert(&t->map, hashString(g.id), insertion_index);
}
internal_function
void gruntTrackEnd (Grunt_Tracker *t, Char *grunt_id)
{
U64 hash = hashString(grunt_id);
Size index = htLookup(&t->map, hash);
sbufAdd(t->free_list, index);
free(t->grunts[index].id);
t->grunts[index] = (Grunt){0};
htRemove(&t->map, hash);
}
Subproject commit 351217124ddb3e3fe2b982248a04c672350bb0af
/*
* Creator: Naman Dixit
* Notice: © Copyright 2020 Naman Dixit
*/
typedef struct Configuration {
Char *kafka_address;
U64 heartbeat_gap;
Sint message_read_gap;
} Configuration;
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wincompatible-pointer-types-discards-qualifiers"
# endif
internal_function
Sint confCallback (void* user, const Char *section,
const Char *name, const Char *value)
{
Configuration* conf = (Configuration*)user;
if (strequal(section, "Grunt")) {
if (strequal(name, "MessageReadGap")) {
conf->message_read_gap = atoi(value);
} else if (strequal(name, "HeartbeatGap")) {
conf->heartbeat_gap = strtoul(value, NULL, 10);
} else {
return 0; /* unknown section/name, error */
}
} else if (strequal(section, "Kafka")) {
if (strequal(name, "Address")) {
conf->kafka_address = strdup(value);
} else {
return 0; /* unknown section/name, error */
}
}
return 1;
}
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
......@@ -34,6 +34,17 @@
# pragma clang diagnostic pop
# endif
# if defined(COMPILER_CLANG)
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wreserved-id-macro"
# pragma clang diagnostic ignored "-Wcast-qual"
# endif
#include "inih/ini.h"
#include "inih/ini.c"
# if defined(COMPILER_CLANG)
# pragma clang diagnostic pop
# endif
typedef struct Resources {
Sint memory;
} Resources;
......@@ -45,6 +56,7 @@ typedef struct Command {
#include "kafka.h"
#include "time.c"
#include "conf.c"
#include "command.h"
......@@ -55,17 +67,8 @@ typedef struct Thread_Manager_Command {
Thread_Manager_Command_DOCKER_DESTROY,
} kind;
Char *id;
union {
struct {
Sint placeholder;
} docker_create;
struct {
Sint placeholder;
} docker_destroy;
};
Char *entity_id;
Char *resource_id;
} Thread_Manager_Command;
typedef struct JSON_Print_Command {
......@@ -110,6 +113,19 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandlerSIGINT);
Configuration conf = {0};
{ // Default config values
conf.kafka_address = "10.129.6.5:9092";
conf.message_read_gap = 10;
conf.heartbeat_gap = 1000;
}
if (ini_parse("config.ini", confCallback, &conf) < 0) {
printf("Can't load 'config.ini'\n");
return -1;
}
tmCommandInit();
instrumentCommandInit();
......@@ -118,7 +134,7 @@ int main(int argc, char** argv)
Kafka kafka = {0};
kafka.writer = kafkaCreateWriter(&kafka, "10.129.6.5:9092");
kafka.writer = kafkaCreateWriter(&kafka, conf.kafka_address);
#define CREATE_TOPIC(s) \
do { \
......@@ -134,23 +150,34 @@ int main(int argc, char** argv)
CREATE_TOPIC("RESPONSE_RD_2_RM"); //
CREATE_TOPIC("JOIN_RD_2_RM"); //
CREATE_TOPIC("HEARTBEAT_RD_2_RM"); //
CREATE_TOPIC("JOIN_ACK_RM_2_RD"); //
CREATE_TOPIC("REJOIN_RM_2_RD"); //
CREATE_TOPIC("LOG_COMMON"); //
kafka.reader = kafkaCreateReader(&kafka, "10.129.6.5:9092");
kafka.reader = kafkaCreateReader(&kafka, conf.kafka_address);
rd_kafka_topic_partition_list_t *kafka_reader_topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_t *topic_req_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REQUEST_RM_2_RD");
rd_kafka_topic_t *topic_jac_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"JOIN_ACK_RM_2_RD");
rd_kafka_topic_t *topic_rej_a2g = kafkaSubscribe(&kafka, kafka_reader_topics,
"REJOIN_RM_2_RD");
rd_kafka_topic_t *topic_log = kafkaSubscribe(&kafka, kafka_reader_topics,
"LOG_COMMON");
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader, kafka_reader_topics);
rd_kafka_resp_err_t kafka_reader_topics_err = rd_kafka_subscribe(kafka.reader,
kafka_reader_topics);
printf("Subscription finished\n");
fflush(stdout);
rd_kafka_topic_partition_list_destroy(kafka_reader_topics);
printf("Partition list destroyed\n");
fflush(stdout);
if (kafka_reader_topics_err) {
fprintf(stderr, "Subscribe failed: %s\n",
rd_kafka_err2str(kafka_reader_topics_err));
......@@ -158,30 +185,34 @@ int main(int argc, char** argv)
return -1;
}
{
Sint timestamp = (Sint)time(0);
Char *join_msg = NULL;
sbufPrint(join_msg, "{\"node_id\": \"%s\"", node_name);
sbufPrint(join_msg, ",\n\"timestamp\": %d", timestamp);
sbufPrint(join_msg, "\n}\n");
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) {
return -1;
}
}
B32 join_successful = false;
U64 time_begin = timeMilli();
U64 time_accum = 0;
Sint time_of_launch = (Sint)time(0);
while (global_keep_running) {
// NOTE(naman): Get the fd's that are ready
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 100);
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader,
conf.message_read_gap);
B32 command_found = false;
Command c = {0};
if (kafka_message_read != NULL) {
if (kafka_message_read->err) {
/* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read));
} else if (kafka_message_read->rkt == topic_req_a2g) {
fprintf(stderr,
"Received message on %s [%d] "
"at offset %"PRId64": \n%s\n",
......@@ -194,6 +225,22 @@ int main(int argc, char** argv)
const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true);
if ((cJSON_GetObjectItem(root, "timestamp") == NULL) ||
(cJSON_GetObjectItem(root, "timestamp")->valueint) < time_of_launch) {
printf("Ignoring : %s\n", buffer);
cJSON_Delete(root);
rd_kafka_message_destroy(kafka_message_read);
continue;
}
if (kafka_message_read->err) {
/* Consumer error: typically just informational. */
fprintf(stderr, "Consumer error: %s\n",
rd_kafka_message_errstr(kafka_message_read));
} else if (kafka_message_read->rkt == topic_jac_a2g) {
join_successful = true;
} else if (join_successful) {
if (kafka_message_read->rkt == topic_req_a2g) {
if (root == NULL) {
// TODO(naman): Error
} else {
......@@ -202,41 +249,55 @@ int main(int argc, char** argv)
c.res.memory = cJSON_GetObjectItem(root, "memory")->valueint;
}
} else if (kafka_message_read->rkt == topic_rej_a2g) {
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", join_msg)) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
if (strequal(node_id, node_name)) {
join_successful = false;
Sint timestamp = (Sint)time(0);
Char *rejoin_msg = NULL;
sbufPrint(rejoin_msg, "{\"node_id\": \"%s\"", node_name);
sbufPrint(rejoin_msg, ",\n\"timestamp\": %d", timestamp);
sbufPrint(rejoin_msg, "\n}\n");
if (!kafkaWrite(kafka.writer, "JOIN_RD_2_RM", "resource_daemon", rejoin_msg)) {
return -1;
}
}
} else if (kafka_message_read->rkt == topic_log) {
fprintf(stderr,
"Received message on %s [%d] "
"at offset %"PRId64": \n%s\n",
rd_kafka_topic_name(kafka_message_read->rkt),
(int)kafka_message_read->partition, kafka_message_read->offset,
cJSON_Print(cJSON_Parse((char *)kafka_message_read->payload)));
cJSON *msg_type_json = cJSON_GetObjectItem(root, "message_type");
if (msg_type_json == NULL) {
if (strequal(msg_type_json->valuestring, "deployment_launch")) {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
if (strequal(node_id, node_name)) {
Char *resource_id = cJSON_GetObjectItem(root, "resource_id")->valuestring;
Char *entity_id = cJSON_GetObjectItem(root, "entity_id")->valuestring;
Char *entity_type = cJSON_GetObjectItem(root, "entity_type")->valuestring;
char *buffer = (char *)kafka_message_read->payload;
Thread_Manager_Command tmc = {.entity_id = strdup(entity_id),
.resource_id = strdup(resource_id)};
B32 add_command = false;
const Char *json_error = NULL;
cJSON *root = cJSON_ParseWithOpts(buffer, &json_error, true);
if (strequal(entity_type, "docker")) {
tmc.kind = Thread_Manager_Command_DOCKER_CREATE;
add_command = true;
}
if (root == NULL) {
// TODO(naman): Error
} else {
Char *node_id = cJSON_GetObjectItem(root, "node_id")->valuestring;
if (strequal(node_id, node_name)) {
// FIXME(naman): Fix this placeholder
Thread_Manager_Command tmc = {0};
if (add_command) {
tmCommandEnqueue(tmc);
/* "resource_id": "logical-entity-id", */
/* "function_id": "unique-function-id", */
/* "timestamp" : "iso-8601-timestamp", */
/* "reason": "deployment"/"termination", */
/* "status": true/false // Only valid if reason==deployment; */
} else {
free(tmc.entity_id);
free(tmc.resource_id);
}
}
}
}
}
}
rd_kafka_message_destroy(kafka_message_read);
}
if (join_successful) {
int memory = 0;
FILE *meminfo = fopen("/proc/meminfo", "r");
......@@ -252,13 +313,16 @@ int main(int argc, char** argv)
if (command_found) {
Char *output = NULL;
Sint timestamp = (Sint)time(0);
sbufPrint(output, "{\n\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"resource_id\": \"%s\"", c.txn_id);
sbufPrint(output, ",\n\"timestamp\": %d", timestamp);
if (memory >= c.res.memory) {
sbufPrint(output, ",\n\"success\": true\n");
// TODO(naman): Add port
// sbufPrint(output, ",\n\"port\": %d\n", port);
// sbufPrint(output, ",\n\"port\": %d", port);
} else {
sbufPrint(output, ",\n\"success\": false\n");
}
......@@ -286,12 +350,14 @@ int main(int argc, char** argv)
time_begin = time_new;
time_accum += time_passed;
if (time_accum >= 1000) {
if (time_accum >= conf.heartbeat_gap) {
time_accum = 0;
Char *output = NULL;
Sint timestamp = (Sint)time(0);
sbufPrint(output, "{\"node_id\": \"%s\"", node_name);
sbufPrint(output, ",\n\"timestamp\": %d", timestamp);
sbufPrint(output, ",\n\"memory\": %d", memory);
sbufPrint(output, "\n}\n");
......@@ -302,6 +368,7 @@ int main(int argc, char** argv)
}
}
}
}
for (Size i = 0; i < sbufElemin(kafka.topics); i++) {
rd_kafka_topic_destroy(kafka.topics[i]);
......
......@@ -7,12 +7,22 @@ internal_function
noreturn
void* dockerProcessLoop (void *arg)
{
unused_variable(arg);
pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
while (true) {
// TODO(naman): Get data
Char *data_cmd = NULL;
sbufPrint(data_cmd, "docker stats %s", (Char*)arg);
FILE* data_file = popen(data_cmd, "r");
fseek(data_file, 0, SEEK_END);
long size = ftell(data_file);
fseek(data_file, 0, SEEK_SET);
Char *data = calloc((Size)size + 1, sizeof(*data));
fread(data, 1, (Size)size + 1, data_file);
fclose(data_file);
Char *json = NULL;
Char *output = NULL;
......
......@@ -5,7 +5,8 @@
typedef struct Thread {
pthread_t thread;
Char *id;
Char *entity_id;
Char *resource_id;
} Thread;
typedef struct Thread_Tracker {
......@@ -33,7 +34,7 @@ void threadTrackBegin (Thread_Tracker *t, Thread th)
insertion_index = sbufElemin(t->threads) - 1;
}
htInsert(&t->map, hashString(th.id), insertion_index);
htInsert(&t->map, hashString(th.resource_id), insertion_index);
}
internal_function
......@@ -41,7 +42,8 @@ void threadTrackEnd (Thread_Tracker *t, Char *thread_id)
{
Size index = htLookup(&t->map, hashString(thread_id));
sbufAdd(t->free_list, index);
free(t->threads[index].id);
free(t->threads[index].resource_id);
free(t->threads[index].entity_id);
t->threads[index] = (Thread){0};
htRemove(&t->map, index);
}
......@@ -61,16 +63,18 @@ void* tmProcessLoop (void *arg)
switch (command.kind) {
case Thread_Manager_Command_DOCKER_CREATE: {
pthread_t thread;
pthread_create(&thread, NULL, &dockerProcessLoop, NULL);
threadTrackBegin(&tt, (Thread){.id = command.id, .thread = thread});
pthread_create(&thread, NULL, &dockerProcessLoop, command.entity_id);
threadTrackBegin(&tt, (Thread){.entity_id = command.entity_id,
.resource_id = command.resource_id,
.thread = thread});
} break;
case Thread_Manager_Command_DOCKER_DESTROY: {
Size index = htLookup(&tt.map, hashString(command.id));
Size index = htLookup(&tt.map, hashString(command.resource_id));
pthread_t thread = tt.threads[index].thread;
pthread_cancel(thread);
pthread_join(thread, NULL);
threadTrackEnd(&tt, command.id);
threadTrackEnd(&tt, command.resource_id);
} break;
case Thread_Manager_Command_NONE: {
......
......@@ -85,16 +85,19 @@ int main(int argc, char** argv)
Sint id = (Sint)time(NULL);
sbufPrint(output, "{\n\"resource_id\": \"%d\"", id);
sbufPrint(output, ",\n\"timestamp\": %d", id);
sbufPrint(output, ",\n\"memory\": %d", memory_required);
sbufPrint(output, "\n}\n");
printf("Sending to Arbiter:\n%s\n", cJSON_Print(cJSON_Parse(output)));
printf("%ld\n", time(0));
if (output != NULL) {
if (!kafkaWrite(kafka.writer, "REQUEST_DM_2_RM", "rm_test", output)) {
return -1;
}
}
printf("%ld\n", time(0));
rd_kafka_message_t *kafka_message_read = rd_kafka_consumer_poll(kafka.reader, 10);
while (true) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment