Commit 47235b37 authored by Samarth Joshi's avatar Samarth Joshi

Removing debugging info and adding final touches

parent 18527950
No preview for this file type
No preview for this file type
No preview for this file type
...@@ -14,6 +14,49 @@ ...@@ -14,6 +14,49 @@
struct message* requestMessage; struct message* requestMessage;
struct config {
int listening_port;
int cache_size;
int thread_pool_size;
};
struct config *settings;
int read_config(struct config *settings) {
FILE * fp;
char * line = NULL;
size_t len = 0;
ssize_t read;
char *param;
char *value;
int i;
fp = fopen("settings.conf", "r");
if (fp == NULL)
return -1;
while ((read = getline(&line, &len, fp)) != -1) {
i = 0;
if(line[0]=='#') {
break;
} else {
param=strtok(line,"=");
value=strtok(NULL,"=");
if(strcmp(param,"listening_port")==0) {
settings->listening_port = atoi(value);
} else if (strcmp(param,"cache_size")==0) {
settings->cache_size = atoi(value);
} else if (strcmp(param,"thread_pool_size")==0) {
settings->thread_pool_size = atoi(value);
} else {
}
}
}
fclose(fp);
}
void interactive (int sock) { void interactive (int sock) {
char command[515]; char command[515];
...@@ -51,13 +94,22 @@ void interactive (int sock) { ...@@ -51,13 +94,22 @@ void interactive (int sock) {
switch(command[0]) { switch(command[0]) {
case 'g': status_code = get(sock, key, value, error); case 'g': status_code = get(sock, key, value, error);
printf("[%d]", status_code); printf("[%d]", status_code);
if(status_code==200) {
printf(" Value recieved: %s\n", value); printf(" Value recieved: %s\n", value);
} else {
printf(" Error: %s\n", error);
}
break; break;
case 'p':status_code = put(sock, key, value, error); case 'p':status_code = put(sock, key, value, error);
printf("[%d]\n", status_code); printf("[%d]\n", status_code);
break; break;
case 'd':status_code = del(sock, key, error); case 'd':status_code = del(sock, key, error);
printf("[%d]\n", status_code); printf("[%d]", status_code);
if(status_code==200) {
printf(" Deleted successfully %s\n", value);
} else {
printf(" Error: %s\n", error);
}
break; break;
default: return; default: return;
} }
...@@ -89,7 +141,7 @@ int client_init() { ...@@ -89,7 +141,7 @@ int client_init() {
} }
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT); serv_addr.sin_port = htons(settings->listening_port);
// Convert IPv4 and IPv6 addresses from text to binary form // Convert IPv4 and IPv6 addresses from text to binary form
if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0) if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0)
...@@ -148,6 +200,12 @@ int main(int argc, char const *argv[]) ...@@ -148,6 +200,12 @@ int main(int argc, char const *argv[])
int seconds = 1; int seconds = 1;
int i; int i;
settings = (struct config *) malloc(sizeof(struct config));
if(read_config(settings)<0) {
printf("Invalid config file!\n");
return -1;
}
if(1) { if(1) {
sock = client_init(); sock = client_init();
interactive(sock); interactive(sock);
......
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#include <unistd.h> #include <unistd.h>
#include "KVClientLibrary.h" #include "KVClientLibrary.h"
char error_msg[256] = "Key not found!";
// to print a message // to print a message
void printMessage(struct message *requestMessage) void printMessage(struct message *requestMessage)
{ {
...@@ -27,9 +29,9 @@ int get(int sockfd,char* key,char *value,char *error) ...@@ -27,9 +29,9 @@ int get(int sockfd,char* key,char *value,char *error)
write(sockfd, request, sizeof(struct message)); //passing to server and waiting for response write(sockfd, request, sizeof(struct message)); //passing to server and waiting for response
read(sockfd,reply,sizeof(struct message)); read(sockfd,reply,sizeof(struct message));
int status=reply->status; int status=reply->status;
if(status==400) if(status==240)
{ {
memcpy(error,reply->key,256*sizeof(char)); //copying the error in the key field for an failed get memcpy(error,error_msg,256*sizeof(char)); //copying the error in the key field for an failed get
} }
else else
...@@ -59,9 +61,9 @@ int del(int sockfd,char *key,char *error) ...@@ -59,9 +61,9 @@ int del(int sockfd,char *key,char *error)
write(sockfd, request, sizeof(struct message)); //passing to server and waiting for response to the del request write(sockfd, request, sizeof(struct message)); //passing to server and waiting for response to the del request
read(sockfd,reply,sizeof(struct message)); read(sockfd,reply,sizeof(struct message));
int status=(int)reply->status; int status=(int)reply->status;
if(status==400) if(status==240)
{ {
memcpy(error,reply->key,256*sizeof(char)); //copying the error in the key field for an failed del memcpy(error,error_msg,256*sizeof(char)); //copying the error in the key field for an failed del
} }
free(request); free(request);
free(reply); free(reply);
...@@ -83,9 +85,9 @@ int put(int sockfd,char *key,char *value,char *error) ...@@ -83,9 +85,9 @@ int put(int sockfd,char *key,char *value,char *error)
write(sockfd, request, sizeof(struct message)); //passing to server and waiting for response to the del request write(sockfd, request, sizeof(struct message)); //passing to server and waiting for response to the del request
read(sockfd,reply,sizeof(struct message)); read(sockfd,reply,sizeof(struct message));
int status=reply->status; int status=reply->status;
if(status==400) if(status==249)
{ {
memcpy(error,reply->key,256*sizeof(char)); //copying the error in the key field for an failed del memcpy(error,error_msg,256*sizeof(char)); //copying the error in the key field for an failed del
} }
else else
{ {
......
...@@ -19,17 +19,6 @@ struct config { ...@@ -19,17 +19,6 @@ struct config {
int thread_pool_size; int thread_pool_size;
}; };
void gen_random(char *s, const int len) {
static const char alphanum[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
for (int i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
s[len] = 0;
}
void *worker(void *args) { void *worker(void *args) {
struct epoll_event ev,events[MAX_EVENTS]; struct epoll_event ev,events[MAX_EVENTS];
struct epoll_event socketEvent; struct epoll_event socketEvent;
...@@ -40,12 +29,6 @@ void *worker(void *args) { ...@@ -40,12 +29,6 @@ void *worker(void *args) {
int newfd; int newfd;
int status; int status;
/* Generate name for each thread for debugging */
char *name = (char *) malloc(5*sizeof(char));
gen_random(name, 5);
if DEBUG printf("[%s] Thread started!\n", name);
epollfd = epoll_create1(0); epollfd = epoll_create1(0);
while (epollfd == -1) { while (epollfd == -1) {
epollfd = epoll_create1(0); epollfd = epoll_create1(0);
...@@ -62,19 +45,17 @@ void *worker(void *args) { ...@@ -62,19 +45,17 @@ void *worker(void *args) {
while (1) { while (1) {
if DEBUG printf("[%s] waiting for epoll event!\n", name);
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1); nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) { if (nfds == -1) {
perror("epoll_wait"); perror("epoll_wait");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if DEBUG printf("[%s] %d events !\n", name, nfds);
for ( i= 0; i < nfds; ++i ) { for ( i= 0; i < nfds; ++i ) {
if (events[i].data.fd == read_pipe) { if (events[i].data.fd == read_pipe) {
/* if we get a request from main thread to add new client */ /* if we get a request from main thread to add new client */
read(read_pipe, &newfd, sizeof(newfd)); read(read_pipe, &newfd, sizeof(newfd));
printf("[Worker thread] New client assigned at socket %d\n", newfd);
if DEBUG printf("[%s][EVENT] New Client assigned by main thread fd:%d!\n", name, newfd);
ev.data.fd=newfd; ev.data.fd=newfd;
ev.events = EPOLLIN | EPOLLRDHUP; ev.events = EPOLLIN | EPOLLRDHUP;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, newfd, &ev) == -1) { if (epoll_ctl(epollfd, EPOLL_CTL_ADD, newfd, &ev) == -1) {
...@@ -87,10 +68,8 @@ void *worker(void *args) { ...@@ -87,10 +68,8 @@ void *worker(void *args) {
/* if we get a request from client (GET, PUSH, DEL) */ /* if we get a request from client (GET, PUSH, DEL) */
int flag = events[i].events; int flag = events[i].events;
if DEBUG printf("[%s][EVENT] flag set: %d\n", name, flag);
if (flag & EPOLLRDHUP) { if (flag & EPOLLRDHUP) {
/* Connection was closed. */ /* Connection was closed. */
if DEBUG printf("[%s][EVENT][EPOLLRDHUP] \n", name);
epoll_ctl( epollfd, EPOLL_CTL_DEL, events[i].data.fd , NULL ); epoll_ctl( epollfd, EPOLL_CTL_DEL, events[i].data.fd , NULL );
close(events[i].data.fd); close(events[i].data.fd);
continue; continue;
...@@ -102,10 +81,9 @@ void *worker(void *args) { ...@@ -102,10 +81,9 @@ void *worker(void *args) {
memset(requestMessage->key, 0, 256); memset(requestMessage->key, 0, 256);
memset(requestMessage->value, 0, 256); memset(requestMessage->value, 0, 256);
int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message)); int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message));
if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name);
switch(requestMessage->status) { switch(requestMessage->status) {
case STATUS_GET: case STATUS_GET:
if DEBUG printf("[%s] GET \n", name); printf("[GET][Socket:%d] Key:%s\n", newfd, requestMessage->key);
status = cache_get(requestMessage->key, requestMessage->value); status = cache_get(requestMessage->key, requestMessage->value);
if(status) { if(status) {
requestMessage->status = 200; requestMessage->status = 200;
...@@ -120,15 +98,15 @@ void *worker(void *args) { ...@@ -120,15 +98,15 @@ void *worker(void *args) {
} }
break; break;
case STATUS_PUT: case STATUS_PUT:
if DEBUG printf("[%s] PUT \n", name); printf("[PUT][Socket:%d] Key:%s Value:%s\n", newfd, requestMessage->key, requestMessage->value);
cache_put(requestMessage->key, requestMessage->value); cache_put(requestMessage->key, requestMessage->value);
file_put(requestMessage->key, requestMessage->value); file_put(requestMessage->key, requestMessage->value);
requestMessage->status = 200; requestMessage->status = 200;
break; break;
case STATUS_DEL: case STATUS_DEL:
if DEBUG printf("[%s] DEL \n", name); printf("[DEL][Socket:%d] Key:%s\n", newfd, requestMessage->key);
status = cache_del(requestMessage->key); status = cache_del(requestMessage->key);
file_del(requestMessage->key); status = file_del(requestMessage->key);
if(status) { if(status) {
requestMessage->status = 200; requestMessage->status = 200;
} else { } else {
...@@ -147,8 +125,6 @@ void *worker(void *args) { ...@@ -147,8 +125,6 @@ void *worker(void *args) {
} }
free(name);
} }
int read_config(struct config *settings) { int read_config(struct config *settings) {
...@@ -201,10 +177,6 @@ int main (int argc, int argv) { ...@@ -201,10 +177,6 @@ int main (int argc, int argv) {
return -1; return -1;
} }
printf("%d\n", settings->listening_port);
printf("%d\n", settings->cache_size);
printf("%d\n", settings->thread_pool_size);
//int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes)); //int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes));
int pipes[50][2]; // TODO: initialize pipes dynamically int pipes[50][2]; // TODO: initialize pipes dynamically
......
...@@ -121,8 +121,6 @@ int cache_del(char *key) ...@@ -121,8 +121,6 @@ int cache_del(char *key)
return 1; return 1;
} }
} }
printf("Cache after DEL: \n");
print_cache();
return 0; return 0;
//TODO remove key from file also //TODO remove key from file also
} }
...@@ -141,7 +139,6 @@ void cache_put(char *key, char *value) ...@@ -141,7 +139,6 @@ void cache_put(char *key, char *value)
} }
} }
} }
printf("key is present at index: %d\n", indx);
if(indx == -1) if(indx == -1)
{ {
...@@ -158,7 +155,6 @@ void cache_put(char *key, char *value) ...@@ -158,7 +155,6 @@ void cache_put(char *key, char *value)
array[indx]->modified = TRUE; array[indx]->modified = TRUE;
insert_into_queue(key); insert_into_queue(key);
CLEAR(&(array[indx]->lock),0); CLEAR(&(array[indx]->lock),0);
print_cache();
} }
int cache_get(char *key, char *value) int cache_get(char *key, char *value)
......
No preview for this file type
...@@ -57,13 +57,14 @@ int file_search(char *key, char *value, int index) { ...@@ -57,13 +57,14 @@ int file_search(char *key, char *value, int index) {
return -1; return -1;
} }
void file_del( char *key) int file_del( char *key)
{ {
int offset; int offset;
int index=modulus(key,256,setSize); int index=modulus(key,256,setSize);
char fkey[256]; char fkey[256];
char fval[256]; char fval[256];
sem_wait(&mutex[index]); sem_wait(&mutex[index]);
int found=0;
offset = file_search(key, NULL, index); offset = file_search(key, NULL, index);
if(offset >= 0) { if(offset >= 0) {
...@@ -72,9 +73,11 @@ void file_del( char *key) ...@@ -72,9 +73,11 @@ void file_del( char *key)
memset(fval, 0, 256); memset(fval, 0, 256);
write(fds[index], fkey, 256); write(fds[index], fkey, 256);
write(fds[index], fval, 256); write(fds[index], fval, 256);
found = 1;
} }
sem_post(&mutex[index]); sem_post(&mutex[index]);
return found;
} }
int file_get(char *key, char *value) int file_get(char *key, char *value)
......
...@@ -2,11 +2,10 @@ ...@@ -2,11 +2,10 @@
int storage_init(); int storage_init();
void file_del( char *key); int file_del( char *key);
void file_put(char *key,char *value); void file_put(char *key,char *value);
int file_get(char *key, char *value); int file_get(char *key, char *value);
void file_del( char *key);
listening_port=8000 listening_port=80
cache_size=15 cache_size=15
thread_pool_size=10 thread_pool_size=10
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment