Commit d652def6 authored by Naman Dixit's avatar Naman Dixit

Done, hopefully

parents
server/data_store
server/server
client/client
Team members:
Naman Dixit
19305R005
namandixit@cse.iitb.ac.in
Varad Bhatnagar
19305R005
varadhbhatnagar@cse.iitb.ac.in
Instructions:
Server:
Compilation: Run `make` in "server/" directory.
Execution: Run `./server` in "server/" directory.
It supports the following command line parameters:
1. -port=XXX (the port number on which the server
will listen)
2. -threadPoolSize=YYY (number of threads in the
thread pool)
3. -numSetsInCache=ZZZ (number of sets in the cache)
4. -sizeOfSet=AAA (associativity of sets)
Storage: The server stores the data in "${PWD}/data_store/" directory.
A file of special note is "${PWD}/data_store/kvstore.xml" which
contains an XML dump of the stored key-value pairs. This is the
file to be used for grading, and is generated in a lazy fashion
as instructed here:
moodle.iitb.ac.in/mod/forum/discuss.php?d=125057#p191609
Logs: When the server start, it prints a log message:
Log: Waiting for connection on port <port>...
When a new connection is established, it prints:
Log: Connection made: client_fd=<client_fd>
If there are any errors, it will quit after printing a message
prefixed with "Error:".
Client:
Compilation: Run `make` in "client/" directory.
Execution: The client can be launched inside "client/" directory
in either:
1. interactive mode: `./client -i`
2. batch mode: `./client <input_file> <output_file>`
It supports the following command line parameters:
1. -i (run in interactive mode)
2. -port=XXX (the port number on which the client
will connect)
Logs: When the server start, it prints a log message:
Log: Starting communication with server on port <port>...
If there are any errors, it will quit after printing a message
prefixed with "Error:". If wrong command line parameters are
given, it quits after printing a help message prefixed with
"Help:".
ClientSource := client.c
ClientTarget := client
.PHONY: all ClientTarget
all: ClientTarget
ClientTarget:
@echo "Building client..."
@gcc -g3 -O0 -fno-strict-aliasing -fwrapv -msse2 -I../common \
--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -D_POSIX_C_SOURCE=200809L \
-Wall -Wextra -Wpedantic -pedantic-errors -Werror \
$(ClientSource) -o ${ClientTarget} \
-Wl,-rpath=\$$ORIGIN -Wl,-z,origin -Wl,--enable-new-dtags \
-static-libgcc
/*
* Creator: Naman Dixit
* Notice: © Copyright 2019 Naman Dixit
*/
#include "aux.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <ctype.h>
#if 0
# define log(...) printf("Log: " __VA_ARGS__)
#else
# define log(...)
#endif
#include "xml.h"
Char* tokenGet(Char *line, Char separator, Size *position)
{
Char *elem = NULL;
Size i = *position;
Size line_len = strlen(line);
while ((i < line_len) && (line[i] != '\n') && (line[i] != separator)) {
sbufPrint(elem, "%c", line[i]);
i++;
}
sbufPrint(elem, "%c", '\0');
*position = i + 1;
return elem;
}
int main(int argc, char** argv)
{
B32 interactive_mode = false;
Char *port = NULL;
for (Size i = 1; i < (Size)argc; i++) {
Size arg_pos = 0;
if ((arg_pos = strprefix("-i", argv[i]))) {
interactive_mode = true;
} else if ((arg_pos = strprefix("--interactive", argv[i]))) {
interactive_mode = true;
} else if ((arg_pos = strprefix("-port=", argv[i]))) {
port = &(argv[i][arg_pos]);
}
}
if (port == NULL) port = "8080";
FILE *file_input = NULL, *file_output = NULL;
if (interactive_mode == false) {
if (argc != 3) {
printf("Help: Incorrect command line parameters\n");
printf("Help: See README.txt file for info on correct invocation\n");
exit(-1);
}
Char *file_input_name = argv[1];
Char *file_output_name = argv[2];
if((file_input = fopen(file_input_name, "r")) == NULL) {
fprintf(stderr, "Error: The input file %s cannot be opened\n", file_input_name);
exit(-1);
}
if((file_output = fopen(file_output_name, "w")) == NULL) {
fprintf(stderr, "Error: The output file %s cannot be opened\n", file_output_name);
exit(-1);
}
} else {
file_input = stdin;
file_output = stdout;
}
Sint sock_fd = socket(AF_INET, SOCK_STREAM, 0);
struct addrinfo hints = {.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM};
struct addrinfo *result = NULL;
Sint s = getaddrinfo(NULL, "8080", &hints, &result);
if (s != 0) {
fprintf(stderr, "Error: getaddrinfo: %s\n", gai_strerror(s));
exit(-1);
}
while (connect(sock_fd, result->ai_addr, result->ai_addrlen) == -1) {
fprintf(stderr, "Error: Couldn't connect on port %s, trying again in one second...\n", port);
sleep(1);
}
printf("Log: Starting communication with server on port %s...\n", port);
while (true) {
if (interactive_mode == true) {
printf("===QUERY: ");
}
Char *command = NULL;
{ // Read command
Sint c = 0;
while (((c = getc(file_input)) != '\n') && (c != EOF)) {
sbufPrint(command, "%c", (Char)c);
}
if (interactive_mode && (c == EOF)) {
printf("\n");
break;
}
if (command == NULL) {
break;
}
sbufPrint(command, "%c", '\0');
}
if (interactive_mode == true) {
printf("RESPONSE: ");
}
Char *xml_request_message = NULL;
// Response message will be of proper legth, since server will make sure of that.
Size response_size = KiB(300);
Char *xml_response_message = calloc(response_size, sizeof(*xml_response_message));
Size index = 0;
Char *query_type = tokenGet(command, ',', &index);
Char *query_key = tokenGet(command, ',', &index);
if(strcmp(query_type, "GET") == 0) {
xml_request_message = xmlCreateMessage(XML_Message_Kind_GET,
query_key, NULL, NULL);
write(sock_fd, xml_request_message, strlen(xml_request_message));
read(sock_fd, xml_response_message, response_size);
} else if(strcmp(query_type, "SET") == 0) {
Char *query_value = tokenGet(command, '\n', &index);
xml_request_message = xmlCreateMessage(XML_Message_Kind_PUT,
query_key, query_value, NULL);
sbufDelete(query_value);
write(sock_fd, xml_request_message, strlen(xml_request_message));
read(sock_fd, xml_response_message, response_size);
} else if(strcmp(query_type,"DEL") == 0) {
xml_request_message = xmlCreateMessage(XML_Message_Kind_DELETE,
query_key, NULL, NULL);
write(sock_fd, xml_request_message, strlen(xml_request_message));
read(sock_fd, xml_response_message, response_size);
}
sbufDelete(query_type);
sbufDelete(query_key);
XML_Message msg = xmlParseMessage(xml_response_message);
switch (msg.kind) {
case XML_Message_Kind_RESP_GET: {
fprintf(file_output, "%s,%s\n", msg.key, msg.value);
} break;
case XML_Message_Kind_RESP_PUT_DELETE: {
fprintf(file_output, "success\n");
} break;
case XML_Message_Kind_RESP_ERROR: {
fprintf(file_output, "error,%s\n", msg.error);
} break;
default: {
} break;
}
sbufDelete(xml_request_message);
free(xml_response_message);
free(msg.key);
free(msg.value);
free(msg.error);
}
return 0;
}
This diff is collapsed.
This diff is collapsed.
ServerSource := server.c
ServerTarget := server
.PHONY: all ServerTarget
all: ServerTarget
ServerTarget:
@echo "Building server..."
@gcc -g3 -O0 -fno-strict-aliasing -fwrapv -msse2 -I../common \
--std=c11 -DBUILD_INTERNAL -DBUILD_SLOW -D_POSIX_C_SOURCE=200809L -D_DEFAULT_SOURCE \
-Wall -Wextra -Wpedantic -pedantic-errors -Werror \
$(ServerSource) -o $(ServerTarget) \
-Wl,-rpath=\$$ORIGIN -Wl,-z,origin -Wl,--enable-new-dtags \
-static-libgcc -pthread
typedef struct Cache_Slot {
Char *key;
Char *value;
B8 valid;
B8 chance;
} Cache_Slot;
typedef struct Cache_Bucket {
pthread_mutex_t lock;
Cache_Slot *slots;
} Cache_Bucket;
typedef struct Cache {
Size bucket_count;
Size slot_count;
Size associativity;
Cache_Bucket buckets[];
} Cache;
global_variable Cache *global_cache;
internal_function
void cacheInit (Size bucket_count, Size associativity)
{
Size cache_size = sizeof(*global_cache) + (bucket_count * sizeof(global_cache->buckets[0]));
global_cache = malloc(cache_size);
memset(global_cache, 0, cache_size);
for (Size i = 0; i < bucket_count; i++) {
global_cache->buckets[i].slots = calloc(associativity, sizeof(global_cache->buckets[i].slots[0]));
pthread_mutex_init(&(global_cache->buckets[i].lock), NULL);
}
global_cache->bucket_count = bucket_count;
global_cache->associativity = associativity;
global_cache->slot_count = bucket_count * associativity;
}
internal_function
Size cacheGetBucket (U64 hash)
{
return hash >> (64 - u64Log2(global_cache->bucket_count));
}
internal_function
void cacheInvalidateSlot (Size bucket, Size slot)
{
pthread_mutex_lock(&(global_cache->buckets[bucket].lock));
global_cache->buckets[bucket].slots[slot].valid = false;
free(global_cache->buckets[bucket].slots[slot].key);
free(global_cache->buckets[bucket].slots[slot].value);
global_cache->buckets[bucket].slots[slot].key = NULL;
global_cache->buckets[bucket].slots[slot].value = NULL;
pthread_mutex_unlock(&(global_cache->buckets[bucket].lock));
}
internal_function
Cache_Slot cacheGetSlot (Size bucket, Size slot)
{
Cache_Slot result = {0};
if (global_cache->buckets[bucket].slots[slot].valid) {
result = global_cache->buckets[bucket].slots[slot];
}
return result;
}
internal_function
Char* cacheGetKV (U64 hash, Char *key, Size *slotptr)
{
Size bucket = cacheGetBucket(hash);
for (Size i = 0; i < global_cache->associativity; i++) {
Cache_Slot cs = cacheGetSlot(bucket, i);
if (cs.valid && (strcmp(cs.key, key) == 0)) {
if (slotptr != NULL) {
*slotptr = i;
}
return cs.value;
}
}
return NULL;
}
internal_function
void cacheSetKV (U64 hash, Char *key, Char *value)
{
Size bucket = cacheGetBucket(hash);
pthread_mutex_lock(&(global_cache->buckets[bucket].lock));
Cache_Slot *slots = global_cache->buckets[bucket].slots;
// If data already present in cache
for (Size i = 0; i < global_cache->associativity; i++) {
if(slots[i].valid && (strcmp(slots[i].key, key) == 0)) {
free(slots[i].value);
slots[i].value = strdup(value);
slots[i].chance = true;
pthread_mutex_unlock(&(global_cache->buckets[bucket].lock));
return;
}
}
// Else if, there is an empty slot
for (Size i = 0; i < global_cache->associativity; i++) {
if (slots[i].valid == false) {
free(slots[i].key);
slots[i].key = strdup(key);
free(slots[i].value);
slots[i].value = strdup(value);
slots[i].valid = true;
slots[i].chance = true;
pthread_mutex_unlock(&(global_cache->buckets[bucket].lock));
return;
}
}
// Else, evict using second chance algorithm
while (true) {
for (Size i = 0; i < global_cache->associativity; i++) {
if(slots[i].chance == false) {
free(slots[i].key);
slots[i].key = strdup(key);
free(slots[i].value);
slots[i].value = strdup(value);
slots[i].valid = true;
slots[i].chance = true;
pthread_mutex_unlock(&(global_cache->buckets[bucket].lock));
return;
} else {
slots[i].chance = false;
}
}
}
pthread_mutex_unlock(&(global_cache->buckets[bucket].lock));
}
void cacheClear (void)
{
for (Size i = 0; i < global_cache->bucket_count; i++) {
for (Size j = 0; j < global_cache->associativity; j++) {
free(global_cache->buckets[i].slots[j].key);
free(global_cache->buckets[i].slots[j].value);
global_cache->buckets[i].slots[j].valid = false;
global_cache->buckets[i].slots[j].chance = false;
}
}
}
typedef struct Command {
struct Command *next;
union {
struct {
Char *key;
} get;
struct {
Char *key;
Char *value;
} put;
struct {
Char *key;
} del;
};
enum Command_Kind {
Command_Kind_NONE,
Command_Kind_GET,
Command_Kind_PUT,
Command_Kind_DEL,
} kind;
Sint fd;
} Command;
global_variable Command *global_command_first, *global_command_last, *global_command_divider;
global_variable pthread_mutex_t global_command_lock = PTHREAD_MUTEX_INITIALIZER;
global_variable pthread_cond_t global_command_cond_var = PTHREAD_COND_INITIALIZER;
internal_function
void commandInitBuffer (void)
{
global_command_first = calloc(1, sizeof(*global_command_first));
global_command_last = global_command_first;
global_command_divider = global_command_first;
return;
}
internal_function
void commandEnqueue (Command c)
{
global_command_last->next = calloc(1, sizeof(*(global_command_last->next)));
*(global_command_last->next) = c;
global_command_last = global_command_last->next;
while (global_command_first != global_command_divider) {
Command *temp = global_command_first;
global_command_first = global_command_first->next;
free(temp);
}
pthread_cond_broadcast(&global_command_cond_var);
return;
}
internal_function
void commandDequeue (Command *c)
{
pthread_mutex_lock(&global_command_lock);
while (global_command_divider == global_command_last) {
pthread_cond_wait(&global_command_cond_var, &global_command_lock);
}
*c = *(global_command_divider->next);
global_command_divider = global_command_divider->next;
pthread_mutex_unlock(&global_command_lock);
}
internal_function
void* commandProcessLoop (void *arg)
{
Thread_Pool_Metadata *data = arg;
XML_Dump_Metadata *xml_dump = data->xml_dump;
while (true) {
Command command = {0};
commandDequeue(&command);
switch (command.kind) {
case Command_Kind_GET: {
Char *error = NULL;
U64 hash = hashFNV1a(command.get.key);
// pthread_mutex_lock(&data_lock);
Char *value = storageGetFileData(hash, command.get.key, &error);
// pthread_mutex_unlock(&data_lock);
Result result = {0};
if (value == NULL) {
result = (Result){.kind = Result_Kind_ERROR,
.fd = command.fd,
.error.error = error};
} else {
result = (Result){.kind = Result_Kind_GET,
.fd = command.fd,
.get.key = command.get.key,
.get.value = value};
}
resultEnqueue(result);
} break;
case Command_Kind_PUT: {
Char *error = NULL;
U64 hash = hashFNV1a(command.put.key);
// pthread_mutex_lock(&data_lock);
B32 success = storageSetFileData(hash, command.put.key, command.put.value,
&error);
// pthread_mutex_unlock(&data_lock);
Result result = {0};
if (success == false) {
result = (Result){.kind = Result_Kind_ERROR,
.fd = command.fd,
.error.error = error};
} else {
pthread_cancel(xml_dump->thread);
pthread_join(xml_dump->thread, NULL);
fclose(xml_dump->file);
xml_dump->file = fopen(xml_dump->file_path, "w");
pthread_create(&(xml_dump->thread), NULL, &dump, xml_dump);
result = (Result){.kind = Result_Kind_PUT,
.fd = command.fd};
}
resultEnqueue(result);
} break;
case Command_Kind_DEL: {
Char *error = NULL;
U64 hash = hashFNV1a(command.del.key);
// pthread_mutex_lock(&data_lock);
B32 success = storageDeleteFileData(hash, command.del.key, &error);
// pthread_mutex_unlock(&data_lock);
Result result = {0};
if (success == false) {
result = (Result){.kind = Result_Kind_ERROR,
.fd = command.fd,
.error.error = error};
} else {
pthread_cancel(xml_dump->thread);
pthread_join(xml_dump->thread, NULL);
fclose(xml_dump->file);
xml_dump->file = fopen(xml_dump->file_path, "w");
pthread_create(&(xml_dump->thread), NULL, &dump, xml_dump);
result = (Result){.kind = Result_Kind_DEL,
.fd = command.fd};
}
resultEnqueue(result);
} break;
default: {
} break;
}
}
return NULL;
}
internal_function
void* dump (void *arg)
{
pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
XML_Dump_Metadata *xml_dump = arg;
fprintf(xml_dump->file,
"<!--This XML file is generated in a lazy fashion, when the server is in a \n"
"quiscent state. This means that this file may end up being incomplete if\n"
"the server is killed forcefully before this dump is complete. -->\n\n"
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVStore>\n");
fflush(xml_dump->file);
DIR *dir0 = opendir(".");
if (dir0 == NULL) {
perror("Can't open directory for xml dump");
return NULL;
}
for (struct dirent *entry1 = readdir(dir0); entry1 != NULL; entry1 = readdir(dir0)) {
Char path1[300] = {0};
strcat(path1, entry1->d_name);
strcat(path1, "/");
DIR *dir1 = opendir(path1);
if (strcmp(entry1->d_name, ".") == 0 || strcmp(entry1->d_name, "..") == 0) continue;
if (entry1->d_type != DT_DIR) continue;
for (struct dirent *entry2 = readdir(dir1); entry2 != NULL; entry2 = readdir(dir1)) {
Char path2[300] = {0};
strcat(path2, path1);
strcat(path2, entry2->d_name);
strcat(path2, "/");
DIR *dir2 = opendir(path2);
if (strcmp(entry2->d_name, ".") == 0 || strcmp(entry2->d_name, "..") == 0) continue;
for (struct dirent *entry3 = readdir(dir2); entry3 != NULL; entry3 = readdir(dir2)) {
Char path3[300] = {0};
strcat(path3, path2);
strcat(path3, entry3->d_name);
strcat(path3, "/");
DIR *dir3 = opendir(path3);
if (strcmp(entry3->d_name, ".") == 0 || strcmp(entry3->d_name, "..") == 0) continue;
for (struct dirent *entry4 = readdir(dir3); entry4 != NULL; entry4 = readdir(dir3)) {
Char path4[300] = {0};
strcat(path4, path3);
strcat(path4, entry4->d_name);
strcat(path4, "/");
DIR *dir4 = opendir(path4);
if (strcmp(entry4->d_name, ".") == 0 || strcmp(entry4->d_name, "..") == 0) continue;
for (struct dirent *entry_f = readdir(dir4); entry_f != NULL; entry_f = readdir(dir4)) {
if (strcmp(entry_f->d_name, ".") == 0 || strcmp(entry_f->d_name, "..") == 0) continue;
Char path_f[300] = {0};
strcat(path_f, path4);
strcat(path_f, entry_f->d_name);
Char *value = calloc((KiB(256) + 1), sizeof(*value));
FILE* value_file = fopen(path_f, "r");
fseek(value_file, 0L, SEEK_END);
Size size = ftell(value_file);
fseek(value_file, 0L, SEEK_SET);
fread(value, size, sizeof(*value), value_file);
fclose(value_file);
fprintf(xml_dump->file, "\t<KVPair>\n\t\t<Key>​%s​</Key>\n\t\t<Value>%s</Value>\n\t</KVPair>\n",
entry_f->d_name, value);
fflush(xml_dump->file);
free(value);
}
closedir(dir4);
}
closedir(dir3);
}
closedir(dir2);
}
closedir(dir1);
}
fprintf(xml_dump->file, "</KVStore>");
fflush(xml_dump->file);
closedir(dir0);
return NULL;
}
typedef struct Result {
struct Result *next;
union {
struct {
Char *value;
Char *key;
} get;
struct {
Char *error;
} error;
};