Commit 4e39cb9d authored by SHAILESH KUMAR's avatar SHAILESH KUMAR

completed0

parent 3cbfa088
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include "jobs.h"
void getFileName(char * fname,long int x){
char id[5];
sprintf(id,"%ld",x);
strcpy(fname,"data/kv_");
strcat(fname,id);
strcat(fname,".bin");
}
void getTempFileName(char * fname,long int x){
char id[5];
sprintf(id,"%ld",x);
strcpy(fname,"data/temp_");
strcat(fname,id);
strcat(fname,".bin");
}
int deleteFromFile(char *key){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
char fname[17],temp[17];
int flag=0;
getFileName(fname,strlen(key));
getTempFileName(temp,strlen(key));
FILE *fp;
if( access( fname, F_OK ) != -1 ){
fp=fopen(fname,"rb");
}
else{
free(store);
return 0;
}
FILE *tempfp=fopen(temp,"wb");
if(fp==NULL || tempfp==NULL){
printf("ERROR:error opening file\n");
}
while(fread(store,sizeof(kvstore_data),1,fp)){
if(strcmp(store->key,key)){
printf("%s\n",store->key);
fwrite(store,sizeof(kvstore_data),1,tempfp);
flag=1;
}
}
/*fread(store,sizeof(kvstore_data),1,fp);
if(strcmp(store->key,key))
fwrite(store,sizeof(kvstore_data),1,tempfp);
*/
fclose(fp);
fclose(tempfp);
remove(fname);
rename(temp,fname);
return flag;
}
int dumpToFile(kvcache_data *cache){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
strcpy(store->key,cache->key);
strcpy(store->value,cache->value);
char fname[17];
getFileName(fname,strlen(cache->key));
deleteFromFile(cache->key);
FILE *fp=fopen(fname,"ab");
if(fp==NULL){
printf("ERROR:error opening file\n");
return 0;
}
if(!fwrite(store,sizeof(kvstore_data),1,fp)){
printf("ERROR:Error writing file.\n");
free(store);
fclose(fp);
return 0;
}
else{
free(store);
fclose(fp);
return 1;
}
return 1;
}
int restoreFromFile(char *key,char *value){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
char fname[17];
FILE *fp;
getFileName(fname,strlen(key));
if( access( fname, F_OK ) != -1 ){
fp=fopen(fname,"rb");
}
else{
free(store);
return 0;
}
if(fp==NULL){
printf("ERROR:error opening file\n");
}
while(fread(store,sizeof(kvstore_data),1,fp)){
if(!strcmp(key,store->key)){
strcpy(value,store->value);
free(store);
fclose(fp);
return 1;
}
}
return 0;
}
File deleted
makeserver: server.o client.o
gcc -o exec server.o client.o
server.o: server.c toxml.h jobs.h
gcc -c server.o server.c toxml.h jobs.h
client.o: client.c toxml.h jobs.h
gcc -c client.o client.c toxml.h jobs.h
CC=gcc
CFLAGS=-c -Wall
all: server client clean
server: server.o header_files/toxml.h header_files/KVCache.h header_files/KVStore.h header_files/jobs.h
$(CC) -pthread server.o header_files/toxml.h header_files/KVCache.h header_files/KVStore.h header_files/jobs.h -o server
client: client.o header_files/toxml.h header_files/jobs.h
$(CC) client.o header_files/toxml.h header_files/jobs.h -o client
server.o:
$(CC) $(CFLAGS) -pthread server.c
client.o:
$(CC) $(CFLAGS) client.c
clean:
rm -rf *.o
File deleted
No preview for this file type
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netdb.h> #include <netdb.h>
#include "toxml.h" #include "header_files/toxml.h"
void sendXML(FILE *fr,int sockfd,char *msg){ void sendXML(FILE *fr,int sockfd,char *msg){
char cmd[4]; char cmd[4];
char key[257]; char key[257];
...@@ -101,13 +101,11 @@ void error(char *msg) ...@@ -101,13 +101,11 @@ void error(char *msg)
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
int sockfd, portno, n ; int sockfd, portno ;
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
struct hostent *server; struct hostent *server;
char buffer[256];
portno = atoi("8080"); portno = atoi("8080");
sockfd = socket(AF_INET, SOCK_STREAM, 0); sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) if (sockfd < 0)
...@@ -123,8 +121,9 @@ int main(int argc, char *argv[]) ...@@ -123,8 +121,9 @@ int main(int argc, char *argv[])
(char *)&serv_addr.sin_addr.s_addr, (char *)&serv_addr.sin_addr.s_addr,
server->h_length); server->h_length);
serv_addr.sin_port = htons(portno); serv_addr.sin_port = htons(portno);
if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0) if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0) {
error("ERROR connecting"); error("ERROR connecting");
}
FILE *fp,*fr; FILE *fp,*fr;
char* line=NULL; char* line=NULL;
......
#include <stdio.h>
#include<stdlib.h>
#include<string.h>
#include<strings.h>
#include<unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include "toxml.h"
void sendXML(FILE *fr,int sockfd,char *msg){
char cmd[4];
char key[257];
char value[256*1024+1];
char encodedXML[300000];
char msgtype[10];
char delim[2]="\n";
strcpy(key,"");
strcpy(value,"");
for(int i =0;i<3;i++){
cmd[i]=msg[i];
}
cmd[3]='\0';
if(!strcmp(cmd,"GET")){
strcpy(msgtype,"getreq");
strcat(key,msg+4);
key[strlen(key)-1]='\0';
//printf("%s",key);
}
else if(!strcmp(cmd,"PUT")){
strcpy(msgtype,"putreq");
char a[2];
for(int i=4;i<strlen(msg);i++){
if(msg[i]==' '){
strcat(value,msg+i+1);
break;
}
else{
a[0]=msg[i];
a[1]='\0';
strcat(key,a);
}
}
value[strlen(value)-1]='\0';
//printf("%s**%s",key,value);
}
else if(!strcmp(cmd,"DEL")){
strcpy(msgtype,"delreq");
strcat(key,msg+4);
key[strlen(key)-1]='\0';
//printf("%s",key);
}
else{
fprintf(fr,"Unknown Error: Invalid Command.\n");
return;
}
if(strlen(key)>256){
fprintf(fr,"Oversized key\n");
return;
}
if(strlen(key)==0){
fprintf(fr,"ERROR: Key cannot be NULL\n");
return;
}
if(!strcmp(cmd,"PUT") && strlen(value)==0){
fprintf(fr,"ERROR: Value cannot be NULL\n");
return;
}
if(!strcmp(cmd,"PUT") && strlen(value)>(256*1024)){
fprintf(fr,"Oversized value\n");
return;
}
toXML(encodedXML,key,value,msgtype,"");
write(sockfd,strcat(encodedXML,delim),strlen(encodedXML));
char buf[300000],message[100];
read(sockfd,buf,300000);
fromXML(key,value,msgtype,message,buf);
if(strlen(message)!=0){fprintf(fr,"%s\n",message);}
else{
fprintf(fr,"%s %s\n",key,value);
}
return;
}
void error(char *msg)
{
perror(msg);
exit(0);
}
int main(int argc, char *argv[])
{
int sockfd, portno, n ;
struct sockaddr_in serv_addr;
struct hostent *server;
char buffer[256];
portno = atoi("8080");
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
error("ERROR opening socket");
server = gethostbyname("127.0.0.1");
if (server == NULL) {
fprintf(stderr,"ERROR, no such host\n");
exit(0);
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
bcopy((char *)server->h_addr,
(char *)&serv_addr.sin_addr.s_addr,
server->h_length);
serv_addr.sin_port = htons(portno);
if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0)
error("ERROR connecting");
FILE *fp,*fr;
char* line=NULL;
size_t len=0;
fp=fopen("req/client_req.txt","r");
fr=fopen("resp/client_resp.txt","a+");
if(fp==NULL || fr==NULL){
exit(1);
}
while((getline(&line,&len,fp))!=-1){
sendXML(fr,sockfd,line);
sleep(1);
}
fclose(fp);
//exit(0);
return 0;
}
Success
hello world
Success
hello world
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success
hello world
Success
Does not exist
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
File added
No preview for this file type
No preview for this file type
File added
//#include "jobs.h"
#include "KVStore.c"
#include "KVStore.h"
#include "config.h"
#include<stdlib.h> #include<stdlib.h>
#include<stdio.h> #include<stdio.h>
#include<string.h> #include<string.h>
#include<pthread.h>
long int getSetId(char * key){ long int getSetId(char * key){
return strlen(key); return strlen(key)%NO_OF_SETS;
} }
kvcache_data** init_cache(){ kvcache_data** init_cache(){
kvcache_data **t = (kvcache_data **)malloc(256 * sizeof(kvcache_data *)); printf("Initializing Cache Memory\n");
for (int i = 0; i < 256; ++i){ kvcache_data **t = (kvcache_data **)malloc(NO_OF_SETS * sizeof(kvcache_data *));
t[i] = (kvcache_data *)malloc( sizeof(kvcache_data)*10); for (int i = 0; i < NO_OF_SETS; ++i){
for(int j=0;j<10;j++){ t[i] = (kvcache_data *)malloc( sizeof(kvcache_data)*NO_OF_CACHE_ENTRIES);
for(int j=0;j<NO_OF_CACHE_ENTRIES;j++){
t[i][j].chance=-1; t[i][j].chance=-1;
} }
} }
...@@ -21,20 +22,33 @@ kvcache_data** init_cache(){ ...@@ -21,20 +22,33 @@ kvcache_data** init_cache(){
return t; return t;
} }
int* rep_line_per_set(){ int* rep_line_per_set(){
int* x=(int *)calloc(sizeof(int),256); int* x=(int *)calloc(sizeof(int),NO_OF_CACHE_ENTRIES);
for(int i=0;i<256;i++){ for(int i=0;i<NO_OF_CACHE_ENTRIES;i++){
x[i]=0; x[i]=0;
} }
return x; return x;
} }
int getInCache(kvcache_data** t,int* repl,char *key,char *value){
pthread_mutex_t * lock_initialize(){
printf("Initializing Cache Locks\n");
pthread_mutex_t *x=(pthread_mutex_t*)calloc(sizeof(pthread_mutex_t),NO_OF_CACHE_ENTRIES);
for(int i=0;i<NO_OF_CACHE_ENTRIES;i++){
if(pthread_mutex_init(&x[i],NULL)!=0){
printf("failed\n");
}
}
return x;
}
int getInCache(kvcache_data** t,int* repl,pthread_mutex_t* cache_lock, char *key,char *value){
int ln=getSetId(key)-1; int ln=getSetId(key)-1;
for(int i=0;i<10;i++){ pthread_mutex_lock(&cache_lock[ln]);
for(int i=0;i<NO_OF_CACHE_ENTRIES;i++){
if(t[ln][i].chance!=-1) if(t[ln][i].chance!=-1)
{ {
if(!strcmp(t[ln][i].key,key)){ if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=1; t[ln][i].chance=1;
strcpy(value,t[ln][i].value); strcpy(value,t[ln][i].value);
pthread_mutex_unlock(&cache_lock[ln]);
return 1; return 1;
} }
} }
...@@ -46,33 +60,38 @@ int getInCache(kvcache_data** t,int* repl,char *key,char *value){ ...@@ -46,33 +60,38 @@ int getInCache(kvcache_data** t,int* repl,char *key,char *value){
int temp=repl[ln]; int temp=repl[ln];
while(t[ln][temp].chance==1){ while(t[ln][temp].chance==1){
t[ln][temp].chance=0; t[ln][temp].chance=0;
repl[ln]=(repl[ln]+1)%10; repl[ln]=(repl[ln]+1)%NO_OF_CACHE_ENTRIES;
temp=repl[ln]; temp=repl[ln];
} }
if(!restoreFromFile(key,value)){ if(!restoreFromFile(key,value)){
pthread_mutex_unlock(&cache_lock[ln]);
return 0; return 0;
} }
else{ else{
strcpy(t[ln][temp].key,key); strcpy(t[ln][temp].key,key);
strcpy(t[ln][temp].value,value); strcpy(t[ln][temp].value,value);
repl[ln]=(repl[ln]+1)%10; repl[ln]=(repl[ln]+1)%NO_OF_CACHE_ENTRIES;
t[ln][temp].chance=0; t[ln][temp].chance=0;
pthread_mutex_unlock(&cache_lock[ln]);
return 1; return 1;
} }
} }
int postInCache(kvcache_data** t,int* repl,char *key,char *value){ int postInCache(kvcache_data** t,int* repl,pthread_mutex_t* cache_lock, char *key,char *value){
int ln=getSetId(key)-1; int ln=getSetId(key)-1;
for(int i=0;i<10;i++){ pthread_mutex_lock(&cache_lock[ln]);
for(int i=0;i<NO_OF_CACHE_ENTRIES;i++){
if(t[ln][i].chance!=-1) if(t[ln][i].chance!=-1)
{ {
if(!strcmp(t[ln][i].key,key)){ if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=1; t[ln][i].chance=1;
strcpy(t[ln][i].value,value); strcpy(t[ln][i].value,value);
pthread_mutex_unlock(&cache_lock[ln]);
return 1; return 1;
} }
} }
...@@ -83,27 +102,31 @@ int postInCache(kvcache_data** t,int* repl,char *key,char *value){ ...@@ -83,27 +102,31 @@ int postInCache(kvcache_data** t,int* repl,char *key,char *value){
int temp=repl[ln]; int temp=repl[ln];
while(t[ln][temp].chance==1){ while(t[ln][temp].chance==1){
t[ln][temp].chance=0; t[ln][temp].chance=0;
repl[ln]=(repl[ln]+1)%10; repl[ln]=(repl[ln]+1)%NO_OF_CACHE_ENTRIES;
temp=repl[ln]; temp=repl[ln];
} }
//printf("%d\n",temp); //printf("%d\n",temp);
strcpy(t[ln][temp].key,key); strcpy(t[ln][temp].key,key);
strcpy(t[ln][temp].value,value); strcpy(t[ln][temp].value,value);
repl[ln]=(repl[ln]+1)%10; repl[ln]=(repl[ln]+1)%NO_OF_CACHE_ENTRIES;
t[ln][temp].chance=0; t[ln][temp].chance=0;
if(dumpToFile(&t[ln][temp])){ if(dumpToFile(&t[ln][temp])){
pthread_mutex_unlock(&cache_lock[ln]);
return 1; return 1;
} }
else{ else{
return 0; return 0;
pthread_mutex_unlock(&cache_lock[ln]);
} }
pthread_mutex_unlock(&cache_lock[ln]);
return 1; return 1;
} }
int delInCache(kvcache_data** t,int* repl,char *key){ int delInCache(kvcache_data** t,int* repl,pthread_mutex_t* cache_lock, char *key){
int flag=0;
int ln=getSetId(key)-1; int ln=getSetId(key)-1;
for(int i=0;i<10;i++){ pthread_mutex_lock(&cache_lock[ln]);
for(int i=0;i<NO_OF_CACHE_ENTRIES;i++){
if(t[ln][i].chance!=-1) if(t[ln][i].chance!=-1)
{ {
if(!strcmp(t[ln][i].key,key)){ if(!strcmp(t[ln][i].key,key)){
...@@ -119,9 +142,11 @@ int delInCache(kvcache_data** t,int* repl,char *key){ ...@@ -119,9 +142,11 @@ int delInCache(kvcache_data** t,int* repl,char *key){
} }
if(deleteFromFile(key)){ if(deleteFromFile(key)){
pthread_mutex_unlock(&cache_lock[ln]);
return 1; return 1;
} }
else{ else{
pthread_mutex_unlock(&cache_lock[ln]);
return 0; return 0;
} }
} }
......
//#include "jobs.h"
#include "KVStore.c"
#include "KVStore.h"
#include<stdlib.h> #include<stdlib.h>
#include<stdio.h> #include<stdio.h>
#include<string.h> #include<string.h>
......
#define NO_OF_SETS 256
#define NO_OF_CACHE_ENTRIES 10
\ No newline at end of file
#include<pthread.h>
typedef struct jobs_queue jobs_queue; typedef struct jobs_queue jobs_queue;
struct jobs_queue{ struct jobs_queue{
int fd; int fd;
...@@ -22,4 +23,17 @@ struct kvstore_data{ ...@@ -22,4 +23,17 @@ struct kvstore_data{
char value[256*1024+10]; char value[256*1024+10];
}; };
typedef struct tpool tpool_t;
struct tpool {
kvcache_data **t;
int *repl;
pthread_mutex_t* cache_lock;
pthread_mutex_t work_mutex;
pthread_cond_t work_cond;
pthread_cond_t working_cond;
size_t working_cnt;
size_t thread_cnt;
jobs_queue * head;
jobs_queue *tail;
int stop;
};
jobs_queue *tpool_work_get(tpool_t *tm)
{
jobs_queue *work;
if (tm == NULL)
return NULL;
work = tm->head;
if (work == NULL)
return NULL;
if (work->next == NULL) {
tm->head = NULL;
tm->tail = NULL;
} else {
tm->head = work->next;
}
return work;
}
void process_worker(jobs_queue *job,tpool_t* tm){
//printf("executing job\n");
char resp[300000];
if(!strcmp(job->messagetype,"putreq")){
if(postInCache(tm->t,tm->repl,tm->cache_lock, job->key,job->value)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","IOError");
}
}
else if(!strcmp(job->messagetype,"getreq")){
if(getInCache(tm->t,tm->repl,tm->cache_lock ,job->key,job->value)){
toXML(resp,job->key,job->value,"resp","");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
else if(!strcmp(job->messagetype,"delreq")){
if(delInCache(tm->t,tm->repl,tm->cache_lock, job->key)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
//printf("%s\n",resp);
write(job->fd,resp,strlen(resp));
printf("Request fulfilled of client no. %d\n",job->fd );
}
void *worker(void *arg){
jobs_queue *x=NULL;
tpool_t *tm= (tpool_t*) arg ;
while(1){
pthread_mutex_lock(&tm->work_mutex);
//printf("thread:locked by %ld\n",pthread_self());
while(tm->head==NULL){
//printf("unlocked and waiting by %ld\n",pthread_self());
pthread_cond_wait(&tm->work_cond,&tm->work_mutex);
}
//printf("job came acquired by %ld\n",pthread_self());
tm->working_cnt++;
x=tpool_work_get(tm);
//printf("%d\n",x->fd);
pthread_mutex_unlock(&tm->work_mutex);
//printf("unlocked by %ld\n",pthread_self());
process_worker(x,tm);
free(x);
pthread_mutex_lock(&tm->work_mutex);
tm->working_cnt--;
if(tm->working_cnt==0 && tm->head==NULL){
pthread_cond_signal(&tm->working_cond);
}
pthread_mutex_unlock(&tm->work_mutex);
}
tm->thread_cnt--;
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return NULL;
}
tpool_t *tpool_create(size_t num,kvcache_data** t,int* repl,pthread_mutex_t* cache_lock,pthread_mutex_t q_lock)
{
printf("Creating threadpool\n");
tpool_t *tm;
pthread_t thread;
size_t i;
if (num == 0)
num = 2;
tm = calloc(1, sizeof(*tm));
tm->thread_cnt = num;
tm->t=t;
tm->repl=repl;
tm->cache_lock=cache_lock;
tm->head=NULL;
tm->tail=NULL;
//pthread_mutex_init(&(tm->work_mutex), NULL);
tm->work_mutex=q_lock;
pthread_cond_init(&(tm->work_cond), NULL);
pthread_cond_init(&(tm->working_cond), NULL);
for (i=0; i<num; i++) {
pthread_create(&thread, NULL, worker, tm);
pthread_detach(thread);
}
return tm;
}
...@@ -113,7 +113,7 @@ int main() { ...@@ -113,7 +113,7 @@ int main() {
} }
/*toXML(x,"","delreq","success"); toXML(x,"","delreq","success");
toXML(x,y,"resp","success"); toXML(x,y,"resp","success");
toXML("",y,"resp","success"); toXML("",y,"resp","success");
toXML("",y,"resp","failure"); toXML("",y,"resp","failure");
......
...@@ -113,7 +113,7 @@ int main() { ...@@ -113,7 +113,7 @@ int main() {
} }
/*toXML(x,"","delreq","success"); toXML(x,"","delreq","success");
toXML(x,y,"resp","success"); toXML(x,y,"resp","success");
toXML("",y,"resp","success"); toXML("",y,"resp","success");
toXML("",y,"resp","failure"); toXML("",y,"resp","failure");
......
typedef struct jobs_queue jobs_queue;
struct jobs_queue{
int fd;
char xml[300000];
jobs_queue * next;
};
typedef struct kvcache_data kvcache_data;
struct kvcache_data{
int chance;
char key[300];
char value[256*1024+10];
};
typedef struct kvstore_data kvstore_data;
struct kvstore_data{
char key[300];
char value[256*1024+10];
};
starting key value server ********************
KEY VALUE SERVER
********************
********************
TEAM MEMBERS
********************
1. Shailesh Kumar (193050092)
2. Rajesh Datta Mahale (193050063)
*****************
Folder Structure
******************
*data - a folder to contains all the KVStore files
*header_files
jobs.h - a header file that contains structures that commonly used in different codes.
toxml.h - a header file that contains functions to convert data to XML and parsing back to XML.
threadpool.h - functions needed to implement threadpool in the project.
KVCache.h - a header file that contains the function to implement KVCahce.
KVStore.h - a header file that contains the function to implement KVStore.
config.h - a header file that contains the configurations that should be implement.
*req_resp
client_req.txt - a file that contains request in space seperated format
client_resp.txt - a file where all the response to be written.
--client.c
a C file that contains client code
--server.c
a C file that contains server code
--Makefile
a makefile that makes both client and server program at once.
--server and client
autogenerated by make command and need to be run.
**************************************************************************************
NOTE: Both server and client is in same folder and easy to distinguish from each other.
**************************************************************************************
**********************
Executing procedure:
**********************
1. First make the code using "make" command .
2. Run "./server" command to start the server.
3. Run "./client" command to start the client.
4. files "client_resp.txt" and "client_req.txt" can be compared to verify result.
\ No newline at end of file
PUT hello world PUT hello world
GET hello GET hell
DEL hello DEL hello
PUT hello
GET hello GET hello
GET GET
PUT aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaapaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa bjjbj PUT aaaaaaaaaaaaaaaaaaaaaaaaaaaoooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooobbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaapaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa bjjbj
PUT xyz abc PUT xyz abc
PUT xyx bgc PUT xyz bgc
GET xyz GT xyz
PUT safsdfsdfsdfsdfsdfsdf
sdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsfsdfds
PUT sdfsdf sdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsfsdfdsdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsdfsfsdfdsjgsnkahlgabgkdfbnl
...@@ -3,7 +3,7 @@ GET hello ...@@ -3,7 +3,7 @@ GET hello
DEL hello DEL hello
GET hello GET hello
GET GET
PUT aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaapaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa bjjbj PUT aaaaaaaaaaaaaaaaaaaaaaaaaaaoooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooobbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaapaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa bjjbj
PUT xyz abc PUT xyz abc
PUT xyx bgc PUT xyx bgc
GET xyz GET xyz
Success Success
hello world
Success
Does not exist Does not exist
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success Success
hello world
Success Success
hello world hello
ERROR: Key cannot be NULL ERROR: Key cannot be NULL
Oversized key Oversized key
Success Success
Success Success
xyz abc Unknown Error: Invalid Command.
ERROR: Value cannot be NULL
Unknown Error: Invalid Command.
Success Success
hello world
Success Success
Does not exist Does not exist
ERROR: Key cannot be NULL
Oversized key
Success
Success Success
xyz abc
Success Success
hello world hello
Success
hello world
ERROR: Key cannot be NULL ERROR: Key cannot be NULL
Oversized key Oversized key
Success Success
Success Success
xyz abc Unknown Error: Invalid Command.
ERROR: Value cannot be NULL
Unknown Error: Invalid Command.
Success Success
hello world
Success Success
Does not exist Does not exist
Success
Success
hello
ERROR: Key cannot be NULL ERROR: Key cannot be NULL
Oversized key Oversized key
Success Success
Success Success
xyz abc Unknown Error: Invalid Command.
ERROR: Value cannot be NULL
Unknown Error: Invalid Command.
Success Success
hello world
Success Success
Does not exist Does not exist
Success
Success
hello
ERROR: Key cannot be NULL ERROR: Key cannot be NULL
Oversized key Oversized key
Success Success
Success Success
xyz abc Unknown Error: Invalid Command.
ERROR: Value cannot be NULL
Unknown Error: Invalid Command.
Success Success
hello world
Success Success
Does not exist Does not exist
ERROR: Key cannot be NULL
Success Success
Success Success
hello
ERROR: Key cannot be NULL
Oversized key
Success
Success Success
xyz abc Unknown Error: Invalid Command.
ERROR: Value cannot be NULL
Unknown Error: Invalid Command.
Success Success
hello world
Success Success
Does not exist Does not exist
Success
Success
hello
ERROR: Key cannot be NULL ERROR: Key cannot be NULL
Oversized key
Success Success
Success Success
Unknown Error: Invalid Command.
ERROR: Value cannot be NULL
Unknown Error: Invalid Command.
Success Success
xyz abc
No preview for this file type
...@@ -13,12 +13,15 @@ ...@@ -13,12 +13,15 @@
#include<strings.h> #include<strings.h>
#include<string.h> #include<string.h>
#include<pthread.h> #include<pthread.h>
#include "toxml.h" #include "header_files/toxml.h"
#include "KVCache.c" #include "header_files/KVCache.h"
#include "header_files/threadpool.h"
/*JOB QUEUE POINTER*/ /*JOB QUEUE POINTER*/
jobs_queue *job_head=NULL; jobs_queue *job_head=NULL;
jobs_queue *job_tail=NULL; jobs_queue *job_tail=NULL;
tpool_t *tpool;
/*CACHE POINTERS*/ /*CACHE POINTERS*/
kvcache_data **t; kvcache_data **t;
...@@ -26,13 +29,15 @@ int *repl; ...@@ -26,13 +29,15 @@ int *repl;
/*LOCK FOR INSERTING IN JOB QUEUE*/ /*LOCK FOR INSERTING IN JOB QUEUE*/
pthread_mutex_t job_q_lock; pthread_mutex_t job_q_lock;
/*LOCK FOR CACHE PER SETS*/
/* FUNCTION PROTOTYPE OF PROCESSING QUERY*/ pthread_mutex_t* cache_lock;
void process_work(jobs_queue*);
/* FUNCION PROTOTYPE FOR SERVING THE CLIENT */ /* FUNCION PROTOTYPE FOR SERVING THE CLIENT */
void *serve(void*); void *serve(void*);
/* FUNCTION FOR HANDLING ERRORS*/ /* FUNCTION FOR HANDLING ERRORS*/
void error(char *msg) void error(char *msg)
{ {
...@@ -40,13 +45,22 @@ void error(char *msg) ...@@ -40,13 +45,22 @@ void error(char *msg)
exit(1); exit(1);
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
int sockfd, newsockfd, portno, clilen, pid; socklen_t sockfd, newsockfd, clilen;
struct sockaddr_in serv_addr, cli_addr; struct sockaddr_in serv_addr, cli_addr;
/*Cache related variables initialization*/
t= init_cache(); t= init_cache();
repl=rep_line_per_set(); repl=rep_line_per_set();
pthread_mutex_init(&job_q_lock,NULL); cache_lock=lock_initialize();
/*Creating threadpool*/
tpool=tpool_create(10,t,repl,cache_lock,job_q_lock);
/*Initializing the lock for inserting and reading in job_queue*/
pthread_mutex_init(&job_q_lock,NULL);
sockfd = socket(AF_INET, SOCK_STREAM, 0); sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) if (sockfd < 0)
error("ERROR opening socket"); error("ERROR opening socket");
...@@ -59,9 +73,9 @@ int main(int argc, char *argv[]) ...@@ -59,9 +73,9 @@ int main(int argc, char *argv[])
error("ERROR on binding"); error("ERROR on binding");
listen(sockfd,5); listen(sockfd,5);
clilen = sizeof(cli_addr); clilen = sizeof(cli_addr);
printf("Server started\n");
while (1) { while (1) {
newsockfd = accept(sockfd, newsockfd = accept(sockfd,(struct sockaddr *) &cli_addr, &clilen);
(struct sockaddr *) &cli_addr, &clilen);
pthread_t thr; pthread_t thr;
int *arg; int *arg;
...@@ -72,12 +86,13 @@ int main(int argc, char *argv[]) ...@@ -72,12 +86,13 @@ int main(int argc, char *argv[])
return 0; /* we never get here */ return 0; /* we never get here */
} }
/******** serve() ********************* /******** serve() *********************
There is a separate instance of this function There is a separate instance of this function
for each connection. It handles all communication for each connection. It handles all communication
once a connnection has been established. once a connnection has been established.
*****************************************/ *****************************************/
void * serve (void* socket) void * serve (void* socket)
{ {
int sock; int sock;
...@@ -94,68 +109,58 @@ void * serve (void* socket) ...@@ -94,68 +109,58 @@ void * serve (void* socket)
n = read(sock,buffer,300000); n = read(sock,buffer,300000);
if (n < 0) error("ERROR reading from socket"); if (n < 0) error("ERROR reading from socket");
else if (n==0) { else if (n==0) {
printf("Client has been closed,Killing the thread\n"); printf("Client has been closed,Killing the thread\n");
pthread_cancel(tid); pthread_cancel(tid);
} }
else{ else{
//Creating a structure to add in linked list of job queue //Creating a structure to add in linked list of job queue
jobs_queue *job=(jobs_queue *)malloc((sizeof(jobs_queue))); jobs_queue *job=(jobs_queue *)malloc((sizeof(jobs_queue)));
job->fd=sock; job->fd=sock;
job->next=NULL; job->next=NULL;
// printf("%d\n",job->fd );
fromXML(job->key,job->value,job->messagetype,job->message,buffer); fromXML(job->key,job->value,job->messagetype,job->message,buffer);
if(strlen(job->key)>256){
//Adding to the chain in Job Queue char resp[300000];
pthread_mutex_lock(&job_q_lock); toXML(resp,"","","resp","Oversized key");
if(job_head==NULL){ write(job->fd,resp,strlen(resp));
job_head=job; continue;
job_tail=job;
} }
else{ else if(strlen(job->key)==0){
job_tail->next=job; char resp[300000];
job_tail=job; toXML(resp,"","","resp","Undersized key");
write(job->fd,resp,strlen(resp));
continue;
} }
pthread_mutex_unlock(&job_q_lock); if(strlen(job->value)>(256*1024)){
process_work(job); char resp[300000];
toXML(resp,"","","resp","Oversized Value");
write(job->fd,resp,strlen(resp));
continue;
} }
if(!(!strcmp(job->messagetype,"getreq") || !strcmp(job->messagetype,"putreq") || !strcmp(job->messagetype,"delreq"))){
char resp[300000];
toXML(resp,"","","resp","Wrong Command");
write(job->fd,resp,strlen(resp));
continue;
} }
} //Adding to the chain in Job Queue
pthread_mutex_lock(&job_q_lock);
printf("New Request Arrived by Client No. %d\n",job->fd);
/**********process_work****************** if (tpool->head == NULL) {
Function to process query in job_queue tpool->head = job;
and then writing to the client tpool->tail = job;
****************************************/ } else {
void process_work(jobs_queue *job){ tpool->tail->next = job;
//printf("executing job\n"); tpool->tail = job;
char resp[300000]; }
if(!strcmp(job->messagetype,"putreq")){
if(postInCache(t,repl,job->key,job->value)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","IOError");
}
}
else if(!strcmp(job->messagetype,"getreq")){
if(getInCache(t,repl,job->key,job->value)){
toXML(resp,job->key,job->value,"resp","");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
else if(!strcmp(job->messagetype,"delreq")){
if(delInCache(t,repl,job->key)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
//printf("%s\n",resp);
write(job->fd,resp,strlen(resp));
pthread_cond_broadcast(&(tpool->work_cond));
//printf("job_added:unlock\n");
pthread_cond_signal(&tpool->work_cond);
pthread_mutex_unlock(&job_q_lock);
//process_work(job);
}
}
} }
/* A simple server in the internet domain using TCP
The port number is passed as an argument
This version runs forever, forking off a separate
process for each connection
gcc server2.c -lsocket
*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include<stdlib.h>
#include<unistd.h>
#include<strings.h>
#include<string.h>
#include<pthread.h>
#include "toxml.h"
#include "KVCache.c"
/*JOB QUEUE POINTER*/
jobs_queue *job_head=NULL;
jobs_queue *job_tail=NULL;
/*CACHE POINTERS*/
kvcache_data **t;
int *repl;
/*LOCK FOR INSERTING IN JOB QUEUE*/
pthread_mutex_t job_q_lock;
/* FUNCTION PROTOTYPE OF PROCESSING QUERY*/
void process_work(jobs_queue*);
/* FUNCION PROTOTYPE FOR SERVING THE CLIENT */
void *serve(void*);
/* FUNCTION FOR HANDLING ERRORS*/
void error(char *msg)
{
perror(msg);
exit(1);
}
int main(int argc, char *argv[])
{
int sockfd, newsockfd, portno, clilen, pid;
struct sockaddr_in serv_addr, cli_addr;
t= init_cache();
repl=rep_line_per_set();
pthread_mutex_init(&job_q_lock,NULL);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
error("ERROR opening socket");
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(8080);
if (bind(sockfd, (struct sockaddr *) &serv_addr,
sizeof(serv_addr)) < 0)
error("ERROR on binding");
listen(sockfd,5);
clilen = sizeof(cli_addr);
while (1) {
newsockfd = accept(sockfd,
(struct sockaddr *) &cli_addr, &clilen);
pthread_t thr;
int *arg;
arg=malloc(sizeof(*arg));
*arg=newsockfd;
pthread_create(&thr,NULL,serve,arg);
} /* end of while */
return 0; /* we never get here */
}
/******** serve() *********************
There is a separate instance of this function
for each connection. It handles all communication
once a connnection has been established.
*****************************************/
void * serve (void* socket)
{
int sock;
int n;
long tid=pthread_self();
sock=*((int*) socket);
char buffer[300000];
while(1){
//zeroing all the value in buffer
bzero(buffer,300000);
//reading the socket
n = read(sock,buffer,300000);
if (n < 0) error("ERROR reading from socket");
else if (n==0) {
printf("Client has been closed,Killing the thread\n");
pthread_cancel(tid);
}
else{
//Creating a structure to add in linked list of job queue
jobs_queue *job=(jobs_queue *)malloc((sizeof(jobs_queue)));
job->fd=sock;
job->next=NULL;
fromXML(job->key,job->value,job->messagetype,job->message,buffer);
//Adding to the chain in Job Queue
pthread_mutex_lock(&job_q_lock);
if(job_head==NULL){
job_head=job;
job_tail=job;
}
else{
job_tail->next=job;
job_tail=job;
}
pthread_mutex_unlock(&job_q_lock);
process_work(job);
}
}
}
/**********process_work******************
Function to process query in job_queue
****************************************/
void process_work(jobs_queue *job){
//printf("executing job\n");
char resp[300000];
if(!strcmp(job->messagetype,"putreq")){
if(postInCache(t,repl,job->key,job->value)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","IOError");
}
}
else if(!strcmp(job->messagetype,"getreq")){
if(getInCache(t,repl,job->key,job->value)){
toXML(resp,job->key,job->value,"resp","");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
else if(!strcmp(job->messagetype,"delreq")){
if(delInCache(t,repl,job->key)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
//printf("%s\n",resp);
write(job->fd,resp,strlen(resp));
}
File deleted
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "threadpool.c"
static const size_t num_threads = 4;
static const size_t num_items = 100;
void worker(void *arg)
{
int *val = arg;
int old = *val;
*val += 1000;
printf("tid=%ld, old=%d, val=%d\n", pthread_self(), old, *val);
if (*val%2)
usleep(100000);
}
int main(int argc, char **argv)
{
tpool_t *tm;
int *vals;
size_t i;
tm = tpool_create(num_threads);
vals = calloc(num_items, sizeof(*vals));
for (i=0; i<num_items; i++) {
vals[i] = i;
tpool_add_work(tm, worker, vals+i);
}
tpool_wait(tm);
for (i=0; i<num_items; i++) {
printf("%d\n", vals[i]);
}
free(vals);
tpool_destroy(tm);
return 0;
}
#include <stdbool.h>
#include<pthread.h>
#include <stddef.h>
#include<stdlib.h>
struct tpool;
typedef struct tpool tpool_t;
typedef void (*thread_func_t)(void *arg);
tpool_t *tpool_create(size_t num);
void tpool_destroy(tpool_t *tm);
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg);
void tpool_wait(tpool_t *tm);
struct tpool_work {
thread_func_t func;
void *arg;
struct tpool_work *next;
};
typedef struct tpool_work tpool_work_t;
struct tpool {
tpool_work_t *work_first;
tpool_work_t *work_last;
pthread_mutex_t work_mutex;
pthread_cond_t work_cond;
pthread_cond_t working_cond;
size_t working_cnt;
size_t thread_cnt;
bool stop;
};
static void *tpool_worker(void *arg)
{
tpool_t *tm = arg;
tpool_work_t *work;
while (1) {
pthread_mutex_lock(&(tm->work_mutex));
if (tm->stop)
break;
if (tm->work_first == NULL)
pthread_cond_wait(&(tm->work_cond), &(tm->work_mutex));
work = tpool_work_get(tm);
tm->working_cnt++;
pthread_mutex_unlock(&(tm->work_mutex));
if (work != NULL) {
work->func(work->arg);
tpool_work_destroy(work);
}
pthread_mutex_lock(&(tm->work_mutex));
tm->working_cnt--;
if (!tp->stop && tm->working_cnt == 0 && tm->work_first == NULL)
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
}
tm->thread_cnt--;
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return NULL;
}
tpool_t *tpool_create(size_t num)
{
tpool_t *tm;
pthread_t thread;
size_t i;
if (num == 0)
num = 2;
tm = calloc(1, sizeof(*tm));
tm->thread_cnt = num;
pthread_mutex_init(&(tm->work_mutex), NULL);
pthread_cond_init(&(tm->work_cond), NULL);
pthread_cond_init(&(tm->working_cond), NULL);
tm->work_first = NULL;
tm->work_last = NULL;
for (i=0; i<num; i++) {
pthread_create(&thread, NULL, tpool_worker, tm);
pthread_detach(thread);
}
return tm;
}
static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
{
tpool_work_t *work;
if (func == NULL)
return NULL;
work = malloc(sizeof(*work));
work->func = func;
work->arg = arg;
work->next = NULL;
return work;
}
static void tpool_work_destroy(tpool_work_t *work)
{
if (work == NULL)
return;
free(work);
}
static tpool_work_t *tpool_work_get(tpool_t *tm)
{
tpool_work_t *work;
if (tm == NULL)
return NULL;
work = tm->work_first;
if (work == NULL)
return NULL;
if (work->next == NULL) {
tm->work_first = NULL;
tm->work_last = NULL;
} else {
tm->work_first = work->next;
}
return work;
}
void tpool_destroy(tpool_t *tm)
{
tpool_work_t *work;
tpool_work_t *work2;
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->work_mutex));
work = tm->work_first;
while (work != NULL) {
work2 = work->next;
tpool_work_destroy(work);
work = work2;
}
tm->stop = true;
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
tpool_wait(tm);
pthread_mutex_destroy(&(tm->work_mutex));
pthread_cond_destroy(&(tm->work_cond));
pthread_cond_destroy(&(tm->working_cond));
free(tm);
}
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg)
{
tpool_work_t *work;
if (tm == NULL)
return false;
work = tpool_work_create(func, arg);
if (work == NULL)
return false;
pthread_mutex_lock(&(tm->work_mutex));
if (tm->work_first == NULL) {
tm->work_first = work;
tm->work_last = tm->work_first;
} else {
tm->work_last->next = work;
tm->work_last = work;
}
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return true;
}
void tpool_wait(tpool_t *tp)
{
if (tp == NULL)
return;
pthread_mutex_lock(&(tp->work_mutex));
while (1) {
if ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0)) {
pthread_cond_wait(&(tp->working_cond), &(tp->work_mutex));
} else {
break;
}
}
pthread_mutex_unlock(&(tp->work_mutex));
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment