Commit a5d82517 authored by Roshan Rabinarayan's avatar Roshan Rabinarayan

added code for persistent storage

parents f01d4875 3a427d2d
No preview for this file type
...@@ -2,13 +2,67 @@ ...@@ -2,13 +2,67 @@
#define PORT 8000 #define PORT 8000
#define SA struct sockaddr #define SA struct sockaddr
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#include <sys/types.h> #include <sys/types.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#include "KVClientLibrary.h" #include "KVClientLibrary.h"
#include <string.h>
struct message* requestMessage; struct message* requestMessage;
void interactive (int sock) {
char command[515];
int readlen, i, j;
int status_code;
char key[256];
char value[256];
char error[256];
do {
memset(key, 0, sizeof(key));
memset(value, 0, sizeof(value));
memset(error, 0, sizeof(error));
printf("interactive> ");
fflush(stdout);
readlen = read(0, command, sizeof(command));
i=5;
while(command[i] && command[i]!=' ' && command[i]!='\n') {
i++;
}
memcpy(key, command+4, i-4);
if(command[i]==' ') {
j=i+1;
while(command[j] && command[j]!=':' && command[j]!='\n') {
j++;
}
memcpy(value, command+i+1, j-i-1);
}
// printf("key: %s\n", key);
// printf("value: %s\n", value);
// printf("error: %s\n", error);
switch(command[0]) {
case 'g': status_code = get(sock, key, value, error);
printf("[%d]", status_code);
printf(" Value recieved: %s\n", value);
break;
case 'p':status_code = put(sock, key, value, error);
printf("[%d]\n", status_code);
break;
case 'd':status_code = del(sock, key, error);
printf("[%d]\n", status_code);
break;
default: return;
}
} while(command[0] != 'q');
}
int main(int argc, char const *argv[]) int main(int argc, char const *argv[])
{ {
int sock = 0, valread; int sock = 0, valread;
...@@ -36,16 +90,22 @@ int main(int argc, char const *argv[]) ...@@ -36,16 +90,22 @@ int main(int argc, char const *argv[])
printf("\nConnection Failed \n"); printf("\nConnection Failed \n");
return -1; return -1;
} }
char key[256]="abc"; char key1[256]="abc";
char value[256]="cde"; char value1[256]="cde";
char error[256]; char error1[256];
char key2[256]="test";
char value2[256]="cdde";
char error2[256];
char value3[256];
//printf("%d",(int)put(sock,key, value,error)); //printf("%d",(int)put(sock,key, value,error));
//printf("%d",(int)del(sock,key,error)); //printf("%d",(int)del(sock,key,error));
//printf("%d",(int)get(sock,key, value,error)); //printf("%d",(int)get(sock,key, value,error));
//while (1) { //while (1) {
for(int i=0; i<10; i++) { //printf("%d",(int)put(sock, key1, value1, error1));
printf("%d",(int)put(sock, key, value, error)); //printf("%d",(int)put(sock, key2, value2, error2));
} //printf("%d\n",(int)get(sock, key1, value3, error2));
//printf("%s", value3);
interactive(sock);
return 0; return 0;
} }
\ No newline at end of file
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#include <unistd.h> #include <unistd.h>
#include "KVClientLibrary.h" #include "KVClientLibrary.h"
// to print a message // to print a message
void printMessage(struct message *requestMessage) void printMessage(struct message *requestMessage)
{ {
...@@ -90,7 +89,7 @@ int put(int sockfd,char *key,char *value,char *error) ...@@ -90,7 +89,7 @@ int put(int sockfd,char *key,char *value,char *error)
} }
else else
{ {
printMessage(reply); //printMessage(reply);
} }
free(request); free(request);
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
#include "LRU.h" #include "LRU.h"
#include "KVMessageFormat.h" #include "KVMessageFormat.h"
#define MAX_EVENTS 10 #define MAX_EVENTS 10
#define DEBUG (1) #define DEBUG (0)
void gen_random(char *s, const int len) { void gen_random(char *s, const int len) {
static const char alphanum[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; static const char alphanum[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
...@@ -32,15 +32,15 @@ void *worker(void *args) { ...@@ -32,15 +32,15 @@ void *worker(void *args) {
int newfd; int newfd;
int status; int status;
// Generate name for each thread for debugging // Generate name for each thread for debugging
char *name = (char *) malloc(5*sizeof(char)); char *name = (char *) malloc(5*sizeof(char));
gen_random(name, 5); gen_random(name, 5);
if DEBUG printf("[%s] Thread started!\n", name); if DEBUG printf("[%s] Thread started!\n", name);
epollfd = epoll_create1(0); epollfd = epoll_create1(0);
if (epollfd == -1) { while (epollfd == -1) {
perror("epoll_create1"); epollfd = epoll_create1(0);
exit(EXIT_FAILURE);
} }
/* Add the pipe which we get from main thread */ /* Add the pipe which we get from main thread */
...@@ -98,11 +98,11 @@ void *worker(void *args) { ...@@ -98,11 +98,11 @@ void *worker(void *args) {
switch(requestMessage->status) { switch(requestMessage->status) {
case STATUS_GET: case STATUS_GET:
if DEBUG printf("[%s] GET \n", name); if DEBUG printf("[%s] GET \n", name);
memcpy(requestMessage->value, cache_get(requestMessage->key), 256); status = cache_get(requestMessage->key, requestMessage->value);
if(requestMessage->value == 0) { if(status) {
requestMessage->status = 240;
} else {
requestMessage->status = 200; requestMessage->status = 200;
} else {
requestMessage->status = 240;
} }
break; break;
case STATUS_PUT: case STATUS_PUT:
...@@ -121,10 +121,7 @@ void *worker(void *args) { ...@@ -121,10 +121,7 @@ void *worker(void *args) {
break; break;
} }
if DEBUG print_cache();
requestMessage->status=200;
write(events[i].data.fd, requestMessage, sizeof(struct message)); write(events[i].data.fd, requestMessage, sizeof(struct message));
free(requestMessage); free(requestMessage);
} }
...@@ -144,12 +141,12 @@ int main (int argc, int argv) { ...@@ -144,12 +141,12 @@ int main (int argc, int argv) {
int i; int i;
int next_thread_to_assign = 0; int next_thread_to_assign = 0;
pthread_t *worker_threads; pthread_t *worker_threads;
int pool_thread_size = 5; // TODO: get pool thread size from config file int pool_thread_size = 1; // TODO: get pool thread size from config file
int sockfd, newsockfd, portno, clilen, n; int sockfd, newsockfd, portno, clilen, n;
struct sockaddr_in serv_addr, cli_addr; struct sockaddr_in serv_addr, cli_addr;
//int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes)); //int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes));
int pipes[5][2]; // TODO: initialize pipes dynamically int pipes[1][2]; // TODO: initialize pipes dynamically
worker_threads = malloc(pool_thread_size * sizeof(pthread_t)); worker_threads = malloc(pool_thread_size * sizeof(pthread_t));
for( i=0; i < pool_thread_size; i++ ) { for( i=0; i < pool_thread_size; i++ ) {
......
...@@ -70,7 +70,7 @@ int cache_del(char *key) ...@@ -70,7 +70,7 @@ int cache_del(char *key)
{ {
for(int i=0;i<MAX_SIZE;i++) for(int i=0;i<MAX_SIZE;i++)
{ {
if(array[i]->key == key) if(strcmp(array[i]->key , key) == 0)
{ {
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1); while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
remove_element_from_deque(key); remove_element_from_deque(key);
...@@ -79,6 +79,8 @@ int cache_del(char *key) ...@@ -79,6 +79,8 @@ int cache_del(char *key)
return 1; return 1;
} }
} }
printf("Cache after DEL: \n");
print_cache();
return 0; return 0;
//TODO remove key from file also //TODO remove key from file also
} }
...@@ -88,14 +90,17 @@ void cache_put(char *key, char *value) ...@@ -88,14 +90,17 @@ void cache_put(char *key, char *value)
int indx=-1; int indx=-1;
for(int i=0;i<MAX_SIZE;i++) for(int i=0;i<MAX_SIZE;i++)
{ {
if(array[i]->key == key) if(array[i]->key!=NULL) {
{ if(strcmp(array[i]->key , key) == 0)
indx = i; {
remove_element_from_deque(key); indx = i;
break; remove_element_from_deque(key);
break;
}
} }
} }
printf("key is present at index: %d\n", indx);
if(indx == -1) if(indx == -1)
{ {
indx = find_empty_location(); indx = find_empty_location();
...@@ -104,25 +109,28 @@ void cache_put(char *key, char *value) ...@@ -104,25 +109,28 @@ void cache_put(char *key, char *value)
} }
while(ATOMIC_TEST_AND_SET(&(array[indx]->lock),1) == 1); while(ATOMIC_TEST_AND_SET(&(array[indx]->lock),1) == 1);
array[indx]->key = key; memcpy(array[indx]->key, key, KEY_SIZE);
array[indx]->value = value; memcpy(array[indx]->value, value, VAL_SIZE);
array[indx]->valid = TRUE; array[indx]->valid = TRUE;
array[indx]->modified = TRUE; array[indx]->modified = TRUE;
insert_into_queue(key); insert_into_queue(key);
CLEAR(&(array[indx]->lock),0); CLEAR(&(array[indx]->lock),0);
printf("Cache after PUT: \n");
print_cache();
} }
char* cache_get(char *key) int cache_get(char *key, char *value)
{ {
for(int i=0;i<MAX_SIZE;i++) for(int i=0;i<MAX_SIZE;i++)
{ {
if(array[i]->key == key) if(strcmp(array[i]->key , key) == 0 && array[i]->valid)
{ {
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1); while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
remove_element_from_deque(key); remove_element_from_deque(key);
insert_into_queue(key); insert_into_queue(key);
CLEAR(&(array[i]->lock),0); CLEAR(&(array[i]->lock),0);
return array[i]->value; memcpy(value, array[i]->value, VAL_SIZE);
return 1;
} }
} }
return 0; return 0;
...@@ -140,6 +148,6 @@ void init_cache() { ...@@ -140,6 +148,6 @@ void init_cache() {
void print_cache() { void print_cache() {
for(int j=0;j<MAX_SIZE;j++) for(int j=0;j<MAX_SIZE;j++)
{ {
printf("(%s) : (%s)\n", array[j]->key, array[j]->value); printf("[%d] (%s) : (%s)\n", array[j]->valid, array[j]->key, array[j]->value);
} }
} }
\ No newline at end of file
#define MAX_SIZE 10
#define KEY_SIZE 256
#define VAL_SIZE 256
#define MAX_SIZE 10
typedef enum{ typedef enum{
FALSE,TRUE FALSE,TRUE
}bool; }bool;
struct KV{ struct KV{
char *key; char key[KEY_SIZE];
char *value; char value[VAL_SIZE];
bool valid; bool valid;
bool modified; bool modified;
unsigned _Atomic lock; unsigned _Atomic lock;
...@@ -38,7 +40,7 @@ int cache_del(char *key); ...@@ -38,7 +40,7 @@ int cache_del(char *key);
void cache_put(char *key, char *value); void cache_put(char *key, char *value);
char* cache_get(char *key); int cache_get(char *key, char *value);
void init_cache(); void init_cache();
......
No preview for this file type
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment