Commit a830282b authored by Shivaji's avatar Shivaji

Merge branch 'master' of https://git.cse.iitb.ac.in/samarthjoshi/key-value-store into lru-sample

parents 996cb357 6dec1e51
File added
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <sys/epoll.h> #include <sys/epoll.h>
#include <string.h> #include <string.h>
#include "LRU.h" #include "LRU.h"
#include "StorageHandler.h"
#include "KVMessageFormat.h" #include "KVMessageFormat.h"
#define MAX_EVENTS 10 #define MAX_EVENTS 10
#define DEBUG (0) #define DEBUG (0)
...@@ -33,7 +34,7 @@ void *worker(void *args) { ...@@ -33,7 +34,7 @@ void *worker(void *args) {
int status; int status;
// Generate name for each thread for debugging /* Generate name for each thread for debugging */
char *name = (char *) malloc(5*sizeof(char)); char *name = (char *) malloc(5*sizeof(char));
gen_random(name, 5); gen_random(name, 5);
if DEBUG printf("[%s] Thread started!\n", name); if DEBUG printf("[%s] Thread started!\n", name);
...@@ -51,7 +52,7 @@ void *worker(void *args) { ...@@ -51,7 +52,7 @@ void *worker(void *args) {
perror("epoll_ctl: read_pipe"); perror("epoll_ctl: read_pipe");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
//if DEBUG printf("[%s] Added read pipe to epoll fd set!\n", name);
while (1) { while (1) {
if DEBUG printf("[%s] waiting for epoll event!\n", name); if DEBUG printf("[%s] waiting for epoll event!\n", name);
...@@ -92,7 +93,6 @@ void *worker(void *args) { ...@@ -92,7 +93,6 @@ void *worker(void *args) {
/* Parse the actual message from client */ /* Parse the actual message from client */
struct message *requestMessage= malloc(sizeof(struct message)); struct message *requestMessage= malloc(sizeof(struct message));
int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message)); int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message));
//printf("[Message Received from client] (%d, %s, %s)\n",requestMessage->status,requestMessage->key,requestMessage->value);
if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name); if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name);
switch(requestMessage->status) { switch(requestMessage->status) {
...@@ -175,6 +175,7 @@ int main (int argc, int argv) { ...@@ -175,6 +175,7 @@ int main (int argc, int argv) {
clilen = sizeof(cli_addr); clilen = sizeof(cli_addr);
init_cache(); init_cache();
init_storage()
while(1) { while(1) {
......
#include<semaphore.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include <stdint.h>
#include <limits.h>
#include<fcntl.h>
#include<unistd.h>
#include<string.h>
#include<sys/stat.h>
#include <sys/types.h>
int *fds;
int setSize=2;
int *readCounters;
sem_t *mutex;
sem_t *readerLocks;
sem_t x,y;
pthread_t tid;
pthread_t writerthreads[100],readerthreads[100];
int readercount = 0;
int modulus(char *num, int size, int divisor) {
int rem = 0;
while (size-- > 0) {
rem = ((UCHAR_MAX + 1)*rem + *num) % divisor;
num++;
}
return rem;
}
void file_del(off_t offset, char *key)
{
int index=modulus(key,256,setSize);
fflush(stdout);
int length=0;
sem_wait(&mutex[index]);
char blankspace[512];
char ch;
int k=-1;
lseek(fds[index], offset, SEEK_SET);
while(read(fds[index], &ch,sizeof(ch))!=-1 && ch!='\n')
{
length++;
}
printf("length %d",length);
memset(blankspace, 0, length);
lseek(fds[index], offset, SEEK_SET);
write(fds[index], blankspace, length);
sem_post(&mutex[index]);
}
void file_get(off_t offset, char *key, char *value)
{
/* Gets the value stored at offset */
/* Does not depend on key argument */
int index=modulus(key,256,setSize);
sem_wait(&readerLocks[index]);
readCounters[index]+=1;
if(readCounters[index]==1)
sem_wait(&mutex[index]);
sem_post(&readerLocks[index]);
lseek(fds[index], offset, SEEK_SET);
char line[10];
//FILE *fp =fdopen(fds[index],"r+");
// fseek(fp, offset,SEEK_SET);
size_t len=0;
char ch;
int k=-1;
while(read(fds[index], &ch,sizeof(ch))!=-1 && ch!='\n')
{
if(k>=0)
{
value[k++]=ch;
}
if(ch==':')
{
k=0;
}
}
sem_wait(&readerLocks[index]);
readCounters[index]-=1;
if(readCounters[index]==0)
{
sem_post(&mutex[index]);
}
sem_post(&readerLocks[index]);
}
off_t file_put(char *key,char *value) {
int index=modulus(key,256,setSize);
int bytes, rembytes, i;
off_t position;
sem_wait(&mutex[index]);
printf("[Write to File: %d]\n",index);
position = lseek(fds[index], 0, SEEK_END);
char line [514];
snprintf(line, sizeof(line),"%s:%s\n",key,value);
if(write(fds[index], line, strlen(line))<0)
printf("\n[unable to perform file_put]\n");
sem_post(&mutex[index]);
return position;
}
int init_storage() {
/*
define the array of file descriptors depending on the prefix
define the array of readCount as well as the semaphore (read x and write y) for the same
PUT,DEL would use write lock
GET would use read lock
each write should return the line number
*/
fds=(int *)malloc(sizeof(int)*setSize);
readCounters=(int *)malloc(sizeof(int)*setSize);
readerLocks=(sem_t *)malloc(sizeof(sem_t)*setSize);
mutex=(sem_t *)malloc(sizeof(sem_t)*setSize);
int i=0;
char fileName[setSize+20];
for(i=0;i<setSize;i++)
{
snprintf(fileName,sizeof(fileName),"File%d.txt",i);
fds[i]=open(fileName, O_CREAT|O_RDWR,S_IRWXU);
if(fds[i]<0)
{
printf("\n[Unable to Open File%d.txt]\n",i);
}
sem_init(&readerLocks[i],0,1);
sem_init(&mutex[i],0,1);
readCounters[i]=0;
}
return 0;
}
\ No newline at end of file
#include <sys/types.h>
void * file_del(off_t offset, char *key);
void file_get(off_t offset, char *key, char *value);
off_t file_put(char *key,char *value);
int init_storage();
File added
#include<semaphore.h>
#include<stdio.h> #include<stdio.h>
#include <string.h> #include<stdlib.h>
#include <stdlib.h> #include<unistd.h>
#include "KVMessageFormat.h" #include<pthread.h>
struct message* get(char m,char *key) #include <stdint.h>
{ #include <limits.h>
if(!m || key==NULL ) #include<fcntl.h>
{ #include<unistd.h>
printf("Invalid parameters in get()"); #include<string.h>
return NULL; #include<sys/stat.h>
} #include <sys/types.h>
struct message *request= malloc(sizeof(struct message));
request->status=m; int *fds;
memcpy(request->key,key,256); //copied the complete key int setSize=2;
if(strlen(request->key)<256) int *readCounters;
{ sem_t *mutex;
request->key[strlen(request->key)]='\0'; sem_t *readerLocks;
sem_t x,y;
pthread_t tid;
pthread_t writerthreads[100],readerthreads[100];
int readercount = 0;
unsigned modulus( unsigned char *num, size_t size, unsigned divisor) {
unsigned rem = 0;
while (size-- > 0) {
rem = ((UCHAR_MAX + 1ULL)*rem + *num) % divisor;
num++;
} }
request->value[0]='\0'; return rem;
return request;
} }
struct message* put(char m,char *key,char *value) void file_del( char *key)
{ {
if(!m || (key==NULL && value==NULL)) int index=modulus(key,256,setSize);
fflush(stdout);
int length=0;
sem_wait(&mutex[index]);
char blankspace[512];
memset(blankspace,0,512);
char *line;
char temp[514];
size_t len=0;
off_t position=0;
FILE *fp=fdopen(fds[index],"rb+");
while ((getline(&line, &len, fp)) != -1)
{ {
printf("Invalid parameters in put()"); memcpy(temp,line,strlen(line)-1);
return NULL; char *fkey=strtok(line,":");
} char *fvalue=strtok(NULL,":");
struct message *request= malloc(sizeof(struct message)); if(strcmp(key,fkey)==0){
request->status=m; lseek(fds[index],position,SEEK_SET);
memcpy(request->key,key,256); //copied the complete key write(fds[index],blankspace,strlen(temp));
if(strlen(request->key)<256) puts(temp);
{ break;
request->key[strlen(request->key)]='\0';
} }
position=ftell(fp);
memcpy(request->value,value,256); //copied the complete value
if(strlen(request->key)<256) //to pad with \0
{
request->key[strlen(request->value)]='\0';
} }
sem_post(&mutex[index]);
return request;
} }
struct message *del(char m,char *key){ int file_get(char *key, char *value)
if(!m || key==NULL ) {
/* Gets the value stored at offset */
/* Does not depend on key argument */
int found =0;
int index=modulus(key,256,setSize);
sem_wait(&readerLocks[index]);
readCounters[index]+=1;
if(readCounters[index]==1)
sem_wait(&mutex[index]);
sem_post(&readerLocks[index]);
char *line;
size_t len=0;
FILE *fp=fdopen(fds[index],"rb+");
while ((getline(&line, &len, fp)) != -1)
{ {
printf("Invalid parameters in del()"); char *fkey=strtok(line,":");
return NULL; char *fvalue=strtok(NULL,":");
if(strcmp(key,fkey)==0){
memcpy(value,fvalue,strlen(fvalue)-1);
found=1;
break;
}
} }
struct message *request= malloc(sizeof(struct message)); sem_wait(&readerLocks[index]);
request->status=m; readCounters[index]-=1;
memcpy(request->key,key,256); //copied the complete key if(readCounters[index]==0)
if(strlen(request->key)<256)
{ {
request->key[strlen(request->key)]='\0'; sem_post(&mutex[index]);
} }
request->value[0]='\0'; sem_post(&readerLocks[index]);
return request; return found;
} }
struct message *request(char status,char* key,char* value) off_t file_put(char *key,char *value)
{
if(!status ||(key==NULL && value==NULL) )
{ {
printf("Invalid parameters in request()"); /*
return NULL; if found then ask the offset where it is present and if the value noy matches with the present value ,update the given line
} if not present then search for empty line and insert there!
struct message *requestMessage= malloc(sizeof(struct message)); */
requestMessage->status=status; unsigned int index=modulus(key,256,setSize);
off_t position;
memcpy(requestMessage->key,key,256); //copied the complete key //sem_wait(&mutex[index]);
printf("[Write to File: %d]\n",index);
if(strlen(requestMessage->key)<256) char *line;
char lin[514];
size_t len=0;
FILE *fp=fdopen(fds[index],"rb+");
position=ftell(fp);
while ((getline(&line, &len, fp)) != -1)
{ {
requestMessage->key[strlen(requestMessage->key)]='\0'; char *fkey=strtok(line,":");
} char *fvalue=strtok(NULL,":");
if(value!=NULL) if(strcmp(key,fkey)==0){
if(strcmp(value,strtok(fvalue,"\n"))==0)
{ {
memcpy(requestMessage->value,value,256); //copied the complete value printf("(%s)(%s)\n",fkey,key);
///everything same then why insert?
return 0;
} }
else
if(strlen(requestMessage->value)<256) //to pad with \0
{ {
requestMessage->value[strlen(requestMessage->value)]='\0'; printf("key:(%s)(%s)\n",fkey,key);
printf("value:(%s)(%s)\n",fvalue,value);
fflush(stdout);
fseek(fp,position,SEEK_SET);
snprintf(lin, sizeof(lin),"%s:%s\n",key,value);
if(fputs(lin,fp)<0)
printf("\n[unable to perform file_put]\n");
return 0;
}
} }
printf("[Message Generated at Client]\n[[Status:%c]\n[Key:%s]\n[Value:%s]]",requestMessage->status,requestMessage->key,requestMessage->value); position=ftell(fp);
return requestMessage; }
snprintf(lin, sizeof(lin),"%s:%s\n",key,value);
if(write(fds[index], lin, strlen(lin))<0)
printf("\n[unable to perform file_put]\n");
// sem_post(&mutex[index]);
return position;
} }
void main() int main()
{ {
char message[256]; /*
for(int i=0;i<30;i++) define the array of file descriptors depending on the prefix
define the array of readCount as well as the semaphore (read x and write y) for the same
PUT,DEL would use write lock
GET would use read lock
each write should return the line number
*/
int n2=0;
char prevkey[256]="24";
char key[256]="25";
char value[256]="value2";
off_t offset;
bzero(key+strlen(key),sizeof(key)-strlen(key));
bzero(value+strlen(value),sizeof(value)-strlen(value));
fds=(int *)malloc(sizeof(int)*setSize);
readCounters=(int *)malloc(sizeof(int)*setSize);
readerLocks=(sem_t *)malloc(sizeof(sem_t)*setSize);
mutex=(sem_t *)malloc(sizeof(sem_t)*setSize);
int i=0;
char fileName[setSize+20];
for(i=0;i<setSize;i++)
{ {
message[i]='a'; snprintf(fileName,sizeof(fileName),"File%d.txt",i);
fds[i]=open(fileName, O_CREAT|O_RDWR,S_IRWXU);
if(fds[i]<0)
{
printf("\n[Unable to Open File%d.txt]\n",i);
} }
message[sizeof(message)]='\0'; sem_init(&readerLocks[i],0,1);
request('2',message,message); sem_init(&mutex[i],0,1);
char c ='2'; readCounters[i]=0;
int status=(int)c; }
//offset = file_put("24", "value245");
printf("%d",status); //offset = file_put("40", "value28");
//file_get(prevkey, value); // Doesnot depend on key arg, returns key and value at offset 0
//printf("(%s)", value);
file_del("29");
//creating a message
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment