Commit b3f73a53 authored by Naman Dixit's avatar Naman Dixit

Fixed the bug with 256 byte keys and 256 KiB values

parent a02484f8
...@@ -133,6 +133,7 @@ int main(int argc, char** argv) ...@@ -133,6 +133,7 @@ int main(int argc, char** argv)
Char *xml_request_message = NULL; Char *xml_request_message = NULL;
// Response message will be of proper legth, since server will make sure of that. // Response message will be of proper legth, since server will make sure of that.
Size response_size = KiB(300); Size response_size = KiB(300);
Char *xml_response_message_read = calloc(response_size, sizeof(*xml_response_message_read));
Char *xml_response_message = calloc(response_size, sizeof(*xml_response_message)); Char *xml_response_message = calloc(response_size, sizeof(*xml_response_message));
Size index = 0; Size index = 0;
...@@ -142,26 +143,24 @@ int main(int argc, char** argv) ...@@ -142,26 +143,24 @@ int main(int argc, char** argv)
if(strcmp(query_type, "GET") == 0) { if(strcmp(query_type, "GET") == 0) {
xml_request_message = xmlCreateMessage(XML_Message_Kind_GET, xml_request_message = xmlCreateMessage(XML_Message_Kind_GET,
query_key, NULL, NULL); query_key, NULL, NULL);
} else if(strcmp(query_type, "PUT") == 0) {
write(sock_fd, xml_request_message, strlen(xml_request_message));
read(sock_fd, xml_response_message, response_size);
} else if(strcmp(query_type, "SET") == 0) {
Char *query_value = tokenGet(command, '\n', &index); Char *query_value = tokenGet(command, '\n', &index);
xml_request_message = xmlCreateMessage(XML_Message_Kind_PUT, xml_request_message = xmlCreateMessage(XML_Message_Kind_PUT,
query_key, query_value, NULL); query_key, query_value, NULL);
sbufDelete(query_value); sbufDelete(query_value);
write(sock_fd, xml_request_message, strlen(xml_request_message));
read(sock_fd, xml_response_message, response_size);
} else if(strcmp(query_type,"DEL") == 0) { } else if(strcmp(query_type,"DEL") == 0) {
xml_request_message = xmlCreateMessage(XML_Message_Kind_DELETE, xml_request_message = xmlCreateMessage(XML_Message_Kind_DELETE,
query_key, NULL, NULL); query_key, NULL, NULL);
}
write(sock_fd, xml_request_message, strlen(xml_request_message)); write(sock_fd, xml_request_message, strlen(xml_request_message));
read(sock_fd, xml_response_message, response_size); U64 read_len = 0;
while (!strsuffix(xml_response_message, "</KVMessage>\n") || (read_len >= KiB(299))) {
read_len += read(sock_fd, xml_response_message_read, response_size);
strcat(xml_response_message, xml_response_message_read);
memset(xml_response_message_read, 0, response_size);
} }
sbufDelete(query_type); sbufDelete(query_type);
......
...@@ -644,7 +644,7 @@ void* hmRemove (Hashmap *hm, void *key) ...@@ -644,7 +644,7 @@ void* hmRemove (Hashmap *hm, void *key)
} }
header_function header_function
Size strprefix(Char *pre, Char *str) Size strprefix(Char *str, Char *pre)
{ {
Size lenpre = strlen(pre); Size lenpre = strlen(pre);
Size lenstr = strlen(str); Size lenstr = strlen(str);
...@@ -660,6 +660,21 @@ Size strprefix(Char *pre, Char *str) ...@@ -660,6 +660,21 @@ Size strprefix(Char *pre, Char *str)
} }
} }
header_function
B32 strsuffix (Char *str, Char *suf)
{
Char *string = strrchr(str, suf[0]);
B32 result = false;
if(string != NULL) {
if (strcmp(string, suf) == 0) {
result = true;
}
}
return result;
}
header_function header_function
U64 nextPowerOf2(U64 v) U64 nextPowerOf2(U64 v)
{ {
......
...@@ -26,7 +26,7 @@ XML_Message xmlParseMessage (Char *xml) { ...@@ -26,7 +26,7 @@ XML_Message xmlParseMessage (Char *xml) {
XML_Message result = {0}; XML_Message result = {0};
Char *error_message = "XML Error: Received unparseable message"; Char *error_message = "XML Error: Received unparseable message";
#define PARSE_CHAR(c) do { if (stream[0] != c) goto parse_failed; else stream++;} while (0) #define PARSE_CHAR(c) do { if (stream[0] != c) {goto parse_failed;} else stream++;} while (0)
#define PARSE_SPACE() do { while (isspace(stream[0])) { stream++; }} while (0) #define PARSE_SPACE() do { while (isspace(stream[0])) { stream++; }} while (0)
PARSE_CHAR('<'); PARSE_CHAR('<');
......
...@@ -5,10 +5,10 @@ void* dump (void *arg) ...@@ -5,10 +5,10 @@ void* dump (void *arg)
XML_Dump_Metadata *xml_dump = arg; XML_Dump_Metadata *xml_dump = arg;
fprintf(xml_dump->file, fprintf(xml_dump->file,
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVStore>\n\n"
"<!--This XML file is generated in a lazy fashion, when the server is in a \n" "<!--This XML file is generated in a lazy fashion, when the server is in a \n"
"quiscent state. This means that this file may end up being incomplete if\n" "quiscent state. This means that this file may end up being incomplete if\n"
"the server is killed forcefully before this dump is complete. -->\n\n" "the server is killed forcefully before this dump is complete. -->\n\n");
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVStore>\n");
fflush(xml_dump->file); fflush(xml_dump->file);
DIR *dir0 = opendir("."); DIR *dir0 = opendir(".");
......
...@@ -48,6 +48,12 @@ typedef struct Thread_Pool_Metadata { ...@@ -48,6 +48,12 @@ typedef struct Thread_Pool_Metadata {
#define MAX_SOCKET_CONNECTIONS_REQUEST 64 #define MAX_SOCKET_CONNECTIONS_REQUEST 64
#define MAX_EPOLL_EVENTS 1024 #define MAX_EPOLL_EVENTS 1024
typedef struct Input_Resume {
Char *buffer;
Size buffer_len;
int fd;
} Input_Resume;
typedef struct Output_Resume { typedef struct Output_Resume {
Char *output; Char *output;
Size output_pos; Size output_pos;
...@@ -113,6 +119,7 @@ Sint main (Sint argc, Char *argv[]) ...@@ -113,6 +119,7 @@ Sint main (Sint argc, Char *argv[])
} }
Hashmap output_map = hmCreate(memCRT, 1024); Hashmap output_map = hmCreate(memCRT, 1024);
Hashmap input_map = hmCreate(memCRT, 1024);
Hashmap busy_map = hmCreate(memCRT, 1024); Hashmap busy_map = hmCreate(memCRT, 1024);
// NOTE(naman): Create a socket for IPv4 and TCP. // NOTE(naman): Create a socket for IPv4 and TCP.
...@@ -247,20 +254,35 @@ Sint main (Sint argc, Char *argv[]) ...@@ -247,20 +254,35 @@ Sint main (Sint argc, Char *argv[])
char *buffer = calloc(KiB(300), sizeof(*buffer)); char *buffer = calloc(KiB(300), sizeof(*buffer));
int len = read(fd, buffer, KiB(300) - 1); int len = read(fd, buffer, KiB(300) - 1);
if (len == 0) { if (len == 0) {
close(fd); close(fd);
continue; continue;
} }
if (hmLookupI(&busy_map, fd)) { B32 is_busy = hmLookupI(&busy_map, fd);
Input_Resume *ir = (Input_Resume*)hmLookupI(&input_map, fd);
if (is_busy && (ir == NULL)) {
continue; continue;
} }
hmInsertI(&busy_map, fd, true); hmInsertI(&busy_map, fd, true);
XML_Message msg = xmlParseMessage(buffer); if (ir == NULL) {
ir = calloc(1, sizeof(*ir));
ir->buffer = calloc(KiB(300), sizeof(*(ir->buffer)));
ir->fd = fd;
hmInsertI(&input_map, fd, (Uptr)ir);
}
strcat(ir->buffer, buffer);
ir->buffer_len += len;
free(buffer); free(buffer);
if (strsuffix(ir->buffer, "</KVMessage>\n") && (ir->buffer_len <= KiB(299))) {
XML_Message msg = xmlParseMessage(ir->buffer);
switch (msg.kind) { switch (msg.kind) {
case XML_Message_Kind_GET: { case XML_Message_Kind_GET: {
Command command = {.fd = fd}; Command command = {.fd = fd};
...@@ -307,19 +329,24 @@ Sint main (Sint argc, Char *argv[]) ...@@ -307,19 +329,24 @@ Sint main (Sint argc, Char *argv[])
} break; } break;
} }
hmRemoveI(&input_map, fd);
free(ir->buffer);
free(ir);
struct epoll_event event = {.data.fd = fd, struct epoll_event event = {.data.fd = fd,
.events = EPOLLIN | EPOLLOUT}; .events = EPOLLIN | EPOLLOUT};
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) { if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) {
perror("epoll_ctl EPOLL_CTL_MOD"); perror("epoll_ctl EPOLL_CTL_MOD");
exit(-1); exit(-1);
} }
}
} else if (events[i].events & EPOLLOUT) { } else if (events[i].events & EPOLLOUT) {
// Writing into fd in which we previously were not able to finish writing to // Writing into fd in which we previously were not able to finish writing to
int fd = events[i].data.fd; int fd = events[i].data.fd;
Output_Resume *or = hmLookup(&output_map, (void*)(Uptr)fd); Output_Resume *or = hmLookup(&output_map, (void*)(Uptr)fd);
if (or == NULL) { if (or == NULL) {
// fprintf(stderr, "hmLookup returned NULL"); // fprintf(stderr, "hmLookup returned NULL\n");
continue; continue;
} else { } else {
Char *output = or->output; Char *output = or->output;
...@@ -332,7 +359,7 @@ Sint main (Sint argc, Char *argv[]) ...@@ -332,7 +359,7 @@ Sint main (Sint argc, Char *argv[])
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Try next time // Try next time
} else { } else {
perror("write() failed"); perror("write() failed\n");
exit(-1); exit(-1);
} }
} else if ((Size)nsent < (output_len - output_pos)) { } else if ((Size)nsent < (output_len - output_pos)) {
......
...@@ -50,7 +50,8 @@ B32 storageSetFileData(U64 hash, Char *key, Char *value, Char **error) ...@@ -50,7 +50,8 @@ B32 storageSetFileData(U64 hash, Char *key, Char *value, Char **error)
strcat(path, "/"); strcat(path, "/");
if (!storageMakeDirectoryIfNotExist(path, error)) goto error; if (!storageMakeDirectoryIfNotExist(path, error)) goto error;
strcat(path, key); strncat(path, key, 255);
if((file = fopen(path, "w")) != NULL) { if((file = fopen(path, "w")) != NULL) {
fprintf(file, "%s", value); fprintf(file, "%s", value);
fclose(file); fclose(file);
...@@ -102,7 +103,7 @@ Char* storageGetFileData(U64 hash, Char *key, Char **error) ...@@ -102,7 +103,7 @@ Char* storageGetFileData(U64 hash, Char *key, Char **error)
strcat(path, "/"); strcat(path, "/");
strcat(path, byte_seven_eight); strcat(path, byte_seven_eight);
strcat(path, "/"); strcat(path, "/");
strcat(path, key); strncat(path, key, 255);
if(access(path, F_OK) == 0) { if(access(path, F_OK) == 0) {
if((file = fopen(path, "r")) != NULL) { if((file = fopen(path, "r")) != NULL) {
...@@ -201,7 +202,7 @@ B32 storageDeleteFileData(U64 hash, Char *key, Char **error) ...@@ -201,7 +202,7 @@ B32 storageDeleteFileData(U64 hash, Char *key, Char **error)
strcat(path, "/"); strcat(path, "/");
strcat(path, byte_seven_eight); strcat(path, byte_seven_eight);
strcat(path, "/"); strcat(path, "/");
strcat(path, key); strncat(path, key, 255);
// FIXME(namna): This is in a race condition with the SetData // FIXME(namna): This is in a race condition with the SetData
int ret = 0; int ret = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment