Commit 00b787b3 authored by Shivaji's avatar Shivaji

Merge branch 'master' of https://git.cse.iitb.ac.in/samarthjoshi/key-value-store into lru-sample

parents d59e7d2c ead88885
File added
1:test
2:test
3:test
4:test
5:test
6:test
7:test
8:test
9:test
10:tet
11:test
...@@ -93,7 +93,8 @@ void *worker(void *args) { ...@@ -93,7 +93,8 @@ void *worker(void *args) {
/* Parse the actual message from client */ /* Parse the actual message from client */
struct message *requestMessage= malloc(sizeof(struct message)); struct message *requestMessage= malloc(sizeof(struct message));
int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message)); int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message));
memset(requestMessage->key, 0, 256);
memset(requestMessage->value, 0, 256);
if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name); if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name);
switch(requestMessage->status) { switch(requestMessage->status) {
case STATUS_GET: case STATUS_GET:
...@@ -121,10 +122,10 @@ void *worker(void *args) { ...@@ -121,10 +122,10 @@ void *worker(void *args) {
if DEBUG printf("[%s] DEL \n", name); if DEBUG printf("[%s] DEL \n", name);
status = cache_del(requestMessage->key); status = cache_del(requestMessage->key);
file_del(requestMessage->key); file_del(requestMessage->key);
if(status == 0) { if(status) {
requestMessage->status = 240;
} else {
requestMessage->status = 200; requestMessage->status = 200;
} else {
requestMessage->status = 240;
} }
break; break;
} }
...@@ -149,12 +150,12 @@ int main (int argc, int argv) { ...@@ -149,12 +150,12 @@ int main (int argc, int argv) {
int i; int i;
int next_thread_to_assign = 0; int next_thread_to_assign = 0;
pthread_t *worker_threads; pthread_t *worker_threads;
int pool_thread_size = 10; // TODO: get pool thread size from config file int pool_thread_size = 1; // TODO: get pool thread size from config file
int sockfd, newsockfd, portno, clilen, n; int sockfd, newsockfd, portno, clilen, n;
struct sockaddr_in serv_addr, cli_addr; struct sockaddr_in serv_addr, cli_addr;
//int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes)); //int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes));
int pipes[10][2]; // TODO: initialize pipes dynamically int pipes[1][2]; // TODO: initialize pipes dynamically
worker_threads = malloc(pool_thread_size * sizeof(pthread_t)); worker_threads = malloc(pool_thread_size * sizeof(pthread_t));
for( i=0; i < pool_thread_size; i++ ) { for( i=0; i < pool_thread_size; i++ ) {
......
No preview for this file type
...@@ -24,44 +24,50 @@ int readercount = 0; ...@@ -24,44 +24,50 @@ int readercount = 0;
unsigned modulus( unsigned char *num, size_t size, unsigned divisor) { unsigned modulus( unsigned char *num, size_t size, unsigned divisor) {
unsigned rem = 0; unsigned rem = 0;
while (size-- > 0) { while(size>0)
rem = ((UCHAR_MAX + 1ULL)*rem + *num) % divisor; {
num++; rem+=num[size--]%divisor;
} }
return rem; return rem%divisor;
} }
int file_search(char *key, char *value, int index) {
lseek(fds[index], 0, SEEK_SET);
int offset = 0;
int size;
char fkey[256];
char fval[256];
do {
size = read(fds[index], fkey, sizeof(fkey));
if( size <= 0 )
break;
size = read(fds[index], fval, sizeof(fval));
if( size <= 0 )
break;
if( strcmp(fkey, key)==0 ) {
if(value)
memcpy(value, fval, sizeof(fval));
return offset;
}
offset += sizeof(fkey) + sizeof(fval);
} while (1);
return -1;
}
void file_del( char *key) void file_del( char *key)
{ {
int offset;
int index=modulus(key,256,setSize); int index=modulus(key,256,setSize);
fflush(stdout);
int length=0;
sem_wait(&mutex[index]); sem_wait(&mutex[index]);
char blankspace[512];
char ch; offset = file_search(key, NULL, index);
int k=-1; if(offset >= 0) {
lseek(fds[index], 0, SEEK_SET); lseek(fds[index], offset, SEEK_SET);
memset(blankspace,0,512); write(fds[index], 0, 256);
char *line; write(fds[index], 0, 256);
char temp[514];
size_t len=0;
off_t position=0;
FILE *fp=fdopen(fds[index],"rb+");
while ((getline(&line, &len, fp)) != -1)
{
memcpy(temp,line,strlen(line)-1);
char *fkey=strtok(line,":");
char *fvalue=strtok(NULL,":");
if(strcmp(key,fkey)==0){
lseek(fds[index],position,SEEK_SET);
write(fds[index],blankspace,strlen(temp));
break;
}
position=ftell(fp);
} }
printf("length %d",length);
sem_post(&mutex[index]); sem_post(&mutex[index]);
} }
...@@ -70,28 +76,17 @@ int file_get(char *key, char *value) ...@@ -70,28 +76,17 @@ int file_get(char *key, char *value)
/* Gets the value stored at offset */ /* Gets the value stored at offset */
/* Does not depend on key argument */ /* Does not depend on key argument */
int found =0; int found =0;
int index=modulus(key,256,setSize); int index=modulus(key,256,setSize);
lseek(fds[index],0,SEEK_SET);
sem_wait(&readerLocks[index]); sem_wait(&readerLocks[index]);
readCounters[index]+=1; readCounters[index]+=1;
if(readCounters[index]==1) if(readCounters[index]==1)
sem_wait(&mutex[index]); sem_wait(&mutex[index]);
sem_post(&readerLocks[index]); sem_post(&readerLocks[index]);
char *line;
size_t len=0; if(file_search(key, value, index)>=0) {
FILE *fp=fdopen(fds[index],"rb+"); found = 1;
while ((getline(&line, &len, fp)) != -1)
{
char *fkey=strtok(line,":");
char *fvalue=strtok(NULL,":");
if(strcmp(key,fkey)==0){
memcpy(value,fvalue,strlen(fvalue)-1);
found=1;
break;
}
} }
sem_wait(&readerLocks[index]); sem_wait(&readerLocks[index]);
readCounters[index]-=1; readCounters[index]-=1;
if(readCounters[index]==0) if(readCounters[index]==0)
...@@ -102,55 +97,26 @@ int file_get(char *key, char *value) ...@@ -102,55 +97,26 @@ int file_get(char *key, char *value)
return found; return found;
} }
off_t file_put(char *key,char *value) void file_put(char *key,char *value) {
{
/* /*
if found then ask the offset where it is present and if the value noy matches with the present value ,update the given line if found then ask the offset where it is present and if the value noy matches with the present value ,update the given line
if not present then search for empty line and insert there! if not present then search for empty line and insert there!
*/ */
unsigned int index=modulus(key,256,setSize); int offset;
lseek(fds[index],0,SEEK_SET); int index=modulus(key,256,setSize);
off_t position; sem_wait(&mutex[index]);
//sem_wait(&mutex[index]);
printf("[Write to File: %d]\n",index); offset = file_search(key, NULL, index);
char *line; if(offset < 0) {
char lin[514]; lseek(fds[index], 0, SEEK_END);
size_t len=0; } else {
FILE *fp=fdopen(fds[index],"rb+"); lseek(fds[index], offset, SEEK_SET);
position=ftell(fp); }
while ((getline(&line, &len, fp)) != -1) write(fds[index], key, 256);
{ write(fds[index], value, 256);
char *fkey=strtok(line,":");
char *fvalue=strtok(NULL,":");
if(strcmp(key,fkey)==0){
if(strcmp(value,strtok(fvalue,"\n"))==0)
{
printf("(%s)(%s)\n",fkey,key);
///everything same then why insert?
return 0;
}
else
{
printf("key:(%s)(%s)\n",fkey,key);
printf("value:(%s)(%s)\n",fvalue,value);
fflush(stdout);
fseek(fp,position,SEEK_SET);
snprintf(lin, strlen(key)+strlen(value)+2,"%s:%s\n",key,value);
if(fputs(lin,fp)<0)
printf("\n[unable to perform file_put]\n");
return 0;
}
}
position=ftell(fp);
}
snprintf(lin, sizeof(lin),"%s:%s\n",key,value);
if(write(fds[index], lin, strlen(lin))<0)
printf("\n[unable to perform file_put]\n");
// sem_post(&mutex[index]); sem_post(&mutex[index]);
return position;
} }
int storage_init() int storage_init()
{ {
...@@ -179,5 +145,5 @@ int storage_init() ...@@ -179,5 +145,5 @@ int storage_init()
sem_init(&mutex[i],0,1); sem_init(&mutex[i],0,1);
readCounters[i]=0; readCounters[i]=0;
} }
return 0;
} }
...@@ -4,7 +4,7 @@ int storage_init(); ...@@ -4,7 +4,7 @@ int storage_init();
void file_del( char *key); void file_del( char *key);
off_t file_put(char *key,char *value); void file_put(char *key,char *value);
int file_get(char *key, char *value); int file_get(char *key, char *value);
......
...@@ -23,13 +23,15 @@ int readercount = 0; ...@@ -23,13 +23,15 @@ int readercount = 0;
unsigned modulus( unsigned char *num, size_t size, unsigned divisor) { unsigned modulus( unsigned char *num, size_t size, unsigned divisor) {
unsigned rem = 0; unsigned rem = 0;
int i=0;
while(size>0) while(i<divisor)
{ {
rem+=num[size--]%divisor; rem+=num[i]%divisor;
i++;
} }
return rem%divisor; return rem%divisor;
} }
void file_del( char *key) void file_del( char *key)
{ {
int index=modulus(key,256,setSize); int index=modulus(key,256,setSize);
...@@ -148,7 +150,14 @@ int main() ...@@ -148,7 +150,14 @@ int main()
each write should return the line number each write should return the line number
*/ */
int n2=0; int n2=0;
char prevkey[256]="24"; char prevkey[256];
memset(prevkey,0,sizeof(prevkey));
memcpy(prevkey,"w",1);
char newkey[256];
memset(newkey,0,sizeof(newkey));
memcpy(newkey,"x",1);
printf("key:%s\n",prevkey);
char key[256]="25"; char key[256]="25";
char value[256]="value2"; char value[256]="value2";
off_t offset; off_t offset;
...@@ -172,7 +181,12 @@ int main() ...@@ -172,7 +181,12 @@ int main()
sem_init(&mutex[i],0,1); sem_init(&mutex[i],0,1);
readCounters[i]=0; readCounters[i]=0;
} }
printf("%u",modulus("99",256,setSize));
i=0;
printf("%d",modulus(prevkey,256,2));
printf("%d",modulus(newkey,256,2));
//offset = file_put("24", "value245"); //offset = file_put("24", "value245");
//offset = file_put("24", "value1"); //offset = file_put("24", "value1");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment