Commit 1b5d4448 authored by Roshan Rabinarayan's avatar Roshan Rabinarayan
parents 4f004cad 4fbaf0a8
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <sys/epoll.h> #include <sys/epoll.h>
#include <string.h> #include <string.h>
#include "LRU.h" #include "LRU.h"
#include "StorageHandler.h"
#include "KVMessageFormat.h" #include "KVMessageFormat.h"
#define MAX_EVENTS 10 #define MAX_EVENTS 10
#define DEBUG (0) #define DEBUG (0)
...@@ -33,7 +34,7 @@ void *worker(void *args) { ...@@ -33,7 +34,7 @@ void *worker(void *args) {
int status; int status;
// Generate name for each thread for debugging /* Generate name for each thread for debugging */
char *name = (char *) malloc(5*sizeof(char)); char *name = (char *) malloc(5*sizeof(char));
gen_random(name, 5); gen_random(name, 5);
if DEBUG printf("[%s] Thread started!\n", name); if DEBUG printf("[%s] Thread started!\n", name);
...@@ -51,7 +52,7 @@ void *worker(void *args) { ...@@ -51,7 +52,7 @@ void *worker(void *args) {
perror("epoll_ctl: read_pipe"); perror("epoll_ctl: read_pipe");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
//if DEBUG printf("[%s] Added read pipe to epoll fd set!\n", name);
while (1) { while (1) {
if DEBUG printf("[%s] waiting for epoll event!\n", name); if DEBUG printf("[%s] waiting for epoll event!\n", name);
...@@ -91,8 +92,7 @@ void *worker(void *args) { ...@@ -91,8 +92,7 @@ void *worker(void *args) {
if (flag & EPOLLIN) { if (flag & EPOLLIN) {
/* Parse the actual message from client */ /* Parse the actual message from client */
struct message *requestMessage= malloc(sizeof(struct message)); struct message *requestMessage= malloc(sizeof(struct message));
int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message)); int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message));
//printf("[Message Received from client] (%d, %s, %s)\n",requestMessage->status,requestMessage->key,requestMessage->value);
if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name); if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name);
switch(requestMessage->status) { switch(requestMessage->status) {
...@@ -175,6 +175,7 @@ int main (int argc, int argv) { ...@@ -175,6 +175,7 @@ int main (int argc, int argv) {
clilen = sizeof(cli_addr); clilen = sizeof(cli_addr);
init_cache(); init_cache();
init_storage()
while(1) { while(1) {
......
#include <stdio.h> #include <stdio.h>
#include <pthread.h> #include <pthread.h>
#include <stdlib.h> #include <stdlib.h>
#define _GNU_SOURCE #define _GNU_SOURCE
#include <fcntl.h> #include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include <stdatomic.h> #include <stdatomic.h>
#include "LRU.h" #include <assert.h>
#define ATOMIC_TEST_AND_SET __atomic_test_and_set #include "LRU.h"
#define CLEAR __atomic_clear #define ATOMIC_TEST_AND_SET __atomic_test_and_set
#define MAX_SIZE 10 #define CLEAR __atomic_clear
#define MAX_SIZE 10
struct KV *array[MAX_SIZE];
struct queue *qu; struct KV *array[MAX_SIZE];
struct queue *last; struct queue *qu=NULL;
struct queue *last;
void remove_element_from_deque(char *key)
{ void remove_element_from_deque(char *key)
queue *present = qu , *previous=NULL; {
while(present->next != NULL) queue *present = qu , *previous=NULL;
{ if(present == NULL)
if(present->key == key) return;
{
if(previous == NULL) if(strcmp(present->key, key) == 0)
qu = qu->next; {
if(last == present) qu = qu->next;
last = previous; if(last == present)
if(previous) last = last->next;
previous->next = present->next; free(present);
free(present); return;
return; }
}
previous = present; while(present->next != NULL)
present = present->next; {
} if(strcmp(present->key, key) == 0)
} {
if(last == present)
void insert_into_queue(char *key) last = previous;
{ previous->next = present->next;
queue *temp = (queue *)malloc(sizeof(queue)); free(present);
temp->key = key; return;
temp->next = NULL; }
if(qu == NULL) previous = present;
{ present = present->next;
qu = temp; }
last = temp; }
}
else void insert_into_queue(char *key)
{ {
last->next = temp; queue *temp = (queue *)malloc(sizeof(queue));
last = temp; memcpy(temp->key, key, KEY_SIZE);
} temp->next = NULL;
} if(qu == NULL)
{
int find_empty_location() qu = temp;
{ last = temp;
for(int i=0;i<MAX_SIZE;i++) }
{ else
if(array[i]->valid == FALSE) {
return i; last->next = temp;
} last = temp;
} }
}
int cache_del(char *key)
{ int remove_front_element()
for(int i=0;i<MAX_SIZE;i++) {
{ queue *temp = qu;
if(strcmp(array[i]->key , key) == 0) qu = qu->next;
{ assert(temp != NULL);
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1); assert(qu != NULL);
remove_element_from_deque(key); for(int i=0;i<MAX_SIZE;i++)
array[i]->valid = FALSE; {
CLEAR(&(array[i]->lock),0); if(strcmp(array[i]->key , temp->key) == 0)
return 1; {
} // check if it is modified to write to file
} array[i]->valid = FALSE;
printf("Cache after DEL: \n"); return i;
print_cache(); }
return 0; }
//TODO remove key from file also }
}
int find_empty_location()
void cache_put(char *key, char *value) {
{ for(int i=0;i<MAX_SIZE;i++)
int indx=-1; {
for(int i=0;i<MAX_SIZE;i++) if(array[i]->valid == FALSE)
{ return i;
if(array[i]->key!=NULL) { }
if(strcmp(array[i]->key , key) == 0) return remove_front_element();
{ }
indx = i;
remove_element_from_deque(key); int cache_del(char *key)
break; {
} for(int i=0;i<MAX_SIZE;i++)
} {
} if(strcmp(array[i]->key , key) == 0)
printf("key is present at index: %d\n", indx); {
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
if(indx == -1) remove_element_from_deque(key);
{ array[i]->valid = FALSE;
indx = find_empty_location(); CLEAR(&(array[i]->lock),0);
// TODO should write to file if modified is true return 1;
// replacment from cache }
} }
printf("Cache after DEL: \n");
while(ATOMIC_TEST_AND_SET(&(array[indx]->lock),1) == 1); print_cache();
memcpy(array[indx]->key, key, KEY_SIZE); return 0;
memcpy(array[indx]->value, value, VAL_SIZE); //TODO remove key from file also
array[indx]->valid = TRUE; }
array[indx]->modified = TRUE;
insert_into_queue(key); void cache_put(char *key, char *value)
CLEAR(&(array[indx]->lock),0); {
printf("Cache after PUT: \n"); int indx=-1;
print_cache(); for(int i=0;i<MAX_SIZE;i++)
} {
if(array[i]->key!=NULL) {
int cache_get(char *key, char *value) if(strcmp(array[i]->key , key) == 0)
{ {
for(int i=0;i<MAX_SIZE;i++) printf("ENTERING INTO QUEUE\n");
{ indx = i;
if(strcmp(array[i]->key , key) == 0 && array[i]->valid) remove_element_from_deque(key);
{ break;
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1); }
remove_element_from_deque(key); }
insert_into_queue(key); }
CLEAR(&(array[i]->lock),0); printf("key is present at index: %d\n", indx);
memcpy(value, array[i]->value, VAL_SIZE);
return 1; if(indx == -1)
} {
} indx = find_empty_location();
return 0; assert(indx>=0);
} // TODO should write to file if modified is true
// replacment from cache
void init_cache() { }
for(int j=0;j<MAX_SIZE;j++)
{ while(ATOMIC_TEST_AND_SET(&(array[indx]->lock),1) == 1);
array[j] = (KV *)malloc(sizeof(KV)); memcpy(array[indx]->key, key, KEY_SIZE);
array[j]->valid = FALSE; memcpy(array[indx]->value, value, VAL_SIZE);
array[j]->lock = 0; array[indx]->valid = TRUE;
} array[indx]->modified = TRUE;
} insert_into_queue(key);
CLEAR(&(array[indx]->lock),0);
void print_cache() { print_cache();
for(int j=0;j<MAX_SIZE;j++) }
{
printf("[%d] (%s) : (%s)\n", array[j]->valid, array[j]->key, array[j]->value); int cache_get(char *key, char *value)
} {
} for(int i=0;i<MAX_SIZE;i++)
\ No newline at end of file {
if(strcmp(array[i]->key , key) == 0 && array[i]->valid)
{
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
remove_element_from_deque(key);
insert_into_queue(key);
CLEAR(&(array[i]->lock),0);
memcpy(value, array[i]->value, VAL_SIZE);
return 1;
}
}
return 0;
}
void init_cache() {
for(int j=0;j<MAX_SIZE;j++)
{
array[j] = (KV *)malloc(sizeof(KV));
array[j]->valid = FALSE;
array[j]->lock = 0;
}
}
void print_cache() {
for(int j=0;j<MAX_SIZE;j++)
{
printf("[%d] (%s) : (%s)\n", array[j]->valid, array[j]->key, array[j]->value);
}
}
...@@ -22,7 +22,7 @@ typedef struct KV KV; ...@@ -22,7 +22,7 @@ typedef struct KV KV;
typedef struct queue queue; typedef struct queue queue;
struct queue{ struct queue{
char *key; char key[KEY_SIZE];
struct queue *next; struct queue *next;
}; };
...@@ -44,4 +44,6 @@ int cache_get(char *key, char *value); ...@@ -44,4 +44,6 @@ int cache_get(char *key, char *value);
void init_cache(); void init_cache();
void print_cache(); void print_cache();
\ No newline at end of file
int remove_front_element();
\ No newline at end of file
#include<semaphore.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include <stdint.h>
#include <limits.h>
#include<fcntl.h>
#include<unistd.h>
#include<string.h>
#include<sys/stat.h>
#include <sys/types.h>
int *fds;
int setSize=2;
int *readCounters;
sem_t *mutex;
sem_t *readerLocks;
sem_t x,y;
pthread_t tid;
pthread_t writerthreads[100],readerthreads[100];
int readercount = 0;
int modulus(char *num, int size, int divisor) {
int rem = 0;
while (size-- > 0) {
rem = ((UCHAR_MAX + 1)*rem + *num) % divisor;
num++;
}
return rem;
}
void file_del(off_t offset, char *key)
{
int index=modulus(key,256,setSize);
fflush(stdout);
int length=0;
sem_wait(&mutex[index]);
char blankspace[512];
char ch;
int k=-1;
lseek(fds[index], offset, SEEK_SET);
while(read(fds[index], &ch,sizeof(ch))!=-1 && ch!='\n')
{
length++;
}
printf("length %d",length);
memset(blankspace, 0, length);
lseek(fds[index], offset, SEEK_SET);
write(fds[index], blankspace, length);
sem_post(&mutex[index]);
}
void file_get(off_t offset, char *key, char *value)
{
/* Gets the value stored at offset */
/* Does not depend on key argument */
int index=modulus(key,256,setSize);
sem_wait(&readerLocks[index]);
readCounters[index]+=1;
if(readCounters[index]==1)
sem_wait(&mutex[index]);
sem_post(&readerLocks[index]);
lseek(fds[index], offset, SEEK_SET);
char line[10];
//FILE *fp =fdopen(fds[index],"r+");
// fseek(fp, offset,SEEK_SET);
size_t len=0;
char ch;
int k=-1;
while(read(fds[index], &ch,sizeof(ch))!=-1 && ch!='\n')
{
if(k>=0)
{
value[k++]=ch;
}
if(ch==':')
{
k=0;
}
}
sem_wait(&readerLocks[index]);
readCounters[index]-=1;
if(readCounters[index]==0)
{
sem_post(&mutex[index]);
}
sem_post(&readerLocks[index]);
}
off_t file_put(char *key,char *value) {
int index=modulus(key,256,setSize);
int bytes, rembytes, i;
off_t position;
sem_wait(&mutex[index]);
printf("[Write to File: %d]\n",index);
position = lseek(fds[index], 0, SEEK_END);
char line [514];
snprintf(line, sizeof(line),"%s:%s\n",key,value);
if(write(fds[index], line, strlen(line))<0)
printf("\n[unable to perform file_put]\n");
sem_post(&mutex[index]);
return position;
}
int init_storage() {
/*
define the array of file descriptors depending on the prefix
define the array of readCount as well as the semaphore (read x and write y) for the same
PUT,DEL would use write lock
GET would use read lock
each write should return the line number
*/
fds=(int *)malloc(sizeof(int)*setSize);
readCounters=(int *)malloc(sizeof(int)*setSize);
readerLocks=(sem_t *)malloc(sizeof(sem_t)*setSize);
mutex=(sem_t *)malloc(sizeof(sem_t)*setSize);
int i=0;
char fileName[setSize+20];
for(i=0;i<setSize;i++)
{
snprintf(fileName,sizeof(fileName),"File%d.txt",i);
fds[i]=open(fileName, O_CREAT|O_RDWR,S_IRWXU);
if(fds[i]<0)
{
printf("\n[Unable to Open File%d.txt]\n",i);
}
sem_init(&readerLocks[i],0,1);
sem_init(&mutex[i],0,1);
readCounters[i]=0;
}
return 0;
}
\ No newline at end of file
#include <sys/types.h>
void * file_del(off_t offset, char *key);
void file_get(off_t offset, char *key, char *value);
off_t file_put(char *key,char *value);
int init_storage();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment