Commit a28cbc91 authored by Shivaji's avatar Shivaji

Merge branch 'master' of https://git.cse.iitb.ac.in/samarthjoshi/key-value-store into lru-sample

parents f0fb720e ea8ae751
No preview for this file type
No preview for this file type
...@@ -2,11 +2,13 @@ ...@@ -2,11 +2,13 @@
#define PORT 8000 #define PORT 8000
#define SA struct sockaddr #define SA struct sockaddr
#include <stdio.h> #include <stdio.h>
#include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include <sys/types.h> #include <sys/types.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <stdlib.h>
#include "KVClientLibrary.h" #include "KVClientLibrary.h"
#include <string.h> #include <string.h>
...@@ -63,12 +65,23 @@ void interactive (int sock) { ...@@ -63,12 +65,23 @@ void interactive (int sock) {
} while(command[0] != 'q'); } while(command[0] != 'q');
} }
int main(int argc, char const *argv[]) void gen_random(char *s, const int len) {
{ static const char alphanum[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
int sock = 0, valread;
for (int i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
s[len] = 0;
}
int client_init() {
int valread;
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
char *hello = "Hello from client"; char *hello = "Hello from client";
char buffer[1024] = {0}; char buffer[1024] = {0};
int sock = 0;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{ {
printf("\n Socket creation error \n"); printf("\n Socket creation error \n");
...@@ -90,22 +103,77 @@ int main(int argc, char const *argv[]) ...@@ -90,22 +103,77 @@ int main(int argc, char const *argv[])
printf("\nConnection Failed \n"); printf("\nConnection Failed \n");
return -1; return -1;
} }
char key1[256]="abc";
char value1[256]="cde"; return sock;
char error1[256]; }
char key2[256]="test";
char value2[256]="cdde"; void *seige(void * args) {
char error2[256];
int *rcompleted = (int *) args;
char value3[256]; *rcompleted = 0;
//printf("%d",(int)put(sock,key, value,error)); int status_code;
//printf("%d",(int)del(sock,key,error)); char key[256];
//printf("%d",(int)get(sock,key, value,error)); char value[256];
//while (1) { char error[256];
//printf("%d",(int)put(sock, key1, value1, error1)); int sock;
//printf("%d",(int)put(sock, key2, value2, error2)); int r;
//printf("%d\n",(int)get(sock, key1, value3, error2));
//printf("%s", value3); sock = client_init();
interactive(sock);
return 0; while(1) {
gen_random(key, 10);
gen_random(value, 10);
r = rand() % 3;
switch(r) {
case 0: put(sock, key, value, error);
break;
case 1: get(sock, key, value, error);
break;
case 2: get(sock, key, value, error);
break;
}
*rcompleted = *rcompleted + 1;
}
}
int main(int argc, char const *argv[])
{
int sock;
int max_concurrent = 100;
int n = 5;
pthread_t *seige_threads;
int *requestcompleted;
int *requestsent;
int sum;
int seconds = 1;
int i;
if(1) {
sock = client_init();
interactive(sock);
return 0;
} else {
seige_threads = malloc(max_concurrent * sizeof(pthread_t));
requestcompleted = (int *) malloc(max_concurrent * sizeof(int));
for(i=0; i<n; i++) {
pthread_create( &seige_threads[i], NULL, seige, requestcompleted+i );
}
while(1) {
sleep(1);
sum = 0;
for(int i=0; i<n; i++) {
sum = sum + requestcompleted[i];
}
if(seconds%5==0) {
printf("%d, %f\n", i, ((float)sum/seconds));
//pthread_create( &seige_threads[i], NULL, seige, requestcompleted+i );
//i++;
}
if(i==max_concurrent) {
break;
}
seconds++;
}
}
} }
\ No newline at end of file
...@@ -102,17 +102,25 @@ void *worker(void *args) { ...@@ -102,17 +102,25 @@ void *worker(void *args) {
if(status) { if(status) {
requestMessage->status = 200; requestMessage->status = 200;
} else { } else {
requestMessage->status = 240; status = file_get(requestMessage->key, requestMessage->value);
cache_put(requestMessage->key, requestMessage->value);
if(status) {
requestMessage->status = 200;
} else {
requestMessage->status = 240;
}
} }
break; break;
case STATUS_PUT: case STATUS_PUT:
if DEBUG printf("[%s] PUT \n", name); if DEBUG printf("[%s] PUT \n", name);
cache_put(requestMessage->key, requestMessage->value); cache_put(requestMessage->key, requestMessage->value);
file_put(requestMessage->key, requestMessage->value);
requestMessage->status = 200; requestMessage->status = 200;
break; break;
case STATUS_DEL: case STATUS_DEL:
if DEBUG printf("[%s] DEL \n", name); if DEBUG printf("[%s] DEL \n", name);
status = cache_del(requestMessage->key); status = cache_del(requestMessage->key);
file_del(requestMessage->key);
if(status == 0) { if(status == 0) {
requestMessage->status = 240; requestMessage->status = 240;
} else { } else {
...@@ -139,14 +147,14 @@ void *worker(void *args) { ...@@ -139,14 +147,14 @@ void *worker(void *args) {
int main (int argc, int argv) { int main (int argc, int argv) {
int i; int i;
int next_thread_to_assign = 0; int next_thread_to_assign = 0;
pthread_t *worker_threads; pthread_t *worker_threads;
int pool_thread_size = 1; // TODO: get pool thread size from config file int pool_thread_size = 10; // TODO: get pool thread size from config file
int sockfd, newsockfd, portno, clilen, n; int sockfd, newsockfd, portno, clilen, n;
struct sockaddr_in serv_addr, cli_addr; struct sockaddr_in serv_addr, cli_addr;
//int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes)); //int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes));
int pipes[1][2]; // TODO: initialize pipes dynamically int pipes[10][2]; // TODO: initialize pipes dynamically
worker_threads = malloc(pool_thread_size * sizeof(pthread_t)); worker_threads = malloc(pool_thread_size * sizeof(pthread_t));
for( i=0; i < pool_thread_size; i++ ) { for( i=0; i < pool_thread_size; i++ ) {
...@@ -175,7 +183,7 @@ int main (int argc, int argv) { ...@@ -175,7 +183,7 @@ int main (int argc, int argv) {
clilen = sizeof(cli_addr); clilen = sizeof(cli_addr);
init_cache(); init_cache();
init_storage() storage_init();
while(1) { while(1) {
......
CC = gcc CC = gcc
SFLAGS = -pthread -g SFLAGS = -pthread -g
SRV_SRC = LRU.c KVServer.c SRV_SRC = LRU.c StorageHandler.c KVServer.c
SRV_HED = KVMessageFormat.h LRU.h SRV_HED = KVMessageFormat.h LRU.h StorageHandler.h
STARGET = Server STARGET = Server
CFLAGS = -g CFLAGS = -pthread -g
CLI_SRC = KVClientLibrary.c KVClient.c CLI_SRC = KVClientLibrary.c KVClient.c
CLI_HED = KVMessageFormat.h KVClientLibrary.h CLI_HED = KVMessageFormat.h KVClientLibrary.h
CTARGET = Client CTARGET = Client
......
No preview for this file type
...@@ -21,16 +21,16 @@ pthread_t tid; ...@@ -21,16 +21,16 @@ pthread_t tid;
pthread_t writerthreads[100],readerthreads[100]; pthread_t writerthreads[100],readerthreads[100];
int readercount = 0; int readercount = 0;
int modulus(char *num, int size, int divisor) { unsigned modulus( unsigned char *num, size_t size, unsigned divisor) {
int rem = 0; unsigned rem = 0;
while (size-- > 0) { while (size-- > 0) {
rem = ((UCHAR_MAX + 1)*rem + *num) % divisor; rem = ((UCHAR_MAX + 1ULL)*rem + *num) % divisor;
num++; num++;
} }
return rem; return rem;
} }
void file_del(off_t offset, char *key) void file_del( char *key)
{ {
int index=modulus(key,256,setSize); int index=modulus(key,256,setSize);
fflush(stdout); fflush(stdout);
...@@ -39,48 +39,58 @@ void file_del(off_t offset, char *key) ...@@ -39,48 +39,58 @@ void file_del(off_t offset, char *key)
char blankspace[512]; char blankspace[512];
char ch; char ch;
int k=-1; int k=-1;
lseek(fds[index], offset, SEEK_SET); lseek(fds[index], 0, SEEK_SET);
while(read(fds[index], &ch,sizeof(ch))!=-1 && ch!='\n') memset(blankspace,0,512);
{ char *line;
length++; char temp[514];
size_t len=0;
off_t position=0;
FILE *fp=fdopen(fds[index],"rb+");
while ((getline(&line, &len, fp)) != -1)
{
memcpy(temp,line,strlen(line)-1);
char *fkey=strtok(line,":");
char *fvalue=strtok(NULL,":");
if(strcmp(key,fkey)==0){
lseek(fds[index],position,SEEK_SET);
write(fds[index],blankspace,strlen(temp));
break;
}
position=ftell(fp);
} }
printf("length %d",length); printf("length %d",length);
memset(blankspace, 0, length);
lseek(fds[index], offset, SEEK_SET);
write(fds[index], blankspace, length);
sem_post(&mutex[index]); sem_post(&mutex[index]);
} }
void file_get(off_t offset, char *key, char *value) int file_get(char *key, char *value)
{ {
/* Gets the value stored at offset */ /* Gets the value stored at offset */
/* Does not depend on key argument */ /* Does not depend on key argument */
int found =0;
int index=modulus(key,256,setSize); int index=modulus(key,256,setSize);
lseek(fds[index],0,SEEK_SET);
sem_wait(&readerLocks[index]); sem_wait(&readerLocks[index]);
readCounters[index]+=1; readCounters[index]+=1;
if(readCounters[index]==1) if(readCounters[index]==1)
sem_wait(&mutex[index]); sem_wait(&mutex[index]);
sem_post(&readerLocks[index]); sem_post(&readerLocks[index]);
char *line;
lseek(fds[index], offset, SEEK_SET);
char line[10];
//FILE *fp =fdopen(fds[index],"r+");
// fseek(fp, offset,SEEK_SET);
size_t len=0; size_t len=0;
FILE *fp=fdopen(fds[index],"rb+");
char ch; while ((getline(&line, &len, fp)) != -1)
int k=-1; {
while(read(fds[index], &ch,sizeof(ch))!=-1 && ch!='\n') char *fkey=strtok(line,":");
{ char *fvalue=strtok(NULL,":");
if(k>=0) if(strcmp(key,fkey)==0){
{ memcpy(value,fvalue,strlen(fvalue)-1);
value[k++]=ch; found=1;
} break;
if(ch==':')
{
k=0;
} }
} }
sem_wait(&readerLocks[index]); sem_wait(&readerLocks[index]);
readCounters[index]-=1; readCounters[index]-=1;
...@@ -89,27 +99,61 @@ void file_get(off_t offset, char *key, char *value) ...@@ -89,27 +99,61 @@ void file_get(off_t offset, char *key, char *value)
sem_post(&mutex[index]); sem_post(&mutex[index]);
} }
sem_post(&readerLocks[index]); sem_post(&readerLocks[index]);
return found;
} }
off_t file_put(char *key,char *value) { off_t file_put(char *key,char *value)
{
int index=modulus(key,256,setSize); /*
int bytes, rembytes, i; if found then ask the offset where it is present and if the value noy matches with the present value ,update the given line
if not present then search for empty line and insert there!
*/
unsigned int index=modulus(key,256,setSize);
lseek(fds[index],0,SEEK_SET);
off_t position; off_t position;
//sem_wait(&mutex[index]);
sem_wait(&mutex[index]);
printf("[Write to File: %d]\n",index); printf("[Write to File: %d]\n",index);
position = lseek(fds[index], 0, SEEK_END); char *line;
char line [514]; char lin[514];
snprintf(line, sizeof(line),"%s:%s\n",key,value); size_t len=0;
if(write(fds[index], line, strlen(line))<0) FILE *fp=fdopen(fds[index],"rb+");
position=ftell(fp);
while ((getline(&line, &len, fp)) != -1)
{
char *fkey=strtok(line,":");
char *fvalue=strtok(NULL,":");
if(strcmp(key,fkey)==0){
if(strcmp(value,strtok(fvalue,"\n"))==0)
{
printf("(%s)(%s)\n",fkey,key);
///everything same then why insert?
return 0;
}
else
{
printf("key:(%s)(%s)\n",fkey,key);
printf("value:(%s)(%s)\n",fvalue,value);
fflush(stdout);
fseek(fp,position,SEEK_SET);
snprintf(lin, strlen(key)+strlen(value)+2,"%s:%s\n",key,value);
if(fputs(lin,fp)<0)
printf("\n[unable to perform file_put]\n");
return 0;
}
}
position=ftell(fp);
}
snprintf(lin, sizeof(lin),"%s:%s\n",key,value);
if(write(fds[index], lin, strlen(lin))<0)
printf("\n[unable to perform file_put]\n"); printf("\n[unable to perform file_put]\n");
sem_post(&mutex[index]);
// sem_post(&mutex[index]);
return position; return position;
} }
int storage_init()
int init_storage() { {
/* /*
define the array of file descriptors depending on the prefix define the array of file descriptors depending on the prefix
define the array of readCount as well as the semaphore (read x and write y) for the same define the array of readCount as well as the semaphore (read x and write y) for the same
...@@ -135,6 +179,5 @@ int init_storage() { ...@@ -135,6 +179,5 @@ int init_storage() {
sem_init(&mutex[i],0,1); sem_init(&mutex[i],0,1);
readCounters[i]=0; readCounters[i]=0;
} }
return 0;
}
}
\ No newline at end of file
#include <sys/types.h> #include <sys/types.h>
void * file_del(off_t offset, char *key); int storage_init();
void file_get(off_t offset, char *key, char *value); void file_del( char *key);
off_t file_put(char *key,char *value); off_t file_put(char *key,char *value);
int init_storage(); int file_get(char *key, char *value);
void file_del( char *key);
No preview for this file type
...@@ -71,7 +71,7 @@ int file_get(char *key, char *value) ...@@ -71,7 +71,7 @@ int file_get(char *key, char *value)
/* Does not depend on key argument */ /* Does not depend on key argument */
int found =0; int found =0;
int index=modulus(key,256,setSize); int index=modulus(key,strlen(key),setSize);
lseek(fds[index],0,SEEK_SET); lseek(fds[index],0,SEEK_SET);
sem_wait(&readerLocks[index]); sem_wait(&readerLocks[index]);
readCounters[index]+=1; readCounters[index]+=1;
...@@ -122,6 +122,7 @@ off_t file_put(char *key,char *value) ...@@ -122,6 +122,7 @@ off_t file_put(char *key,char *value)
{ {
char *fkey=strtok(line,":"); char *fkey=strtok(line,":");
char *fvalue=strtok(NULL,":"); char *fvalue=strtok(NULL,":");
printf("key:(%s)(%s)\n",fkey,key);
if(strcmp(key,fkey)==0){ if(strcmp(key,fkey)==0){
if(strcmp(value,strtok(fvalue,"\n"))==0) if(strcmp(value,strtok(fvalue,"\n"))==0)
{ {
...@@ -131,11 +132,11 @@ off_t file_put(char *key,char *value) ...@@ -131,11 +132,11 @@ off_t file_put(char *key,char *value)
} }
else else
{ {
printf("key:(%s)(%s)\n",fkey,key);
printf("value:(%s)(%s)\n",fvalue,value); printf("value:(%s)(%s)\n",fvalue,value);
fflush(stdout); fflush(stdout);
fseek(fp,position,SEEK_SET); fseek(fp,position,SEEK_SET);
snprintf(lin, strlen(key)+strlen(value)+2,"%s:%s\n",key,value); snprintf(lin, sizeof(lin),"%s:%s\n",key,value);
if(fputs(lin,fp)<0) if(fputs(lin,fp)<0)
printf("\n[unable to perform file_put]\n"); printf("\n[unable to perform file_put]\n");
return 0; return 0;
...@@ -187,13 +188,15 @@ int main() ...@@ -187,13 +188,15 @@ int main()
readCounters[i]=0; readCounters[i]=0;
} }
//offset = file_put("24", "value245"); //offset = file_put("24", "value245");
//offset = file_put("40", "value28"); offset = file_put("24", "value1");
file_put("29","update2");
file_put("28","update2");
//file_get(prevkey, value); // Doesnot depend on key arg, returns key and value at offset 0 //file_get(prevkey, value); // Doesnot depend on key arg, returns key and value at offset 0
//printf("(%s)", value); //printf("(%s)", value);
//file_del("29"); //file_del("29");
//file_get("24",value); //file_get("24",value);
// printf("(%s)\n", value); // printf("(%s)\n", value);
file_del("21"); //file_del("21");
//printf("(%s)\n", value); //printf("(%s)\n", value);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment