Commit 0691e504 authored by Shivaji's avatar Shivaji

fixed update bug in cache

parents bb0f32ef 3a427d2d
File added
File deleted
/*
This file only has a main function
Include KVClientLibrary.c to use interfaces to make GET, PUT, DELETE requests
PA4 instructions:
The client is a separate process that will first establish a connection with the server, and then
send GET , PUT , and DELETE requests to the server process. It will have a main and use the
client library interfaces to do the actual operations.
*/
#define MAX 80
#define PORT 8000
#define SA struct sockaddr
#include "KVClientLibrary.c"
#define SA struct sockaddr
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "KVClientLibrary.h"
#include <string.h>
struct message* requestMessage;
void interactive (int sock) {
char command[515];
int readlen, i, j;
int status_code;
char key[256];
char value[256];
char error[256];
do {
memset(key, 0, sizeof(key));
memset(value, 0, sizeof(value));
memset(error, 0, sizeof(error));
printf("interactive> ");
fflush(stdout);
readlen = read(0, command, sizeof(command));
i=5;
while(command[i] && command[i]!=' ' && command[i]!='\n') {
i++;
}
memcpy(key, command+4, i-4);
if(command[i]==' ') {
j=i+1;
while(command[j] && command[j]!=':' && command[j]!='\n') {
j++;
}
memcpy(value, command+i+1, j-i-1);
}
// printf("key: %s\n", key);
// printf("value: %s\n", value);
// printf("error: %s\n", error);
switch(command[0]) {
case 'g': status_code = get(sock, key, value, error);
printf("[%d]", status_code);
printf(" Value recieved: %s\n", value);
break;
case 'p':status_code = put(sock, key, value, error);
printf("[%d]\n", status_code);
break;
case 'd':status_code = del(sock, key, error);
printf("[%d]\n", status_code);
break;
default: return;
}
} while(command[0] != 'q');
}
int main(int argc, char const *argv[])
{
int sock = 0, valread;
......@@ -42,15 +90,22 @@ int main(int argc, char const *argv[])
printf("\nConnection Failed \n");
return -1;
}
char key[256]="abc";
char value[256]="cde";
char error[256];
char key1[256]="abc";
char value1[256]="cde";
char error1[256];
char key2[256]="test";
char value2[256]="cdde";
char error2[256];
char value3[256];
//printf("%d",(int)put(sock,key, value,error));
//printf("%d",(int)del(sock,key,error));
//printf("%d",(int)get(sock,key, value,error));
while (1) {
printf("%d",(int)put(sock, key, value, error));
}
//while (1) {
//printf("%d",(int)put(sock, key1, value1, error1));
//printf("%d",(int)put(sock, key2, value2, error2));
//printf("%d\n",(int)get(sock, key1, value3, error2));
//printf("%s", value3);
interactive(sock);
return 0;
}
\ No newline at end of file
/*
functions:
int get(void* key, void* value, void* error);
int post(void* key, void* value, void* error);
int delete(void* key, void* value, void* error);
PA4 instructions:
This is a library, which will encode and decode your request and response messages.
For example, at the client side this library will encode your request message to the decided
message format, and then send it out to the server process. Similarly, it will decode the
response received from the server, and then hand out the decoded response to the KVClient
module.
Something complimentary will happen at the server end.
Status Codes for request message
GET: 1
PUT: 2
DEL: 3
Status Codes for response message
Success: 200
Error: 400 (with the appropriate error message)
Reasons for error could be GET key not found, DEL key not found etc.
https://moodle.iitb.ac.in/mod/forum/discuss.php?d=13716
The client library really consists of only 3 functions corresponding to the get, put and del.
Internally, these messages manage encoding the request, sending the message,
waiting for the reply and returning the appropriate value.
So, a get API would like like int get(void* key, void* value, void* error);
The return value is the status code.
If the code is 200, then get will fill in the value with malloced memory.
Else, it will malloc some memory into the error pointer and return it. And so on....
*/
#include "KVMessageFormat.h"
#include<stdio.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include "KVClientLibrary.h"
// to print a message
void printMessage(struct message *requestMessage)
......@@ -130,7 +89,7 @@ int put(int sockfd,char *key,char *value,char *error)
}
else
{
printMessage(reply);
//printMessage(reply);
}
free(request);
......
#include "KVMessageFormat.h"
void printMessage(struct message *requestMessage);
int get(int sockfd,char* key,char *value,char *error);
int del(int sockfd,char *key,char *error);
int put(int sockfd,char *key,char *value,char *error);
......@@ -14,6 +14,10 @@ Error: 400 (with the appropriate error message)
Reasons for error could be GET key not found, DEL key not found etc.
*/
#define STATUS_GET 1
#define STATUS_DEL 2
#define STATUS_PUT 3
struct message{
unsigned char status;
char key[256];
......
File deleted
......@@ -7,9 +7,20 @@
#include <netinet/in.h>
#include <sys/epoll.h>
#include <string.h>
#include "LRU.h"
#include "KVMessageFormat.h"
#define MAX_EVENTS 10
#define DEBUG (0)
void gen_random(char *s, const int len) {
static const char alphanum[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
for (int i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
s[len] = 0;
}
void *worker(void *args) {
struct epoll_event ev,events[MAX_EVENTS];
......@@ -19,11 +30,17 @@ void *worker(void *args) {
int read_pipe, conn_sock, nfds, i;
int epollfd;
int newfd;
int status;
// Generate name for each thread for debugging
char *name = (char *) malloc(5*sizeof(char));
gen_random(name, 5);
if DEBUG printf("[%s] Thread started!\n", name);
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
while (epollfd == -1) {
epollfd = epoll_create1(0);
}
/* Add the pipe which we get from main thread */
......@@ -34,19 +51,22 @@ void *worker(void *args) {
perror("epoll_ctl: read_pipe");
exit(EXIT_FAILURE);
}
//if DEBUG printf("[%s] Added read pipe to epoll fd set!\n", name);
while (1) {
if DEBUG printf("[%s] waiting for epoll event!\n", name);
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
if DEBUG printf("[%s] %d events !\n", name, nfds);
for ( i= 0; i < nfds; ++i ) {
if (events[i].data.fd == read_pipe) {
/* if we get a request from main thread to add new client */
read(read_pipe, &newfd, sizeof(newfd));
printf("\nread %d\n", read_pipe);
if DEBUG printf("[%s][EVENT] New Client assigned by main thread fd:%d!\n", name, newfd);
ev.data.fd=newfd;
ev.events = EPOLLIN | EPOLLRDHUP;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, newfd, &ev) == -1) {
......@@ -59,8 +79,10 @@ void *worker(void *args) {
/* if we get a request from client (GET, PUSH, DEL) */
int flag = events[i].events;
if DEBUG printf("[%s][EVENT] flag set: %d\n", name, flag);
if (flag & EPOLLRDHUP) {
/* Connection was closed. */
if DEBUG printf("[%s][EVENT][EPOLLRDHUP] \n", name);
epoll_ctl( epollfd, EPOLL_CTL_DEL, events[i].data.fd , NULL );
close(events[i].data.fd);
continue;
......@@ -70,17 +92,46 @@ void *worker(void *args) {
/* Parse the actual message from client */
struct message *requestMessage= malloc(sizeof(struct message));
int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message));
printf("[Message Received from client] (%d, %s, %s)\n",requestMessage->status,requestMessage->key,requestMessage->value);
/*Process the message and send appropriate response to client*/
requestMessage->status=200;
write(events[i].data.fd,requestMessage, sizeof(struct message));
fflush(stdout);
//printf("[Message Received from client] (%d, %s, %s)\n",requestMessage->status,requestMessage->key,requestMessage->value);
if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name);
switch(requestMessage->status) {
case STATUS_GET:
if DEBUG printf("[%s] GET \n", name);
status = cache_get(requestMessage->key, requestMessage->value);
if(status) {
requestMessage->status = 200;
} else {
requestMessage->status = 240;
}
break;
case STATUS_PUT:
if DEBUG printf("[%s] PUT \n", name);
cache_put(requestMessage->key, requestMessage->value);
requestMessage->status = 200;
break;
case STATUS_DEL:
if DEBUG printf("[%s] DEL \n", name);
status = cache_del(requestMessage->key);
if(status == 0) {
requestMessage->status = 240;
} else {
requestMessage->status = 200;
}
break;
}
write(events[i].data.fd, requestMessage, sizeof(struct message));
free(requestMessage);
}
}
}
}
free(name);
}
......@@ -90,12 +141,12 @@ int main (int argc, int argv) {
int i;
int next_thread_to_assign = 0;
pthread_t *worker_threads;
int pool_thread_size = 5; // TODO: get pool thread size from config file
int pool_thread_size = 1; // TODO: get pool thread size from config file
int sockfd, newsockfd, portno, clilen, n;
struct sockaddr_in serv_addr, cli_addr;
//int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes));
int pipes[5][2]; // TODO: initialize pipes dynamically
int pipes[1][2]; // TODO: initialize pipes dynamically
worker_threads = malloc(pool_thread_size * sizeof(pthread_t));
for( i=0; i < pool_thread_size; i++ ) {
......@@ -123,15 +174,15 @@ int main (int argc, int argv) {
listen(sockfd, 5);
clilen = sizeof(cli_addr);
init_cache();
while(1) {
fflush(stdout);
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0)
perror("ERROR on accept");
write(pipes[next_thread_to_assign][1], &newsockfd, sizeof(newsockfd));
next_thread_to_assign = (next_thread_to_assign+1) % pool_thread_size;
}
......
......@@ -5,42 +5,26 @@
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdatomic.h>
#include <stdatomic.h>
#include <assert.h>
#include "LRU.h"
#define ATOMIC_TEST_AND_SET __atomic_test_and_set
#define CLEAR __atomic_clear
#define MAX_SIZE 10
typedef enum{
FALSE,TRUE
}bool;
struct KV{
char *key;
char *value;
bool valid;
bool modified;
unsigned _Atomic lock;
//int lock;
};
struct KV *array[MAX_SIZE];
struct queue *qu=NULL;
struct queue *last;
typedef struct KV KV;
struct queue{
char *key;
struct queue *next;
};
typedef struct queue queue;
KV *array[MAX_SIZE];
queue *qu = NULL;
queue *last=NULL;
void remove_element_from_deque(char *key)
{
queue *present = qu , *previous=NULL;
if(present == NULL)
return;
while(present->next != NULL)
{
if(present->key == key)
......@@ -62,7 +46,7 @@ void remove_element_from_deque(char *key)
void insert_into_queue(char *key)
{
queue *temp = (queue *)malloc(sizeof(queue));
temp->key = key;
memcpy(temp->key, key, KEY_SIZE);
temp->next = NULL;
if(qu == NULL)
{
......@@ -76,6 +60,23 @@ void insert_into_queue(char *key)
}
}
int remove_front_element()
{
queue *temp = qu;
qu = qu->next;
assert(temp != NULL);
assert(qu != NULL);
for(int i=0;i<MAX_SIZE;i++)
{
if(strcmp(array[i]->key , temp->key) == 0)
{
// check if it is modified to write to file
array[i]->valid = FALSE;
return i;
}
}
}
int find_empty_location()
{
for(int i=0;i<MAX_SIZE;i++)
......@@ -83,192 +84,92 @@ int find_empty_location()
if(array[i]->valid == FALSE)
return i;
}
return remove_front_element();
}
void delet(char *key)
int cache_del(char *key)
{
for(int i=0;i<MAX_SIZE;i++)
{
if(array[i]->key == key)
if(strcmp(array[i]->key , key) == 0)
{
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
remove_element_from_deque(key);
array[i]->valid = FALSE;
CLEAR(&(array[i]->lock),0);
break;
return 1;
}
}
printf("Cache after DEL: \n");
print_cache();
return 0;
//TODO remove key from file also
}
void put(char *key, char *value)
void cache_put(char *key, char *value)
{
int indx=-1;
for(int i=0;i<MAX_SIZE;i++)
{
if(array[i]->key == key)
{
indx = i;
remove_element_from_deque(key);
break;
if(array[i]->key!=NULL) {
if(strcmp(array[i]->key , key) == 0)
{
printf("ENTERING INTO QUEUE\n");
indx = i;
remove_element_from_deque(key);
break;
}
}
}
printf("key is present at index: %d\n", indx);
if(indx == -1)
{
indx = find_empty_location();
assert(indx>=0);
// TODO should write to file if modified is true
// replacment from cache
}
while(ATOMIC_TEST_AND_SET(&(array[indx]->lock),1) == 1);
array[indx]->key = key;
array[indx]->value = value;
memcpy(array[indx]->key, key, KEY_SIZE);
memcpy(array[indx]->value, value, VAL_SIZE);
array[indx]->valid = TRUE;
array[indx]->modified = TRUE;
insert_into_queue(key);
CLEAR(&(array[indx]->lock),0);
print_cache();
}
char* get(char *key)
int cache_get(char *key, char *value)
{
for(int i=0;i<MAX_SIZE;i++)
{
if(array[i]->key == key)
if(strcmp(array[i]->key , key) == 0 && array[i]->valid)
{
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
remove_element_from_deque(key);
insert_into_queue(key);
CLEAR(&(array[i]->lock),0);
return array[i]->value;
memcpy(value, array[i]->value, VAL_SIZE);
return 1;
}
}
return (char *)"NOT FOUND";
return 0;
}
int main()
{
int i=0;
void init_cache() {
for(int j=0;j<MAX_SIZE;j++)
{
array[j] = (KV *)malloc(sizeof(KV));
array[j]->valid = FALSE;
array[j]->lock = 0;
}
char key1[250]="Hello world1";
char value1[256] = "THIS IS Vlaue1";
char key2[250]="Hello world2";
char value2[256] = "THIS IS Vlaue2";
char key3[250]="Hello world3";
char value3[256] = "THIS IS Vlaue3";
char key4[250]="Hello world4";
char value4[256] = "THIS IS Vlaue4";
char key5[250]="Hello world5";
char value5[256] = "THIS IS Vlaue5";
char key6[250]="Hello world6";
char value6[256] = "THIS IS Vlaue6";
KV *temp1 = (KV *)malloc(sizeof(KV));
KV *temp2 = (KV *)malloc(sizeof(KV));
KV *temp3 = (KV *)malloc(sizeof(KV));
KV *temp4 = (KV *)malloc(sizeof(KV));
KV *temp5 = (KV *)malloc(sizeof(KV));
KV *temp6 = (KV *)malloc(sizeof(KV));
temp1->key = key1;
temp1->value = value1;
temp1->valid = TRUE;
temp1->lock = ATOMIC_VAR_INIT(0);
temp2->key = key2;
temp2->value = value2;
temp2->valid = TRUE;
temp2->lock = ATOMIC_VAR_INIT(0);
temp3->key = key3;
temp3->value = value3;
temp3->valid = TRUE;
temp3->lock = ATOMIC_VAR_INIT(0);
temp4->key = key4;
temp4->value = value4;
temp4->valid = TRUE;
temp4->lock = ATOMIC_VAR_INIT(0);
temp5->key = key5;
temp5->value = value5;
temp5->valid = TRUE;
temp5->lock = ATOMIC_VAR_INIT(0);
temp6->key = key6;
temp6->value = value6;
temp6->valid = TRUE;
temp6->lock = ATOMIC_VAR_INIT(0);
array[i++] = (temp1);
array[i++] = (temp2);
array[i++] = (temp3);
array[i++] = (temp4);
array[i++] = (temp5);
array[i++] = (temp6);
qu = (queue *)malloc(sizeof(queue));
qu->key = temp1->key;
qu->next = NULL;
queue *te = qu;
te->next = (queue *)malloc(sizeof(queue));
te = te->next;
te->key = temp1->key;
te->next = NULL;
te->next = (queue *)malloc(sizeof(queue));
te = te->next;
te->key = temp2->key;
te->next = NULL;
te->next = (queue *)malloc(sizeof(queue));
te = te->next;
te->key = temp3->key;
te->next = NULL;
te->next = (queue *)malloc(sizeof(queue));
te = te->next;
te->key = temp4->key;
te->next = NULL;
te->next = (queue *)malloc(sizeof(queue));
te = te->next;
te->key = temp5->key;
te->next = NULL;
last = te;
printf("%s " " %s\n", array[0]->key, array[0]->value);
printf("%s " " %s\n", array[1]->key, array[1]->value);
printf("%s " " %s\n", array[2]->key, array[2]->value);
printf("%s " " %s\n", array[3]->key, array[3]->value);
printf("%s " " %s\n", array[4]->key, array[4]->value);
printf("%s " " %s\n", array[5]->key, array[5]->value);
printf("%s\n",get(array[5]->key));
put(array[5]->key, (char *)"This is a good");
printf("%s\n",get(array[5]->key));
delet(array[5]->key);
delet(array[0]->key);
put((char *)"Hello World7", (char *)"THis is vaue7");
printf("%s %s\n",array[0]->key, array[0]->value);
}
return 0;
void print_cache() {
for(int j=0;j<MAX_SIZE;j++)
{
printf("[%d] (%s) : (%s)\n", array[j]->valid, array[j]->key, array[j]->value);
}
}
#define MAX_SIZE 10
#define KEY_SIZE 256
#define VAL_SIZE 256
typedef enum{
FALSE,TRUE
}bool;
struct KV{
char key[KEY_SIZE];
char value[VAL_SIZE];
bool valid;
bool modified;
unsigned _Atomic lock;
//int lock;
};
typedef struct KV KV;
typedef struct queue queue;
struct queue{
char key[KEY_SIZE];
struct queue *next;
};
extern struct KV *array[MAX_SIZE];
extern struct queue *qu;
extern struct queue *last;
void remove_element_from_deque(char *key);
void insert_into_queue(char *key);
int find_empty_location();
int cache_del(char *key);
void cache_put(char *key, char *value);
int cache_get(char *key, char *value);
void init_cache();
void print_cache();
int remove_front_element();
\ No newline at end of file
all: KVClient.c KVServer.c KVMessageFormat.h KVClientLibrary.c
gcc -pthread KVServer.c -o server
gcc KVClient.c -o client
\ No newline at end of file
CC = gcc
SFLAGS = -pthread -g
SRV_SRC = LRU.c KVServer.c
SRV_HED = KVMessageFormat.h LRU.h
STARGET = Server
CFLAGS = -g
CLI_SRC = KVClientLibrary.c KVClient.c
CLI_HED = KVMessageFormat.h KVClientLibrary.h
CTARGET = Client
.PHONY: server
.PHONY: client
all: server client
client: $(CLI_SRC) $(CLI_HED)
$(CC) $(CFLAGS) $(CLI_SRC) -o $(CTARGET)
server: $(SRV_SRC) $(SRV_HED)
$(CC) $(SFLAGS) $(SRV_SRC) -o $(STARGET)
clean:
$(RM) -rf $(STARGET) $(CTARGET)
run_client:
./$(CTARGET)
run_server:
./$(STARGET)
File added
File deleted
File deleted
File deleted
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment