Commit d0bc0917 authored by Bhavesh Yadav's avatar Bhavesh Yadav

Added kv-server code

parent a753ec27
kvclient: kvclient.o parsexml.o
gcc -o kvclient kvclient.o parsexml.o
kvclient.o: kvclient.c parsexml.h
gcc -c -o kvclient.o kvclient.c
%.o: %.c
gcc -c -o $@ $^
run: kvclient
./kvclient batchRun.txt batchResp1.txt
clean:
rm -f *.o kvclient *.csv *.xml batchResp*.txt
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
File added
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include "parsexml.h"
#define PORT 8080
int main(int argc, char** argv)
{
int sock_fd, addrlen, msglen;
struct sockaddr_in clientAddr, *result;
char key[258],value[256*1024+2],buffer[257*1024],*message,*err;
char *operation = (char *)malloc(sizeof(char)*7);
if( (sock_fd = socket(AF_INET , SOCK_STREAM , 0)) == 0) { //Connection oriented Socket, choose protocol automatically (0)
printf("%s",toRespXML("Network Error: Could not create socket"));
exit(1);
}
clientAddr.sin_family = AF_INET;
clientAddr.sin_port = htons(PORT);
clientAddr.sin_addr.s_addr = INADDR_ANY; //inet_addr("localhost");// inet_aton("localhost",&(clientAddr.sin_addr));
memset(&(clientAddr.sin_zero), '\0', 8);
addrlen = sizeof(struct sockaddr_in);
if( connect(sock_fd, (struct sockaddr *)&clientAddr, addrlen)<0) {
printf("%s",toRespXML("Network Error: Could not connect"));
exit(1);
}
FILE *fp;
if (argc >= 2){
fp = fopen(argv[1], "r");
if (fp==NULL){
printf("ERROR:File doesn't exist\n");
return -1;
}
}
FILE *outFp;
if(argc>=3)
outFp = fopen(argv[2],"w+");
while(1){
// batch mode
if(argc >= 2) {
if(fscanf(fp,"%[^,],",operation)==EOF)
break;
if(!strcmp(operation,"put") || !strcmp(operation,"PUT"))
fscanf(fp,"%[^,],%[^\n]\n",key,value);
else
fscanf(fp,"%[^\n]\n",key);
// printf("%s,%s,%s\n",operation,key,value);
if(!strcmp(operation,"get") || !strcmp(operation,"GET"))
strcpy(operation,"getreq");
else if(!strcmp(operation,"put") || !strcmp(operation,"PUT"))
strcpy(operation,"putreq");
else if(!strcmp(operation,"del") || !strcmp(operation,"DEL"))
strcpy(operation,"delreq");
else {
printf("Invalid Choice, Please try again\n");
continue;
}
message = toXML(operation,key,value);
write(sock_fd, message, strlen(message));
msglen = read(sock_fd, buffer, 257*1024);
buffer[msglen] = '\0';
extractRespXml(outFp,buffer);
if(argc>=4)
if(!strcmp(argv[3],"-p"))
puts(buffer);
}
// Interactive mode
else {
printf("Enter operation\n GET\tPUT\tDEL\n");
scanf("%s",operation);
if(!strcmp(operation,"get") || !strcmp(operation,"GET")){
printf("Enter Key\n");
strcpy(operation,"getreq");
scanf("%s",key);
} else if(!strcmp(operation,"put") || !strcmp(operation,"PUT")){
printf("Enter Key and Value\n");
strcpy(operation,"putreq");
scanf("%s",key);
getchar();
scanf("%[^\n]s",value);
} else if(!strcmp(operation,"del") || !strcmp(operation,"DEL")){
printf("Enter Key\n");
strcpy(operation,"delreq");
scanf("%s",key);
} else {
printf("Invalid Choice, Please try again\n");
continue;
}
message = toXML(operation,key,value);
write(sock_fd, message, strlen(message));
msglen = read(sock_fd, buffer, 257*1024);
buffer[msglen] = '\0';
puts(buffer);
}
}
fclose(outFp);
return 0;
}
\ No newline at end of file
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <stdlib.h>
#include <stdbool.h>
#include<unistd.h>
#include "parsexml.h"
char *toRespXML(char* msg){
char *line1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVMessage type=\"resp\">\n<Message>";
char *line2 = "</Message>\n</KVMessage>\n";
int msglen = strlen(line1)+strlen(line2)+strlen(msg)+1;
char *message = (char *)malloc(sizeof(char)*msglen);
strcpy(message,line1);
strcat(message,msg);
strcat(message,line2);
// puts("message");
// puts(msg);
return message;
}
char * toXML(char *reqType, char *key, char * value){
char *line1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n";
char *line21 = "<KVMessage type=\"";
char *line22 = "\">\n";
char *line31 = "<Key>";
char *line32 = "</Key>\n";
char *line41 = "<Value>";
char *line42 = "</Value>\n";
char *line5 = "</KVMessage>\n";
int len2 = strlen(line21)+strlen(line22)+strlen(reqType)+1;
int len3 = strlen(line31)+strlen(line32)+strlen(key)+1;
int len4 = strlen(line41)+strlen(line42)+strlen(value)+1;
char *line2 = (char *)malloc(sizeof(char) * len2);
char *line3 = (char *)malloc(sizeof(char) * len3);
char *line4 = (char *)malloc(sizeof(char) * len4);
strcpy(line2,line21);
strcat(line2,reqType);
strcat(line2,line22);
strcpy(line3,line31);
strcat(line3,key);
strcat(line3,line32);
strcpy(line4,line41);
strcat(line4,value);
strcat(line4,line42);
int msglen = strlen(line1)+strlen(line2)+strlen(line3)+strlen(line4)+strlen(line5)+1;
char *message = (char *)malloc(sizeof(char) * msglen);
strcpy(message,line1);
strcat(message,line2);
strcat(message,line3);
if(!strcmp(reqType,"putreq") || !strcmp(reqType,"resp"))
strcat(message,line4);
strcat(message,line5);
free(line2);free(line3);free(line4);
return message;
}
char *extractRespXml(FILE *fp,char *buffer){
char *p,*q,*err;
char *key;
char *value;
p = strstr(buffer, "<Key>")+5;
q = strstr(buffer, "</Key>");
if(p!=NULL && q!=NULL){
key = malloc(sizeof(char)*(q-p+1));
memcpy(key,p,q-p);
key[q-p] = '\0';
p = strstr(buffer, "<Value>")+ 7;
q = strstr(buffer, "</Value>");
if(p!=NULL && q!=NULL){
value = malloc(sizeof(char)*(q-p+1));
memcpy(value,p,q-p);
value[q-p] = '\0';
}
fprintf(fp,"%s,%s\n",key,value);
free(key);
free(value);
}
else {
p = strstr(buffer, "<Message>")+ 9;
q = strstr(buffer, "</Message>");
char *message = (char *)malloc(sizeof(char)*170);
if(p!=NULL && q!=NULL){
memcpy(message,p,q-p);
if(!strcmp(message,"Success")){
fprintf(fp,"Success\n");
} else {
fprintf(fp,"Error,%s\n",message);
}
}
free(message);
}
}
extReq_t *extractXML(char *buffer){
extReq_t *req;
req = malloc(sizeof(extReq_t));
char *p,*q;
req->operation = malloc(sizeof(char)*12);
req->err = malloc(sizeof(char)*170);
req->val = malloc(sizeof(char)*256*1024+1);
req->error=true;
p = strstr(buffer, "type=")+ 6;
q = strstr(buffer, "\">");
if(p==NULL || q==NULL) {
strcpy(req->err,"XML Error: Received unparseable message");
return req;
}
else
memcpy(req->operation,p,q-p);
p = strstr(buffer, "<Key>")+5;
q = strstr(buffer, "</Key>");
if(p==NULL || q==NULL){
strcpy(req->err,"XML Error: Received unparseable message");
return req;
}
else if (q-p > 257) {
strcpy(req->err ,"Oversized key");
return req;
}
else {
req->key = malloc(sizeof(char)*(q-p+1));
memcpy(req->key,p,q-p);
req->key[q-p] = '\0';
}
if(!strcmp(req->operation,"putreq")) {
p = strstr(buffer, "<Value>")+ 7;
q = strstr(buffer, "</Value>");
if(p==NULL || q==NULL) {
strcpy(req->err,"XML Error: Received unparseable message");
return req;
}
else if (q-p>1024*257+2) {
strcpy(req->err,"Oversized value");
return req;
}
else{
req->val = realloc(req->val,sizeof(char)*(q-p+1));
memcpy(req->val,p,q-p);
req->val[q-p] = '\0';
}
}
req->error = false;
return req;
}
\ No newline at end of file
#include<stdio.h>
#include<stdbool.h>
struct extractedReq {
char* key;
char* val;
char* operation;
char* err;
bool error;
};
typedef struct extractedReq extReq_t;
char *toXML(char *reqType, char *key, char * value);
extReq_t *extractXML(char *buffer);
char *toRespXML(char* msg);
char *extractRespXml(FILE *fp,char *buffer);
Team Info:
193050040 Shreyansh Jain
193050049 Axel James
193050052 Bhaveshkumar Yadav
To compile the system run:
make
Instructions to execute :
1. Through make file
client: make runserver
server: make runclient
2. Manually
server: ./kvserver -port=8080 -threadPoolSize=5 -numSetsInCache=4 -sizeOfSet=2
client: ./kvclient batchRun.txt outputFile.txt -p
'-p' flag prints the received response from server in BatchMode
Input format :
1. For Batch Mode
PUT,<key>,<value> OR
GET,<key> OR
DEL,<key>
put,<key>,<value> OR
get,<key> OR
del,<key>
2. For Interactive Mode
Enter operation : GET/PUT/DEL
Now Enter Key,Value according to reqType
For GET : <key>
For PUT : <key> <value>
For DEL : <key>
xmpld1,val1123123
xmpld,val1123123
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<unistd.h>
#include"KVCache.h"
#include"kvstore.h"
// 256+1 for key
// 256*1024+1 for value
// 1 for valid
// 1 for replacement
// cache line structure
// [round-robin last][valid bit*setSize][referenced bit*setSize][blocks..]
typedef struct Cache
{
int numSets;
int setSize;
char * cacheptr;
pthread_mutex_t* locks;
}Cache;
Cache cache;
int getSetId(char * key){
int s=0;
for (int i = 0; (i < 10)&&(key[i]!='\0'); ++i)
{
s+=(int)key[i];
}
return s%(cache.numSets);
}
char* buildCache(int numSets, int setSize) {
char* cacheptr;
cacheptr = (char *)calloc((BLOCKINFO + BLOCKSIZE)*numSets*setSize + numSets, sizeof(char));
if (cacheptr==NULL)
{
printf("Not enought memory for cache\n");
exit(0);
}
cache.numSets = numSets;
cache.setSize = setSize;
cache.cacheptr = cacheptr;
//creating locks
cache.locks = (pthread_mutex_t *) malloc(numSets * sizeof(pthread_mutex_t));
if (cache.locks == NULL)
{
printf("Cannot create locks\n");
exit(0);
}
for (int i = 0; i < setSize; ++i)
{
if (pthread_mutex_init(&cache.locks[i], NULL) != 0)
{
printf("\n mutex init has failed\n");
exit(0);
}
}
return cacheptr;
}
char * searchCache(int setNo, char* key, int* idx) {
int success = 0;
char * cacheptr = cache.cacheptr;
int startIdx = ((BLOCKINFO + BLOCKSIZE)*cache.setSize + 1 )*setNo;
char * startptr = cache.cacheptr + startIdx + 1;
char * blockptr = startptr + 2*cache.setSize;
for (int i = 0; i < cache.setSize; ++i)
{
if (startptr[i]==1)
{
if(strcmp(key,blockptr+(i)*BLOCKSIZE)==0) {
startptr[cache.setSize+i] = (char)1;
*idx = i;
return blockptr+(i)*BLOCKSIZE + 257;
}
}
}
return NULL;
}
char* secondChance(int setNo) {
char * cacheptr = cache.cacheptr;
int startIdx = ((BLOCKINFO + BLOCKSIZE)*cache.setSize + 1 )*setNo;
char * startptr = cache.cacheptr + startIdx;
int nextReplace = (int)startptr[0];
char* refBits = startptr+cache.setSize+1;
while(((int)refBits[nextReplace])!=0) {
refBits[nextReplace] = (char) 0;
nextReplace++;
if (nextReplace>=cache.setSize)
{
nextReplace = 0;
}
}
char* retBlock = startptr + 1 + BLOCKINFO*cache.setSize + BLOCKSIZE*nextReplace;
refBits[nextReplace] = (char)1;
startptr[1+nextReplace] = (char)1;
nextReplace++;
if (nextReplace>=cache.setSize)
{
nextReplace = 0;
}
startptr[0] = (char)nextReplace;
return retBlock;
}
void addCache(int setNo, char* key, char* value, char* blockValue) {
if (blockValue!=NULL)
{
strcpy(blockValue, value);
}
else
{
char* block = secondChance(setNo);
strcpy(block, key);
strcpy(block+257, value);
}
}
void deleteCache(int setNo, char* key) {
int idx;
char * blockValue = searchCache(setNo, key, &idx);
if (blockValue!=NULL)
{
char * cacheptr = cache.cacheptr;
int startIdx = ((BLOCKINFO + BLOCKSIZE)*cache.setSize + 1 )*setNo;
char * startptr = cache.cacheptr + startIdx;
startptr[1+idx] = (int)0;
startptr[idx+cache.setSize] = (int)0;
}
}
bool searchKey(char* key, char* value) {
int idx;
bool success = false;
int setNo = getSetId(key);
pthread_mutex_lock(&(cache.locks[setNo]));
char* ret = searchCache(setNo, key, &idx);
if (ret!=NULL) {
// Maybe should copy the string 'value' to another location
strcpy(value,ret);
pthread_mutex_unlock(&(cache.locks[setNo]));
success = true;
// return value;
}
else {
ret = restoreFromFile(key);
if (ret!=NULL) {
strcpy(value,ret);
addCache(setNo, key, value, NULL);
success = true;
}
pthread_mutex_unlock(&(cache.locks[setNo]));
}
return success;
}
void addKey(char* key, char* value) {
int idx;
int setNo = getSetId(key);
pthread_mutex_lock(&(cache.locks[setNo]));
char * blockValue = searchCache(setNo,key, &idx);
addCache(setNo, key, value, blockValue);
dumpToFile(key, value);
pthread_mutex_unlock(&(cache.locks[setNo]));
}
bool deleteKey(char* key) {
int setNo = getSetId(key);
bool success;
pthread_mutex_lock(&(cache.locks[setNo]));
deleteCache(setNo, key);
success = deleteStore(key);
pthread_mutex_unlock(&(cache.locks[setNo]));
return success;
}
void toXMLCache() {
char* cacheptr = cache.cacheptr;
char* info = cacheptr+1;
char* blockptr = info+2*cache.setSize;
FILE *fptr;
fptr = fopen("cache.xml", "w+");
if(fptr == NULL)
{
printf("Cannot open xml file!");
exit(1);
}
fprintf(fptr,"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVCache>\n");
for (int i = 0; i <cache.numSets; ++i)
{
char* refInfo = info+cache.setSize;
fprintf(fptr,"\t<Set Id=\"%d\">\n", i);
for (int j = 0; j < cache.setSize; ++j)
{
fprintf(fptr,"\t\t<CacheEntry isReferenced=\"");
if (((int)refInfo[j])==0)
{
fprintf(fptr,"false\" isValid=\"​");
}
else
{
fprintf(fptr,"true\" isValid=\"​");
}
if (((int)info[j])==0)
{
fprintf(fptr,"false\">\n");
fprintf(fptr, "\t\t\t<Key></Key>\n");
fprintf(fptr, "\t\t\t<Value></Value>\n");
}
else
{
fprintf(fptr,"true\">\n");
fprintf(fptr, "\t\t\t<Key>%s</Key>\n",blockptr);
fprintf(fptr, "\t\t\t<Value>%s</Value>\n",blockptr+257);
}
fprintf(fptr,"\t\t</CacheEntry>\n");
blockptr = blockptr + BLOCKSIZE;
}
fprintf(fptr,"\t</Set>\n");
info = info+cache.setSize*(BLOCKSIZE+BLOCKINFO)+1;
blockptr = info+2*cache.setSize;
}
fprintf(fptr,"</KVCache>\n");
fclose(fptr);
}
// int main()
// {
// char k1[]="key1";
// char v1[]="value1";
// char k2[]="jkey2";
// char v2[]="value2";
// char k3[]="key3";
// char v3[]="value3";
// char* cacheptr = buildCache(6, 2);
// addKey(k1,v1);
// addKey(k2,v2);
// // printf("addk\n");
// printf("s1:%s\n",searchKey(k1));
// printf("s12:%s\n",searchKey(k2));
// addKey(k3,v3);
// printf("s12:%s\n",searchKey(k1));
// printf("s12:%s\n",searchKey(k2));
// printf("s12:%s\n",searchKey(k3));
// // printf("searchCache\n");
// printf("s2:%s\n",searchKey(v1));
// // printf("sea2\n");
// deleteKey(k1);
// deleteKey(k2);
// deleteKey(k3);
// deleteKey(v1);
// toXML();
// printf("s3:%s\n",searchKey(k1));
// printf("%d",getSetId("abcd"));
// printf("%d",getSetId("abcdesaf"));
// printf("%d",getSetId("bcesaf"));
// return 0;
// }
\ No newline at end of file
// 256+1 for key
// 256*1024+1 for value
#include<stdbool.h>
#define BLOCKSIZE (257 + 256*1024 + 1)
// 1 for valid
// 1 for replacement
#define BLOCKINFO (2)
char* buildCache(int numSets, int setSize);
bool searchKey(char* key, char* value);
void addKey(char* key, char* value);
void toXMLCache();
bool deleteKey(char* key);
\ No newline at end of file
.SUFFIXES: kvserver: kvserver.o threadPool.o KVCache.o parsexml.o kvstore.o
.SUFFIXES: .c .o gcc -o kvserver kvserver.o threadPool.o KVCache.o parsexml.o kvstore.o -pthread
CLNT = llist
SRVR = llist_svc
CFLAGS = -g -Wall
SRVR_OBJ = llist_svc_proc.o llist_xdr.o llist_svc.o kvserver.o: kvserver.c threadPool.h KVCache.h kvstore.h
# CLNT_OBJ = llist.o llist_xdr.o llist_clnt.o gcc -c kvserver.c
.c.o:; gcc -c -o $@ $(CFLAGS) $< KVCache.o: KVCache.c KVCache.h kvstore.h
gcc -c KVCache.c
default: $(SRVR) parsexml.o: parsexml.c parsexml.h
gcc -c parsexml.c
$(CLNT): $(CLNT_OBJ) threadPool.o: threadPool.c threadPool.h KVCache.h parsexml.h
gcc -o $(CLNT) $(CLNT_OBJ) gcc -c threadPool.c
$(SRVR): $(SRVR_OBJ) kvstore.o: kvstore.c kvstore.h
gcc -o $(SRVR) $(SRVR_OBJ) gcc -c kvstore.c
run: kvserver
./kvserver -port=8080 -threadPoolSize=5 -numSetsInCache=4 -sizeOfSet=2
clean: clean:
rm -f *.o rm -f *.o kvserver kvclient *.csv *.xml batchResp*.txt
rm -f llist llist_svc \ No newline at end of file
\ No newline at end of file
File added
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <stdbool.h>
#include "threadPool.h"
#include "KVCache.h"
#include "parsexml.h"
#include "kvstore.h"
#define MAX_CLIENTS 20
#define BACKLOG 5
int PORT;
bool startsWith(const char *str, const char *pre)
{
size_t lenpre = strlen(pre),
lenstr = strlen(str);
return lenstr < lenpre ? false : memcmp(pre, str, lenpre) == 0;
}
int main(int argc, char* argv[])
{
int numSets=16, setSize=4;
int num_threads = 5;
PORT = 8080;
for (int i = 1; i < argc; ++i)
{
if (startsWith(argv[i],"-port="))
{
PORT = atoi(argv[i]+6);
}
else if (startsWith(argv[i],"-threadPoolSize="))
{
num_threads = atoi(argv[i]+16);
}
else if (startsWith(argv[i],"-numSetsInCache="))
{
numSets = atoi(argv[i]+16);
}
else if (startsWith(argv[i],"-sizeOfSet="))
{
setSize = atoi(argv[i]+11);
}
}
tpool_t* tp = createThreadPool(num_threads);
printf("NumSets = %d",numSets);
char* cacheptr = buildCache(numSets, setSize);
initStore();
int sock_fd, addrlen, msglen, newConnection, client_socket[MAX_CLIENTS], max_sd;
struct sockaddr_in socketAddr;
char buffer[257*1024*10];
fd_set rset;
memset(client_socket,0,sizeof(int)*MAX_CLIENTS);
if( (sock_fd = socket(AF_INET , SOCK_STREAM , 0)) == 0) {
printf("%s",toRespXML("Network Error: Could not create socket"));
exit(1);
}
socketAddr.sin_family = AF_INET;
socketAddr.sin_addr.s_addr = INADDR_ANY;
socketAddr.sin_port = htons( PORT );
memset(&(socketAddr.sin_zero), '\0', 8);
addrlen = sizeof(struct sockaddr_in);
if (bind(sock_fd, (struct sockaddr *)&socketAddr, addrlen)<0){
perror("bind failed");
exit(1);
}
printf("Listening on port %d \n", PORT);
if (listen(sock_fd, BACKLOG) < 0) {
perror("listen");
exit(1);
}
puts("Waiting for connections ...");
while(1)
{
FD_ZERO(&rset);
FD_SET(sock_fd, &rset);
max_sd = sock_fd;
for (int i = 0 ; i < MAX_CLIENTS ; i++) {
int sd = client_socket[i];
if(sd > 0)
FD_SET( sd , &rset);
if(sd > max_sd)
max_sd = sd;
}
if ( (select(max_sd+1,&rset,NULL,NULL,NULL) < 0) && (errno!=EINTR))
printf("select error");
if (FD_ISSET(sock_fd, &rset)){
newConnection = accept(sock_fd, (struct sockaddr *)&socketAddr, (socklen_t*)&addrlen);
if (newConnection < 0) {
perror("Error during Accept");
exit(1);
}
printf("New connection,socket_fd is %d,ip is:%s,port:%d\n", newConnection,inet_ntoa(socketAddr.sin_addr),ntohs(socketAddr.sin_port));
for (int i=0;i<MAX_CLIENTS;i++) {
if( client_socket[i] == 0 ) {
client_socket[i] = newConnection;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
for (int i = 0; i < MAX_CLIENTS; i++) {
int sd = client_socket[i];
if (FD_ISSET(sd,&rset)) {
if ((msglen = read(sd,buffer,1024*257)) == 0) {
getpeername(sd , (struct sockaddr*)&socketAddr,(socklen_t*)&addrlen);
printf("Host disconnected,ip %s,port %d\n",inet_ntoa(socketAddr.sin_addr),ntohs(socketAddr.sin_port));
close(sd);
client_socket[i] = 0;
}
else {
buffer[msglen] = '\0';
// printf("message recieved\n");
// printf("%s",buffer);
puts(buffer);
addRequestToQueue(tp,decodeRequestAndProcess,buffer,sd);
/*
char operation[7],key[257],value[256*1024+1];
extractXML(buffer,key,value,operation);
printf("%s\n%s\n%s\n",key,value,operation);
ThreadPool Here !!
Call extract from XML procedure to get Operation Type, Key, Value
*/
// write(sd , buffer , strlen(buffer));
}
}
}
}
return 0;
}
\ No newline at end of file
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<unistd.h>
#include<stdbool.h>
/////////////////add/////////////////
#define NUMFILES 10
typedef struct Store
{
pthread_mutex_t* locks;
}Store;
Store store;
int getFileId(char * key){
int s=0;
for (int i = 0; (i < 10)&&(key[i]!='\0'); ++i)
{
s+=(int)key[i];
}
return s%(NUMFILES);
}
void initStore() {
//creating locks
store.locks = (pthread_mutex_t *) malloc(NUMFILES * sizeof(pthread_mutex_t));
if (store.locks == NULL)
{
printf("Cannot create locks\n");
exit(0);
}
for (int i = 0; i < NUMFILES; ++i)
{
if (pthread_mutex_init(&store.locks[i], NULL) != 0)
{
printf("\n mutex init has failed\n");
exit(0);
}
}
return;
}
/////////////////add end/////////////////
bool dumpToFile(char* key, char* val){
/////////////////add/////////////////
int setNo = getFileId(key);
/////////////////add end/////////////////
char fileName[10];
char tempfileName[10];
bool keyExists = false;
strcpy(tempfileName,"temp");
sprintf(fileName,"%d",setNo);
strcat(fileName,".csv");
strcat(tempfileName,fileName);
/////////////////add/////////////////
pthread_mutex_lock(&(store.locks[setNo]));
/////////////////add end/////////////////
FILE *fp = fopen(fileName,"a");
FILE *newFp = fopen(tempfileName,"w");
char* storedKey = malloc(sizeof(char)*257);
char* storedVal = malloc(sizeof(char)*256*1024+1);
while(1){
if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF)
break;
if(!strcmp(key,storedKey)) {
keyExists = true;
fprintf(newFp,"%s,%s\n",key,val);
}
else
fprintf(newFp,"%s,%s\n",storedKey,storedVal);
}
if(!keyExists)
fprintf(newFp,"%s,%s\n",key,val);
remove(fileName);
// free(fp);
rename(tempfileName,fileName);
/////////////////add/////////////////
pthread_mutex_unlock(&(store.locks[setNo]));
/////////////////add end/////////////////
fclose(newFp);
free(storedKey);
free(storedVal);
return true;
}
//returns false if key doesn't exist in the store//
//TODO:: re-use code of add-key instead of copy-paste
bool deleteStore(char* key) {
/////////////////add/////////////////
int setNo = getFileId(key);
/////////////////add end/////////////////
char fileName[10];
char tempfileName[10];
bool keyExists = false;
strcpy(tempfileName,"temp");
sprintf(fileName,"%d",setNo);
strcat(fileName,".csv");
strcat(tempfileName,fileName);
/////////////////add/////////////////
pthread_mutex_lock(&(store.locks[setNo]));
/////////////////add end/////////////////
FILE *fp = fopen(fileName,"a");
FILE *newFp = fopen(tempfileName,"w");
char* storedKey = malloc(sizeof(char)*256);
char* storedVal = malloc(sizeof(char)*256*1024+1);
while(1){
if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF)
break;
if(!strcmp(key,storedKey)) {
keyExists = true;
}else
fprintf(newFp,"%s,%s\n",storedKey,storedVal);
}
remove(fileName);
// free(fp);
rename(tempfileName,fileName);
/////////////////add/////////////////
pthread_mutex_unlock(&(store.locks[setNo]));
/////////////////add end/////////////////
fclose(newFp);
free(storedKey);
free(storedVal);
return keyExists;
}
char* restoreFromFile(char* key) {
/////////////////add/////////////////
int setNo = getFileId(key);
/////////////////add end/////////////////
char fileName[10];
sprintf(fileName,"%d",setNo);
strcat(fileName,".csv");
/////////////////add/////////////////
pthread_mutex_lock(&(store.locks[setNo]));
/////////////////add end/////////////////
FILE *fp = fopen(fileName,"r+");
if(fp==NULL)
return NULL;
char* storedKey = malloc(sizeof(char)*256);
char* storedVal = malloc(sizeof(char)*256*1024+1);
while(1){
if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF)
break;
if(!strcmp(key,storedKey)) {
free(storedKey);
fclose(fp);
pthread_mutex_unlock(&(store.locks[setNo]));
return storedVal;
}
}
fclose(fp);
/////////////////add/////////////////
pthread_mutex_unlock(&(store.locks[setNo]));
/////////////////add end/////////////////
free(storedKey);
return NULL;
}
void dumpToFileXML(char* filename) {
FILE *dumpPtr = fopen(filename, "w");
char *storedKey = malloc(sizeof(char)*257);
char *storedVal = malloc(sizeof(char)*256*1024+1);
if(dumpPtr == NULL)
{
printf("Cannot open xml file!");
exit(1);
}
fprintf(dumpPtr,"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVStore>\n");
for (int i = 0; i < 10; ++i) {
char fileSet[10];
sprintf(fileSet,"%d",i);
strcat(fileSet,".csv");
FILE *fp = fopen(fileSet,"r+");
while(1){
if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF)
break;
fprintf(dumpPtr,"<KVPair>\n<Key>%s</Key>\n<Value>%s</Value>\n</KVPair>\n",storedKey,storedVal);
}
fclose(fp);
}
free(storedKey);
free(storedVal);
fprintf(dumpPtr,"</KVStore>");
fclose(dumpPtr);
}
int iniit() {
char* key = malloc(sizeof(char)*245);
char* val = malloc(sizeof(char)*245);
// dumpToFile(1,"sundaram","shivam4");
// dumpToFile(1,"key1","val");
// deleteStore(1,"radium");
// deleteStore(1,"mr");
// printf("%s\n",restoreFromFile(1,"sundaram"));
// printf("%s\n",restoreFromFile(1,"sundaram"));
if(!restoreFromFile("key12"))
printf("Not found");
return 0;
}
#include<stdbool.h>
bool dumpToFile(char* key,char* val);
bool deleteStore(char* key);
char* restoreFromFile(char* key);
void initStore();
\ No newline at end of file
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <stdlib.h>
#include <stdbool.h>
#include<unistd.h>
#include "parsexml.h"
char *toRespXML(char* msg){
char *line1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVMessage type=\"resp\">\n<Message>";
char *line2 = "</Message>\n</KVMessage>\n";
int msglen = strlen(line1)+strlen(line2)+strlen(msg)+1;
char *message = (char *)malloc(sizeof(char)*msglen);
strcpy(message,line1);
strcat(message,msg);
strcat(message,line2);
// puts("message");
// puts(msg);
return message;
}
char * toXML(char *reqType, char *key, char * value){
char *line1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n";
char *line21 = "<KVMessage type=\"";
char *line22 = "\">\n";
char *line31 = "<Key>";
char *line32 = "</Key>\n";
char *line41 = "<Value>";
char *line42 = "</Value>\n";
char *line5 = "</KVMessage>\n";
int len2 = strlen(line21)+strlen(line22)+strlen(reqType)+1;
int len3 = strlen(line31)+strlen(line32)+strlen(key)+1;
int len4 = strlen(line41)+strlen(line42)+strlen(value)+1;
char *line2 = (char *)malloc(sizeof(char) * len2);
char *line3 = (char *)malloc(sizeof(char) * len3);
char *line4 = (char *)malloc(sizeof(char) * len4);
strcpy(line2,line21);
strcat(line2,reqType);
strcat(line2,line22);
strcpy(line3,line31);
strcat(line3,key);
strcat(line3,line32);
strcpy(line4,line41);
strcat(line4,value);
strcat(line4,line42);
int msglen = strlen(line1)+strlen(line2)+strlen(line3)+strlen(line4)+strlen(line5)+1;
char *message = (char *)malloc(sizeof(char) * msglen);
strcpy(message,line1);
strcat(message,line2);
strcat(message,line3);
if(!strcmp(reqType,"putreq") || !strcmp(reqType,"resp"))
strcat(message,line4);
strcat(message,line5);
free(line2);free(line3);free(line4);
return message;
}
char *extractRespXml(FILE *fp,char *buffer){
char *p,*q,*err;
char *key;
char *value;
p = strstr(buffer, "<Key>")+5;
q = strstr(buffer, "</Key>");
if(p!=NULL && q!=NULL){
key = malloc(sizeof(char)*(q-p+1));
memcpy(key,p,q-p);
key[q-p] = '\0';
p = strstr(buffer, "<Value>")+ 7;
q = strstr(buffer, "</Value>");
if(p!=NULL && q!=NULL){
value = malloc(sizeof(char)*(q-p+1));
memcpy(value,p,q-p);
value[q-p] = '\0';
}
fprintf(fp,"%s,%s\n",key,value);
free(key);
free(value);
}
else {
p = strstr(buffer, "<Message>")+ 9;
q = strstr(buffer, "</Message>");
char *message = (char *)malloc(sizeof(char)*170);
if(p!=NULL && q!=NULL){
memcpy(message,p,q-p);
if(!strcmp(message,"Success")){
fprintf(fp,"Success\n");
} else {
fprintf(fp,"Error,%s\n",message);
}
}
free(message);
}
}
extReq_t *extractXML(char *buffer){
extReq_t *req;
req = malloc(sizeof(extReq_t));
char *p,*q;
req->operation = malloc(sizeof(char)*12);
req->err = malloc(sizeof(char)*170);
req->val = malloc(sizeof(char)*256*1024+1);
req->error=true;
p = strstr(buffer, "type=")+ 6;
q = strstr(buffer, "\">");
if(p==NULL || q==NULL) {
strcpy(req->err,"XML Error: Received unparseable message");
return req;
}
else
memcpy(req->operation,p,q-p);
p = strstr(buffer, "<Key>")+5;
q = strstr(buffer, "</Key>");
if(p==NULL || q==NULL){
strcpy(req->err,"XML Error: Received unparseable message");
return req;
}
else if (q-p > 257) {
strcpy(req->err ,"Oversized key");
return req;
}
else {
req->key = malloc(sizeof(char)*(q-p+1));
memcpy(req->key,p,q-p);
req->key[q-p] = '\0';
}
if(!strcmp(req->operation,"putreq")) {
p = strstr(buffer, "<Value>")+ 7;
q = strstr(buffer, "</Value>");
if(p==NULL || q==NULL) {
strcpy(req->err,"XML Error: Received unparseable message");
return req;
}
else if (q-p>1024*257+2) {
strcpy(req->err,"Oversized value");
return req;
}
else{
req->val = realloc(req->val,sizeof(char)*(q-p+1));
memcpy(req->val,p,q-p);
req->val[q-p] = '\0';
}
}
req->error = false;
return req;
}
\ No newline at end of file
#include<stdio.h>
#include<stdbool.h>
struct extractedReq {
char* key;
char* val;
char* operation;
char* err;
bool error;
};
typedef struct extractedReq extReq_t;
char *toXML(char *reqType, char *key, char * value);
extReq_t *extractXML(char *buffer);
char *toRespXML(char* msg);
char *extractRespXml(FILE *fp,char *buffer);
.SUFFIXES:
.SUFFIXES: .c .o
CLNT = llist
SRVR = llist_svc
CFLAGS = -g -Wall
SRVR_OBJ = llist_svc_proc.o llist_xdr.o llist_svc.o
# CLNT_OBJ = llist.o llist_xdr.o llist_clnt.o
.c.o:; gcc -c -o $@ $(CFLAGS) $<
default: $(SRVR)
$(CLNT): $(CLNT_OBJ)
gcc -o $(CLNT) $(CLNT_OBJ)
$(SRVR): $(SRVR_OBJ)
gcc -o $(SRVR) $(SRVR_OBJ)
clean:
rm -f *.o
rm -f llist llist_svc
\ No newline at end of file
#include"parsexml.h"
#include <stdbool.h>
#include <stddef.h>
#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
#include<string.h>
#include"KVCache.h"
#include "threadPool.h"
struct request {
tFunction func;
char *arg;
int clientFD;
struct request *next;
};
typedef struct request request_t;
struct tpool {
request_t *first;
request_t *work_last;
pthread_mutex_t queueLock;
pthread_cond_t threadWait;
pthread_cond_t producerWait;
size_t workingCount;
size_t activeThreads;
bool stop;
};
static request_t *createJob(tFunction func, void *arg,int clientFD)
{
request_t *work;
if (func == NULL)
return NULL;
work = malloc(sizeof(*work));
work->func = func;
work->arg = arg;
work->clientFD = clientFD;
work->next = NULL;
return work;
}
static void deleteJob(request_t *work)
{
if (work == NULL)
return;
free(work);
}
static request_t *tpoolGetRequest(tpool_t *tm)
{
request_t *work;
if (tm == NULL)
return NULL;
work = tm->first;
if (work == NULL)
return NULL;
if (work->next == NULL) {
tm->first = NULL;
tm->work_last = NULL;
} else {
tm->first = work->next;
}
return work;
}
static void *threadMainFunction(void *arg)
{
tpool_t *tm = arg;
request_t *work;
while (1) {
pthread_mutex_lock(&(tm->queueLock));
if (tm->stop)
break;
if (tm->first == NULL) {
pthread_cond_wait(&(tm->threadWait), &(tm->queueLock));
}
work = tpoolGetRequest(tm);
tm->workingCount++;
pthread_mutex_unlock(&(tm->queueLock));
if (work != NULL) {
work->func(work->arg);
write(work->clientFD,work->arg,sizeof(char)*strlen(work->arg));
deleteJob(work);
}
pthread_mutex_lock(&(tm->queueLock));
tm->workingCount--;
if (!tm->stop && tm->workingCount == 0 && tm->first == NULL)
pthread_cond_signal(&(tm->producerWait));
pthread_mutex_unlock(&(tm->queueLock));
}
tm->activeThreads--;
pthread_cond_signal(&(tm->producerWait));
pthread_mutex_unlock(&(tm->queueLock));
return NULL;
}
tpool_t *createThreadPool(size_t num)
{
tpool_t *tm;
pthread_t thread;
size_t i;
if (num == 0)
num = 2;
tm = calloc(1, sizeof(*tm));
tm->activeThreads = num;
pthread_mutex_init(&(tm->queueLock), NULL);
pthread_cond_init(&(tm->threadWait), NULL);
pthread_cond_init(&(tm->producerWait), NULL);
tm->first = NULL;
tm->work_last = NULL;
for (i=0; i<num; i++) {
pthread_create(&thread, NULL, threadMainFunction, tm);
pthread_detach(thread);
}
return tm;
}
void destroyThreadPool(tpool_t *tm)
{
request_t *work;
request_t *work2;
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->queueLock));
work = tm->first;
while (work != NULL) {
work2 = work->next;
deleteJob(work);
work = work2;
}
tm->stop = true;
pthread_cond_broadcast(&(tm->threadWait));
pthread_mutex_unlock(&(tm->queueLock));
tpoolWait(tm);
pthread_mutex_destroy(&(tm->queueLock));
pthread_cond_destroy(&(tm->threadWait));
pthread_cond_destroy(&(tm->producerWait));
free(tm);
}
bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg,int clientFD)
{
request_t *work;
int msglen = 1024*257+170;
char* copyBuffer = (char*)malloc(sizeof(char)*msglen);
strcpy(copyBuffer,arg);
if (tm == NULL)
return false;
work = createJob(func, copyBuffer,clientFD);
if (work == NULL)
return false;
pthread_mutex_lock(&(tm->queueLock));
if (tm->first == NULL) {
tm->first = work;
tm->work_last = tm->first;
} else {
tm->work_last->next = work;
tm->work_last = work;
}
pthread_cond_signal(&(tm->threadWait));
pthread_mutex_unlock(&(tm->queueLock));
return true;
}
void tpoolWait(tpool_t *tm)
{
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->queueLock));
while (1) {
if ((!tm->stop && tm->workingCount != 0) || (tm->stop && tm->activeThreads != 0)) {
pthread_cond_wait(&(tm->producerWait), &(tm->queueLock));
} else {
break;
}
}
pthread_mutex_unlock(&(tm->queueLock));
}
void worker(char *arg)
{
int *val = (int*)arg;
int old = *val;
*val += 1000;
printf("tid=%ld, old=%d, val=%d\n", pthread_self(), old, *val);
if (*val%2)
usleep(100000);
}
static const size_t num_threads = 4;
static const size_t num_items = 5;
int init(int argc, char **argv)
{
tpool_t *tm;
int *vals;
size_t i;
tm = createThreadPool(num_threads);
vals = calloc(num_items, sizeof(*vals));
char* buffer[10];
buffer[0] = malloc(sizeof(char)*1025);
buffer[1] = malloc(sizeof(char)*1025);
buffer[2] = malloc(sizeof(char)*1025);
buffer[3] = malloc(sizeof(char)*1025);
char* key = malloc(sizeof(char)*256+1);
char* value = malloc(sizeof(char)*256*1024 +1);
char* operation = malloc(sizeof(char)*7);
char* cacheptr = buildCache(2, 2);
strcpy(buffer[0],"<?xml version=\"1.0\" encoding=\"UTF-8\"?><KVMessage type=\"putreq\"><Key>5</Key><Value>25</Value></KVMessage>");
strcpy(buffer[1],"<?xml version=\"1.0\" encoding=\"UTF-8\"?><KVMessage type=\"getreq\"><Key>5</Key><Value></Value></KVMessage>");
strcpy(buffer[2],"<?xml version=\"1.0\" encoding=\"UTF-8\"?><KVMessage type=\"delreq\"><Key>5</Key><Value></Value></KVMessage>");
strcpy(buffer[3],"<?xml version=\"1.0\" encoding=\"UTF-8\"?><KVMessage type=\"getreq\"><Key>5</Key><Value></Value></KVMessage>");
for (i=0; i<4; i++) {
vals[i] = i;
addRequestToQueue(tm, decodeRequestAndProcess, buffer[i],1);
}
tpoolWait(tm);
free(vals);
destroyThreadPool(tm);
return 0;
}
void decodeRequestAndProcess(char* buffer) {
extReq_t *request;
// char* key;
// char* value = (char *)malloc(sizeof(char)*1024*256+1);
// char* operation = (char *)malloc(sizeof(char)*7);
// char* err = (char *)malloc(sizeof(char)*170);
request = extractXML(buffer);
if(request->error){
strcpy(buffer,toRespXML(request->err));
return;
}
puts("key");
puts(request->key);
if(!strcmp(request->operation,"putreq")){
puts("putreq");
addKey(request->key,request->val);
strcpy(buffer,toRespXML("Success"));
// puts("buffer");
// puts(buffer);
}
else if(!strcmp(request->operation,"getreq")){
puts("getreq");
if(searchKey(request->key,request->val))
strcpy(buffer,toXML("resp",request->key,request->val));
else
strcpy(buffer,toRespXML("Does not exist"));
// puts("buffer");
// puts(buffer);
}
else if (!strcmp(request->operation,"delreq")){
puts("delreq");
if(deleteKey(request->key))
strcpy(buffer,toRespXML("Success"));
else
strcpy(buffer,toRespXML("Does not exist"));
// puts("buffer");
// puts(buffer);
}
// free(key);
// free(value);
// free(operation);
free(request);
// toXMLCache();
}
\ No newline at end of file
#define __TPOOL_H__
#include <stdbool.h>
struct tpool;
typedef struct tpool tpool_t;
typedef void (*tFunction)(char *arg);
tpool_t *createThreadPool(size_t num);
void destroyThreadPool(tpool_t *tm);
void decodeRequestAndProcess(char* buffer);
bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg,int clientFD);
void tpoolWait(tpool_t *tm);
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment