Commit 21b1d907 authored by SHAILESH KUMAR's avatar SHAILESH KUMAR

85% done

parent 80827058
//#include "jobs.h"
#include "KVStore.c"
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
long int getSetId(char * key){
return strlen(key);
}
kvcache_data** init_cache(){
kvcache_data **t = (kvcache_data **)malloc(256 * sizeof(kvcache_data *));
for (int i = 0; i < 256; ++i){
t[i] = (kvcache_data *)malloc( sizeof(kvcache_data)*10);
for(int j=0;j<10;j++){
t[i][j].chance=-1;
}
}
return t;
}
int* rep_line_per_set(){
int* x=(int *)calloc(sizeof(int),256);
for(int i=0;i<256;i++){
x[i]=0;
}
return x;
}
int getInCache(kvcache_data** t,int* repl,char *key,char *value){
int ln=getSetId(key)-1;
for(int i=0;i<10;i++){
if(t[ln][i].chance!=-1)
{
if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=1;
strcpy(value,t[ln][i].value);
return 1;
}
}
else{
break;
}
}
int temp=repl[ln];
while(t[ln][temp].chance==1){
t[ln][temp].chance=0;
repl[ln]=(repl[ln]+1)%10;
temp=repl[ln];
}
if(!restoreFromFile(key,value)){
return 0;
}
else{
strcpy(t[ln][temp].key,key);
strcpy(t[ln][temp].value,value);
repl[ln]=(repl[ln]+1)%10;
t[ln][temp].chance=0;
return 1;
}
}
int postInCache(kvcache_data** t,int* repl,char *key,char *value){
int ln=getSetId(key)-1;
for(int i=0;i<10;i++){
if(t[ln][i].chance!=-1)
{
if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=1;
strcpy(t[ln][i].value,value);
return 1;
}
}
else{
break;
}
}
int temp=repl[ln];
while(t[ln][temp].chance==1){
t[ln][temp].chance=0;
repl[ln]=(repl[ln]+1)%10;
temp=repl[ln];
}
//printf("%d\n",temp);
strcpy(t[ln][temp].key,key);
strcpy(t[ln][temp].value,value);
repl[ln]=(repl[ln]+1)%10;
t[ln][temp].chance=0;
if(dumpToFile(&t[ln][temp])){
return 1;
}
else{
return 0;
}
return 1;
}
int delInCache(kvcache_data** t,int* repl,char *key){
int flag=0;
int ln=getSetId(key)-1;
for(int i=0;i<10;i++){
if(t[ln][i].chance!=-1)
{
if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=0;
strcpy(t[ln][i].key,"");
strcpy(t[ln][i].value,"");
}
}
else{
break;
}
}
if(deleteFromFile(key)){
return 1;
}
else{
return 0;
}
}
/*
int main(){
kvcache_data **t= init_cache();
int *repl=rep_line_per_set();
char value[1232];
printf("%d\n",postInCache(t,repl,"x","xenon"));
printf("%d\n",postInCache(t,repl,"y","yak"));
printf("%d\n",getInCache(t,repl,"y",value));
printf("%d\n",delInCache(t,repl,"y"));
printf("%d\n",getInCache(t,repl,"z",value));
for(int i =0;i<10;i++){
printf("%d %d %s %s\n",i,t[1-1][i].chance,t[1-1][i].key,t[1-1 ][i].value);
}
}
*/
//#include "jobs.h"
#include "KVStore.c"
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
long int getSetId(char * key){
return strlen(key);
}
kvcache_data** init_cache(){
kvcache_data **t = (kvcache_data **)malloc(256 * sizeof(kvcache_data *));
for (int i = 0; i < 256; ++i){
t[i] = (kvcache_data *)malloc( sizeof(kvcache_data)*10);
for(int j=0;j<10;j++){
t[i][j].chance=-1;
}
}
return t;
}
int* rep_line_per_set(){
int* x=(int *)calloc(sizeof(int),256);
for(int i=0;i<256;i++){
x[i]=0;
}
return x;
}
int getInCache(kvcache_data** t,int* repl,char *key,char *value){
int ln=getSetId(key)-1;
for(int i=0;i<10;i++){
if(t[ln][i].chance!=-1)
{
if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=1;
strcpy(value,t[ln][i].value);
return 1;
}
}
else{
break;
}
}
int temp=repl[ln];
while(t[ln][temp].chance==1){
t[ln][temp].chance=0;
repl[ln]=(repl[ln]+1)%10;
temp=repl[ln];
}
if(!restoreFromFile(key,value)){
return 0;
}
else{
strcpy(t[ln][temp].key,key);
strcpy(t[ln][temp].value,value);
repl[ln]=(repl[ln]+1)%10;
t[ln][temp].chance=0;
return 1;
}
}
int postInCache(kvcache_data** t,int* repl,char *key,char *value){
int ln=getSetId(key)-1;
for(int i=0;i<10;i++){
if(t[ln][i].chance!=-1)
{
if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=1;
strcpy(t[ln][i].value,value);
return 1;
}
}
else{
break;
}
}
int temp=repl[ln];
while(t[ln][temp].chance==1){
t[ln][temp].chance=0;
repl[ln]=(repl[ln]+1)%10;
temp=repl[ln];
}
//printf("%d\n",temp);
strcpy(t[ln][temp].key,key);
strcpy(t[ln][temp].value,value);
repl[ln]=(repl[ln]+1)%10;
t[ln][temp].chance=0;
if(dumpToFile(&t[ln][temp])){
return 1;
}
else{
return 0;
}
return 1;
}
int delInCache(kvcache_data** t,int* repl,char *key){
int flag=0;
int ln=getSetId(key)-1;
for(int i=0;i<10;i++){
if(t[ln][i].chance!=-1)
{
if(!strcmp(t[ln][i].key,key)){
t[ln][i].chance=0;
strcpy(t[ln][i].key,"");
strcpy(t[ln][i].value,"");
}
}
else{
break;
}
}
if(deleteFromFile(key)){
return 1;
}
else{
return 0;
}
}
/*
int main(){
kvcache_data **t= init_cache();
int *repl=rep_line_per_set();
char value[1232];
printf("%d\n",postInCache(t,repl,"x","xenon"));
printf("%d\n",postInCache(t,repl,"y","yak"));
printf("%d\n",getInCache(t,repl,"y",value));
printf("%d\n",delInCache(t,repl,"y"));
printf("%d\n",getInCache(t,repl,"z",value));
for(int i =0;i<10;i++){
printf("%d %d %s %s\n",i,t[1-1][i].chance,t[1-1][i].key,t[1-1 ][i].value);
}
}
*/
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include "jobs.h"
void getFileName(char * fname,long int x){
char id[5];
sprintf(id,"%ld",x);
strcpy(fname,"data/kv_");
strcat(fname,id);
strcat(fname,".bin");
}
void getTempFileName(char * fname,long int x){
char id[5];
sprintf(id,"%ld",x);
strcpy(fname,"data/temp_");
strcat(fname,id);
strcat(fname,".bin");
}
int deleteFromFile(char *key){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
char fname[17],temp[17];
int flag=0;
getFileName(fname,strlen(key));
getTempFileName(temp,strlen(key));
FILE *fp;
if( access( fname, F_OK ) != -1 ){
fp=fopen(fname,"rb");
}
else{
free(store);
return 0;
}
FILE *tempfp=fopen(temp,"wb");
if(fp==NULL || tempfp==NULL){
printf("ERROR:error opening file\n");
}
while(fread(store,sizeof(kvstore_data),1,fp)){
if(strcmp(store->key,key)){
fwrite(store,sizeof(kvstore_data),1,tempfp);
}
else{
flag=1;
}
}
/*fread(store,sizeof(kvstore_data),1,fp);
if(strcmp(store->key,key))
fwrite(store,sizeof(kvstore_data),1,tempfp);
*/
fclose(fp);
fclose(tempfp);
remove(fname);
rename(temp,fname);
return flag;
}
int dumpToFile(kvcache_data *cache){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
strcpy(store->key,cache->key);
strcpy(store->value,cache->value);
char fname[17];
getFileName(fname,strlen(cache->key));
deleteFromFile(cache->key);
FILE *fp=fopen(fname,"ab");
if(fp==NULL){
printf("ERROR:error opening file\n");
return 0;
}
if(!fwrite(store,sizeof(kvstore_data),1,fp)){
printf("ERROR:Error writing file.\n");
free(store);
fclose(fp);
return 0;
}
else{
free(store);
fclose(fp);
return 1;
}
return 1;
}
int restoreFromFile(char *key,char *value){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
char fname[17];
FILE *fp;
getFileName(fname,strlen(key));
if( access( fname, F_OK ) != -1 ){
fp=fopen(fname,"rb");
}
else{
free(store);
return 0;
}
if(fp==NULL){
printf("ERROR:error opening file\n");
}
while(fread(store,sizeof(kvstore_data),1,fp)){
if(!strcmp(key,store->key)){
strcpy(value,store->value);
free(store);
fclose(fp);
return 1;
}
}
return 0;
}
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include "jobs.h"
void getFileName(char * fname,long int x){
char id[5];
sprintf(id,"%ld",x);
strcpy(fname,"data/kv_");
strcat(fname,id);
strcat(fname,".bin");
}
void getTempFileName(char * fname,long int x){
char id[5];
sprintf(id,"%ld",x);
strcpy(fname,"data/temp_");
strcat(fname,id);
strcat(fname,".bin");
}
int deleteFromFile(char *key){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
char fname[17],temp[17];
int flag=0;
getFileName(fname,strlen(key));
getTempFileName(temp,strlen(key));
FILE *fp;
if( access( fname, F_OK ) != -1 ){
fp=fopen(fname,"rb");
}
else{
free(store);
return 0;
}
FILE *tempfp=fopen(temp,"wb");
if(fp==NULL || tempfp==NULL){
printf("ERROR:error opening file\n");
}
while(fread(store,sizeof(kvstore_data),1,fp)){
if(strcmp(store->key,key)){
printf("%s\n",store->key);
fwrite(store,sizeof(kvstore_data),1,tempfp);
flag=1;
}
}
/*fread(store,sizeof(kvstore_data),1,fp);
if(strcmp(store->key,key))
fwrite(store,sizeof(kvstore_data),1,tempfp);
*/
fclose(fp);
fclose(tempfp);
remove(fname);
rename(temp,fname);
return flag;
}
int dumpToFile(kvcache_data *cache){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
strcpy(store->key,cache->key);
strcpy(store->value,cache->value);
char fname[17];
getFileName(fname,strlen(cache->key));
deleteFromFile(cache->key);
FILE *fp=fopen(fname,"ab");
if(fp==NULL){
printf("ERROR:error opening file\n");
return 0;
}
if(!fwrite(store,sizeof(kvstore_data),1,fp)){
printf("ERROR:Error writing file.\n");
free(store);
fclose(fp);
return 0;
}
else{
free(store);
fclose(fp);
return 1;
}
return 1;
}
int restoreFromFile(char *key,char *value){
kvstore_data *store=(kvstore_data *)malloc(sizeof(kvstore_data));
char fname[17];
FILE *fp;
getFileName(fname,strlen(key));
if( access( fname, F_OK ) != -1 ){
fp=fopen(fname,"rb");
}
else{
free(store);
return 0;
}
if(fp==NULL){
printf("ERROR:error opening file\n");
}
while(fread(store,sizeof(kvstore_data),1,fp)){
if(!strcmp(key,store->key)){
strcpy(value,store->value);
free(store);
fclose(fp);
return 1;
}
}
return 0;
}
File added
File added
No preview for this file type
......@@ -8,7 +8,7 @@
#include <netinet/in.h>
#include <netdb.h>
#include "toxml.h"
void sendXML(int sockfd,char *msg){
void sendXML(FILE *fr,int sockfd,char *msg){
char cmd[4];
char key[257];
char value[256*1024+1];
......@@ -55,24 +55,24 @@ void sendXML(int sockfd,char *msg){
//printf("%s",key);
}
else{
printf("ERROR: Unknown operation.\n");
fprintf(fr,"Unknown Error: Invalid Command.\n");
return;
}
if(strlen(key)>256){
printf("ERROR: Key cannot exceed 256B\n");
fprintf(fr,"Oversized key\n");
return;
}
if(strlen(key)==0){
printf("ERROR: Key cannot be NULL\n");
fprintf(fr,"ERROR: Key cannot be NULL\n");
return;
}
if(!strcmp(cmd,"PUT") && strlen(value)==0){
printf("ERROR: Value cannot be NULL\n");
fprintf(fr,"ERROR: Value cannot be NULL\n");
return;
}
if(!strcmp(cmd,"PUT") && strlen(value)>(256*1024)){
printf("ERROR: Value cannot exceed 256KB\n");
fprintf(fr,"Oversized value\n");
return;
}
......@@ -81,9 +81,13 @@ void sendXML(int sockfd,char *msg){
toXML(encodedXML,key,value,msgtype,"");
write(sockfd,strcat(encodedXML,delim),strlen(encodedXML));
char buf[256];
read(sockfd,buf,256);
char buf[300000],message[100];
read(sockfd,buf,300000);
fromXML(key,value,msgtype,message,buf);
if(strlen(message)!=0){fprintf(fr,"%s\n",message);}
else{
fprintf(fr,"%s %s\n",key,value);
}
return;
......@@ -97,7 +101,7 @@ void error(char *msg)
int main(int argc, char *argv[])
{
int sockfd, portno, n,no_of_requests=0;
int sockfd, portno, n ;
struct sockaddr_in serv_addr;
struct hostent *server;
......@@ -122,23 +126,21 @@ int main(int argc, char *argv[])
if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0)
error("ERROR connecting");
FILE *fp;
FILE *fp,*fr;
char* line=NULL;
size_t len=0;
fp=fopen("client_req.txt","r");
if(fp==NULL){
fr=fopen("client_resp.txt","a+");
if(fp==NULL || fr==NULL){
exit(1);
}
while((getline(&line,&len,fp))!=-1){
//write(sockfd,line,read);
no_of_requests++;
sendXML(sockfd,line);
sendXML(fr,sockfd,line);
sleep(1);
}
fclose(fp);
while(1);
//exit(0);
......
......@@ -8,7 +8,7 @@
#include <netinet/in.h>
#include <netdb.h>
#include "toxml.h"
void sendXML(int sockfd,char *msg){
void sendXML(FILE *fr,int sockfd,char *msg){
char cmd[4];
char key[257];
char value[256*1024+1];
......@@ -55,24 +55,24 @@ void sendXML(int sockfd,char *msg){
//printf("%s",key);
}
else{
printf("ERROR: Unknown operation.\n");
fprintf(fr,"Unknown Error: Invalid Command.\n");
return;
}
if(strlen(key)>256){
printf("ERROR: Key cannot exceed 256B");
fprintf(fr,"Oversized key\n");
return;
}
if(strlen(key)==0){
printf("ERROR: Key cannot be NULL");
fprintf(fr,"ERROR: Key cannot be NULL\n");
return;
}
if(!strcmp(cmd,"PUT") && strlen(value)==0){
printf("ERROR: Value cannot be NULL");
fprintf(fr,"ERROR: Value cannot be NULL\n");
return;
}
if(!strcmp(cmd,"PUT") && strlen(value)>(256*1024)){
printf("ERROR: Value cannot exceed 256KB");
fprintf(fr,"Oversized value\n");
return;
}
......@@ -81,9 +81,13 @@ void sendXML(int sockfd,char *msg){
toXML(encodedXML,key,value,msgtype,"");
write(sockfd,strcat(encodedXML,delim),strlen(encodedXML));
char buf[256];
read(sockfd,buf,256);
char buf[300000],message[100];
read(sockfd,buf,300000);
fromXML(key,value,msgtype,message,buf);
if(strlen(message)!=0){fprintf(fr,"%s\n",message);}
else{
fprintf(fr,"%s %s\n",key,value);
}
return;
......@@ -97,7 +101,7 @@ void error(char *msg)
int main(int argc, char *argv[])
{
int sockfd, portno, n,no_of_requests=0;
int sockfd, portno, n;
struct sockaddr_in serv_addr;
struct hostent *server;
......@@ -122,23 +126,21 @@ int main(int argc, char *argv[])
if (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0)
error("ERROR connecting");
FILE *fp;
FILE *fp,*fr;
char* line=NULL;
size_t len=0;
fp=fopen("client_req.txt","r");
if(fp==NULL){
fr=fopen("client_resp.txt","a+");
if(fp==NULL || fr==NULL){
exit(1);
}
while((getline(&line,&len,fp))!=-1){
//write(sockfd,line,read);
no_of_requests++;
sendXML(sockfd,line);
sendXML(fr,sockfd,line);
sleep(1);
}
fclose(fp);
while(1);
//exit(0);
......
......@@ -4,4 +4,6 @@ DEL hello
GET hello
GET
PUT aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaapaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa bjjbj
PUT xyz abc
PUT xyx bgc
GET xyz
......@@ -3,4 +3,6 @@ GET hello
DEL hello
GET hello
GET
PUT aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa bjjbj
PUT aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaapaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa bjjbj
PUT xyz abc
PUT xyx bgc
Success
hello world
Success
Does not exist
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success
hello world
Success
hello world
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success
hello world
Success
Does not exist
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success
hello world
Success
hello world
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success
hello world
Success
Does not exist
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success
hello world
Success
hello world
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
Success
hello world
Success
Does not exist
ERROR: Key cannot be NULL
Oversized key
Success
Success
xyz abc
File added
typedef struct jobs_queue jobs_queue;
struct jobs_queue{
int fd;
char xml[300000];
char key[260];
char value[266*1024];
char message[1024];
char messagetype[10];
jobs_queue * next;
};
typedef struct kvcache_data kvcache_data;
struct kvcache_data{
int chance;
char key[300];
char value[256*1024+10];
};
typedef struct kvstore_data kvstore_data;
struct kvstore_data{
char key[300];
char value[256*1024+10];
};
No preview for this file type
typedef struct jobs_queue jobs_queue;
struct jobs_queue{
int fd;
char xml[300000];
jobs_queue * next;
};
typedef struct kvcache_data kvcache_data;
struct kvcache_data{
int chance;
char key[300];
char value[256*1024+10];
};
typedef struct kvstore_data kvstore_data;
struct kvstore_data{
char key[300];
char value[256*1024+10];
};
No preview for this file type
......@@ -14,14 +14,26 @@
#include<string.h>
#include<pthread.h>
#include "toxml.h"
#include "jobs.h"
#include "KVCache.c"
/*JOB QUEUE POINTER*/
jobs_queue *job_head=NULL;
jobs_queue *job_tail=NULL;
/*CACHE POINTERS*/
kvcache_data **t;
int *repl;
/*LOCK FOR INSERTING IN JOB QUEUE*/
pthread_mutex_t job_q_lock;
void *serve(void*); /* function prototype */
/* FUNCTION PROTOTYPE OF PROCESSING QUERY*/
void process_work(jobs_queue*);
/* FUNCION PROTOTYPE FOR SERVING THE CLIENT */
void *serve(void*);
/* FUNCTION FOR HANDLING ERRORS*/
void error(char *msg)
{
perror(msg);
......@@ -32,6 +44,8 @@ int main(int argc, char *argv[])
{
int sockfd, newsockfd, portno, clilen, pid;
struct sockaddr_in serv_addr, cli_addr;
t= init_cache();
repl=rep_line_per_set();
pthread_mutex_init(&job_q_lock,NULL);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
......@@ -48,17 +62,7 @@ int main(int argc, char *argv[])
while (1) {
newsockfd = accept(sockfd,
(struct sockaddr *) &cli_addr, &clilen);
/*if (newsockfd < 0)
error("ERROR on accept");
pid = fork();
if (pid < 0)
error("ERROR on fork");
if (pid == 0) {
close(sockfd);
serve(newsockfd);
exit(0);
}
else close(newsockfd);*/
pthread_t thr;
int *arg;
arg=malloc(sizeof(*arg));
......@@ -98,7 +102,8 @@ void * serve (void* socket)
jobs_queue *job=(jobs_queue *)malloc((sizeof(jobs_queue)));
job->fd=sock;
job->next=NULL;
strcpy(job->xml,buffer);
fromXML(job->key,job->value,job->messagetype,job->message,buffer);
//Adding to the chain in Job Queue
pthread_mutex_lock(&job_q_lock);
......@@ -111,18 +116,46 @@ void * serve (void* socket)
job_tail=job;
}
pthread_mutex_unlock(&job_q_lock);
//Printing the linked list
jobs_queue *p=job_head;
while(p!=NULL){
printf("%d ",p->fd);
p=p->next;
process_work(job);
}
printf("\n");
}
}
/**********process_work******************
Function to process query in job_queue
and then writing to the client
****************************************/
void process_work(jobs_queue *job){
//printf("executing job\n");
char resp[300000];
if(!strcmp(job->messagetype,"putreq")){
if(postInCache(t,repl,job->key,job->value)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","IOError");
//Acknowledging the client to send next data
n = write(sock,"ACK\n",4);
if (n < 0) error("ERROR writing to socket");
}
}
else if(!strcmp(job->messagetype,"getreq")){
if(getInCache(t,repl,job->key,job->value)){
toXML(resp,job->key,job->value,"resp","");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
else if(!strcmp(job->messagetype,"delreq")){
if(delInCache(t,repl,job->key)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
//printf("%s\n",resp);
write(job->fd,resp,strlen(resp));
}
......@@ -14,14 +14,26 @@
#include<string.h>
#include<pthread.h>
#include "toxml.h"
#include "jobs.h"
#include "KVCache.c"
/*JOB QUEUE POINTER*/
jobs_queue *job_head=NULL;
jobs_queue *job_tail=NULL;
/*CACHE POINTERS*/
kvcache_data **t;
int *repl;
/*LOCK FOR INSERTING IN JOB QUEUE*/
pthread_mutex_t job_q_lock;
void *serve(void*); /* function prototype */
/* FUNCTION PROTOTYPE OF PROCESSING QUERY*/
void process_work(jobs_queue*);
/* FUNCION PROTOTYPE FOR SERVING THE CLIENT */
void *serve(void*);
/* FUNCTION FOR HANDLING ERRORS*/
void error(char *msg)
{
perror(msg);
......@@ -32,6 +44,8 @@ int main(int argc, char *argv[])
{
int sockfd, newsockfd, portno, clilen, pid;
struct sockaddr_in serv_addr, cli_addr;
t= init_cache();
repl=rep_line_per_set();
pthread_mutex_init(&job_q_lock,NULL);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
......@@ -48,17 +62,7 @@ int main(int argc, char *argv[])
while (1) {
newsockfd = accept(sockfd,
(struct sockaddr *) &cli_addr, &clilen);
/*if (newsockfd < 0)
error("ERROR on accept");
pid = fork();
if (pid < 0)
error("ERROR on fork");
if (pid == 0) {
close(sockfd);
serve(newsockfd);
exit(0);
}
else close(newsockfd);*/
pthread_t thr;
int *arg;
arg=malloc(sizeof(*arg));
......@@ -98,7 +102,8 @@ void * serve (void* socket)
jobs_queue *job=(jobs_queue *)malloc((sizeof(jobs_queue)));
job->fd=sock;
job->next=NULL;
strcpy(job->xml,buffer);
fromXML(job->key,job->value,job->messagetype,job->message,buffer);
//Adding to the chain in Job Queue
pthread_mutex_lock(&job_q_lock);
......@@ -111,18 +116,45 @@ void * serve (void* socket)
job_tail=job;
}
pthread_mutex_unlock(&job_q_lock);
//Printing the linked list
/*jobs_queue *p=job_head;
while(p!=NULL){
printf("%d ",p->fd);
p=p->next;
process_work(job);
}
}
}
/**********process_work******************
Function to process query in job_queue
****************************************/
void process_work(jobs_queue *job){
//printf("executing job\n");
char resp[300000];
if(!strcmp(job->messagetype,"putreq")){
if(postInCache(t,repl,job->key,job->value)){
toXML(resp,"","","resp","Success");
}
printf("\n");
*/
else{
toXML(resp,"","","resp","IOError");
//Acknowledging the client to send next data
n = write(sock,"ACK\n",4);
if (n < 0) error("ERROR writing to socket");
}
}
else if(!strcmp(job->messagetype,"getreq")){
if(getInCache(t,repl,job->key,job->value)){
toXML(resp,job->key,job->value,"resp","");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
else if(!strcmp(job->messagetype,"delreq")){
if(delInCache(t,repl,job->key)){
toXML(resp,"","","resp","Success");
}
else{
toXML(resp,"","","resp","Does not exist");
}
}
//printf("%s\n",resp);
write(job->fd,resp,strlen(resp));
}
File added
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "threadpool.c"
static const size_t num_threads = 4;
static const size_t num_items = 100;
void worker(void *arg)
{
int *val = arg;
int old = *val;
*val += 1000;
printf("tid=%ld, old=%d, val=%d\n", pthread_self(), old, *val);
if (*val%2)
usleep(100000);
}
int main(int argc, char **argv)
{
tpool_t *tm;
int *vals;
size_t i;
tm = tpool_create(num_threads);
vals = calloc(num_items, sizeof(*vals));
for (i=0; i<num_items; i++) {
vals[i] = i;
tpool_add_work(tm, worker, vals+i);
}
tpool_wait(tm);
for (i=0; i<num_items; i++) {
printf("%d\n", vals[i]);
}
free(vals);
tpool_destroy(tm);
return 0;
}
#include <stdbool.h>
#include<pthread.h>
#include <stddef.h>
#include<stdlib.h>
struct tpool;
typedef struct tpool tpool_t;
typedef void (*thread_func_t)(void *arg);
tpool_t *tpool_create(size_t num);
void tpool_destroy(tpool_t *tm);
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg);
void tpool_wait(tpool_t *tm);
struct tpool_work {
thread_func_t func;
void *arg;
struct tpool_work *next;
};
typedef struct tpool_work tpool_work_t;
struct tpool {
tpool_work_t *work_first;
tpool_work_t *work_last;
pthread_mutex_t work_mutex;
pthread_cond_t work_cond;
pthread_cond_t working_cond;
size_t working_cnt;
size_t thread_cnt;
bool stop;
};
static void *tpool_worker(void *arg)
{
tpool_t *tm = arg;
tpool_work_t *work;
while (1) {
pthread_mutex_lock(&(tm->work_mutex));
if (tm->stop)
break;
if (tm->work_first == NULL)
pthread_cond_wait(&(tm->work_cond), &(tm->work_mutex));
work = tpool_work_get(tm);
tm->working_cnt++;
pthread_mutex_unlock(&(tm->work_mutex));
if (work != NULL) {
work->func(work->arg);
tpool_work_destroy(work);
}
pthread_mutex_lock(&(tm->work_mutex));
tm->working_cnt--;
if (!tp->stop && tm->working_cnt == 0 && tm->work_first == NULL)
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
}
tm->thread_cnt--;
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return NULL;
}
tpool_t *tpool_create(size_t num)
{
tpool_t *tm;
pthread_t thread;
size_t i;
if (num == 0)
num = 2;
tm = calloc(1, sizeof(*tm));
tm->thread_cnt = num;
pthread_mutex_init(&(tm->work_mutex), NULL);
pthread_cond_init(&(tm->work_cond), NULL);
pthread_cond_init(&(tm->working_cond), NULL);
tm->work_first = NULL;
tm->work_last = NULL;
for (i=0; i<num; i++) {
pthread_create(&thread, NULL, tpool_worker, tm);
pthread_detach(thread);
}
return tm;
}
static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
{
tpool_work_t *work;
if (func == NULL)
return NULL;
work = malloc(sizeof(*work));
work->func = func;
work->arg = arg;
work->next = NULL;
return work;
}
static void tpool_work_destroy(tpool_work_t *work)
{
if (work == NULL)
return;
free(work);
}
static tpool_work_t *tpool_work_get(tpool_t *tm)
{
tpool_work_t *work;
if (tm == NULL)
return NULL;
work = tm->work_first;
if (work == NULL)
return NULL;
if (work->next == NULL) {
tm->work_first = NULL;
tm->work_last = NULL;
} else {
tm->work_first = work->next;
}
return work;
}
void tpool_destroy(tpool_t *tm)
{
tpool_work_t *work;
tpool_work_t *work2;
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->work_mutex));
work = tm->work_first;
while (work != NULL) {
work2 = work->next;
tpool_work_destroy(work);
work = work2;
}
tm->stop = true;
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
tpool_wait(tm);
pthread_mutex_destroy(&(tm->work_mutex));
pthread_cond_destroy(&(tm->work_cond));
pthread_cond_destroy(&(tm->working_cond));
free(tm);
}
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg)
{
tpool_work_t *work;
if (tm == NULL)
return false;
work = tpool_work_create(func, arg);
if (work == NULL)
return false;
pthread_mutex_lock(&(tm->work_mutex));
if (tm->work_first == NULL) {
tm->work_first = work;
tm->work_last = tm->work_first;
} else {
tm->work_last->next = work;
tm->work_last = work;
}
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return true;
}
void tpool_wait(tpool_t *tp)
{
if (tp == NULL)
return;
pthread_mutex_lock(&(tp->work_mutex));
while (1) {
if ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0)) {
pthread_cond_wait(&(tp->working_cond), &(tp->work_mutex));
} else {
break;
}
}
pthread_mutex_unlock(&(tp->work_mutex));
}
File added
#include<stdio.h>
#include<string.h>
void fromXML(char *key,char *value,char* messagetype, char *message,char *xml){
strcpy(key,"");
strcpy(value,"");
strcpy(message,"");
strcpy(messagetype,"");
int i;
char tag[50];
char data[300000];
strcpy(data,"");
for(i=0;i<strlen(xml);i++){
if(xml[i]=='<'){
strcpy(tag,"");
}
else if (xml[i]=='>'){
if(tag[0]!='/'){
//do nothing
}
}
else{
char a[2];
a[0]=xml[i];
a[1]='\0';
if(!strcmp(tag,"Key")){
strcat(key,a);
}
else if(!strcmp(tag,"Value")){
strcat(value,a);
}
else if(!strcmp(tag,"Message")){
strcat(message,a);
}
else if(!strcmp(tag,"type")){
if(strcmp(a,"\"") && strcmp(a,"=") )
strcat(messagetype,a);
}
else{
if(xml[i]==' '){
strcpy(tag,"");
}
else{
strcat(tag,a);
}
}
}
}
}
void toXML(char *resp,char key[],char value[],char messagetype[],char message[]){
strcpy(resp,"");
char temp[300000]="";
char *tempo="<?xml version=\"1.0\" encoding=\"UTF-8\"?><KVMessage type=\"";
strcat(temp,tempo);
strcat(temp,messagetype);
strcat(temp,"\">");
if(strcmp(key,"")){
strcat(temp,"<Key>");
strcat(temp,key);
strcat(temp,"</Key>");
if(strcmp(value,"")){
strcat(temp,"<Value>");
strcat(temp,value);
strcat(temp,"</Value>");
}
}
else{
strcat(temp,"<Message>");
strcat(temp,message);
strcat(temp,"</Message>");
}
strcat(temp,"</KVMessage>");
strcat(resp,temp);
}
/*
int main() {
//code
char x[]="hello";
char y[]="world";
char resp[300000];
toXML(resp,x,"","getreq","success");
//printf("%s\n",resp);
strcpy(resp,"");
toXML(resp,x,y,"putreq","success");
char ix[300];
char iy[300000];
char msg[10];
fromXML(ix,iy,msg,resp);
if(strcmp(ix,"")){
printf("key=%s\n",ix);
}
if(strcmp(iy,"")){
printf("value=%s\n",iy);
}
if(strcmp(msg,"")){
printf("msg=%s\n",msg);
}
/*toXML(x,"","delreq","success");
toXML(x,y,"resp","success");
toXML("",y,"resp","success");
toXML("",y,"resp","failure");
toXML(resp,"","","resp","success");
fromXML(ix,iy,msg,resp);
if(strcmp(ix,"")){
printf("key=%s\n",ix);
}
if(strcmp(iy,"")){
printf("value=%s\n",iy);
}
if(strcmp(msg,"")){
printf("msg=%s\n",msg);
}
toXML(resp,"","","resp","success");
fromXML(ix,iy,msg,resp);
if(strcmp(ix,"")){
printf("key=%s\n",ix);
}
if(strcmp(iy,"")){
printf("value=%s\n",iy);
}
if(strcmp(msg,"")){
printf("msg=%s\n",msg);
}
return 0;
}*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment