Commit ed5f3efd authored by Samarth Joshi's avatar Samarth Joshi

Integrated LRU cache with KVserver

parent 4174ea50
File deleted
/*
This file only has a main function
Include KVClientLibrary.c to use interfaces to make GET, PUT, DELETE requests
PA4 instructions:
The client is a separate process that will first establish a connection with the server, and then
send GET , PUT , and DELETE requests to the server process. It will have a main and use the
client library interfaces to do the actual operations.
*/
#define MAX 80
#define PORT 8000
#define SA struct sockaddr
#include "KVClientLibrary.c"
#define SA struct sockaddr
#include <stdio.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "KVClientLibrary.h"
struct message* requestMessage;
int main(int argc, char const *argv[])
......@@ -48,7 +42,8 @@ int main(int argc, char const *argv[])
//printf("%d",(int)put(sock,key, value,error));
//printf("%d",(int)del(sock,key,error));
//printf("%d",(int)get(sock,key, value,error));
while (1) {
//while (1) {
for(int i=0; i<10; i++) {
printf("%d",(int)put(sock, key, value, error));
}
......
/*
functions:
int get(void* key, void* value, void* error);
int post(void* key, void* value, void* error);
int delete(void* key, void* value, void* error);
PA4 instructions:
This is a library, which will encode and decode your request and response messages.
For example, at the client side this library will encode your request message to the decided
message format, and then send it out to the server process. Similarly, it will decode the
response received from the server, and then hand out the decoded response to the KVClient
module.
Something complimentary will happen at the server end.
Status Codes for request message
GET: 1
PUT: 2
DEL: 3
Status Codes for response message
Success: 200
Error: 400 (with the appropriate error message)
Reasons for error could be GET key not found, DEL key not found etc.
https://moodle.iitb.ac.in/mod/forum/discuss.php?d=13716
The client library really consists of only 3 functions corresponding to the get, put and del.
Internally, these messages manage encoding the request, sending the message,
waiting for the reply and returning the appropriate value.
So, a get API would like like int get(void* key, void* value, void* error);
The return value is the status code.
If the code is 200, then get will fill in the value with malloced memory.
Else, it will malloc some memory into the error pointer and return it. And so on....
*/
#include "KVMessageFormat.h"
#include<stdio.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include "KVClientLibrary.h"
// to print a message
void printMessage(struct message *requestMessage)
......
#include "KVMessageFormat.h"
void printMessage(struct message *requestMessage);
int get(int sockfd,char* key,char *value,char *error);
int del(int sockfd,char *key,char *error);
int put(int sockfd,char *key,char *value,char *error);
......@@ -14,6 +14,10 @@ Error: 400 (with the appropriate error message)
Reasons for error could be GET key not found, DEL key not found etc.
*/
#define STATUS_GET 1
#define STATUS_DEL 2
#define STATUS_PUT 3
struct message{
unsigned char status;
char key[256];
......
File deleted
......@@ -7,9 +7,20 @@
#include <netinet/in.h>
#include <sys/epoll.h>
#include <string.h>
#include "LRU.h"
#include "KVMessageFormat.h"
#define MAX_EVENTS 10
#define DEBUG (1)
void gen_random(char *s, const int len) {
static const char alphanum[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
for (int i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
s[len] = 0;
}
void *worker(void *args) {
struct epoll_event ev,events[MAX_EVENTS];
......@@ -19,6 +30,12 @@ void *worker(void *args) {
int read_pipe, conn_sock, nfds, i;
int epollfd;
int newfd;
int status;
// Generate name for each thread for debugging
char *name = (char *) malloc(5*sizeof(char));
gen_random(name, 5);
if DEBUG printf("[%s] Thread started!\n", name);
epollfd = epoll_create1(0);
if (epollfd == -1) {
......@@ -34,19 +51,22 @@ void *worker(void *args) {
perror("epoll_ctl: read_pipe");
exit(EXIT_FAILURE);
}
//if DEBUG printf("[%s] Added read pipe to epoll fd set!\n", name);
while (1) {
if DEBUG printf("[%s] waiting for epoll event!\n", name);
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
if DEBUG printf("[%s] %d events !\n", name, nfds);
for ( i= 0; i < nfds; ++i ) {
if (events[i].data.fd == read_pipe) {
/* if we get a request from main thread to add new client */
read(read_pipe, &newfd, sizeof(newfd));
printf("\nread %d\n", read_pipe);
if DEBUG printf("[%s][EVENT] New Client assigned by main thread fd:%d!\n", name, newfd);
ev.data.fd=newfd;
ev.events = EPOLLIN | EPOLLRDHUP;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, newfd, &ev) == -1) {
......@@ -59,8 +79,10 @@ void *worker(void *args) {
/* if we get a request from client (GET, PUSH, DEL) */
int flag = events[i].events;
if DEBUG printf("[%s][EVENT] flag set: %d\n", name, flag);
if (flag & EPOLLRDHUP) {
/* Connection was closed. */
if DEBUG printf("[%s][EVENT][EPOLLRDHUP] \n", name);
epoll_ctl( epollfd, EPOLL_CTL_DEL, events[i].data.fd , NULL );
close(events[i].data.fd);
continue;
......@@ -70,17 +92,49 @@ void *worker(void *args) {
/* Parse the actual message from client */
struct message *requestMessage= malloc(sizeof(struct message));
int readlength=read(events[i].data.fd , requestMessage, sizeof(struct message));
printf("[Message Received from client] (%d, %s, %s)\n",requestMessage->status,requestMessage->key,requestMessage->value);
/*Process the message and send appropriate response to client*/
//printf("[Message Received from client] (%d, %s, %s)\n",requestMessage->status,requestMessage->key,requestMessage->value);
if DEBUG printf("[%s][EVENT][EPOLLIN] \n", name);
switch(requestMessage->status) {
case STATUS_GET:
if DEBUG printf("[%s] GET \n", name);
memcpy(requestMessage->value, cache_get(requestMessage->key), 256);
if(requestMessage->value == 0) {
requestMessage->status = 240;
} else {
requestMessage->status = 200;
}
break;
case STATUS_PUT:
if DEBUG printf("[%s] PUT \n", name);
cache_put(requestMessage->key, requestMessage->value);
requestMessage->status = 200;
break;
case STATUS_DEL:
if DEBUG printf("[%s] DEL \n", name);
status = cache_del(requestMessage->key);
if(status == 0) {
requestMessage->status = 240;
} else {
requestMessage->status = 200;
}
break;
}
if DEBUG print_cache();
requestMessage->status=200;
write(events[i].data.fd,requestMessage, sizeof(struct message));
fflush(stdout);
write(events[i].data.fd, requestMessage, sizeof(struct message));
free(requestMessage);
}
}
}
}
free(name);
}
......@@ -123,15 +177,15 @@ int main (int argc, int argv) {
listen(sockfd, 5);
clilen = sizeof(cli_addr);
init_cache();
while(1) {
fflush(stdout);
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0)
perror("ERROR on accept");
write(pipes[next_thread_to_assign][1], &newsockfd, sizeof(newsockfd));
next_thread_to_assign = (next_thread_to_assign+1) % pool_thread_size;
}
......
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <stdatomic.h>
#include "LRU.h"
#define ATOMIC_TEST_AND_SET __atomic_test_and_set
#define CLEAR __atomic_clear
#define MAX_SIZE 10
struct KV *array[MAX_SIZE];
struct queue *qu;
struct queue *last;
void remove_element_from_deque(char *key)
{
queue *present = qu , *previous=NULL;
while(present->next != NULL)
{
if(present->key == key)
{
if(previous == NULL)
qu = qu->next;
if(last == present)
last = previous;
if(previous)
previous->next = present->next;
free(present);
return;
}
previous = present;
present = present->next;
}
}
void insert_into_queue(char *key)
{
queue *temp = (queue *)malloc(sizeof(queue));
temp->key = key;
temp->next = NULL;
if(qu == NULL)
{
qu = temp;
last = temp;
}
else
{
last->next = temp;
last = temp;
}
}
int find_empty_location()
{
for(int i=0;i<MAX_SIZE;i++)
{
if(array[i]->valid == FALSE)
return i;
}
}
int cache_del(char *key)
{
for(int i=0;i<MAX_SIZE;i++)
{
if(array[i]->key == key)
{
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
remove_element_from_deque(key);
array[i]->valid = FALSE;
CLEAR(&(array[i]->lock),0);
return 1;
}
}
return 0;
//TODO remove key from file also
}
void cache_put(char *key, char *value)
{
int indx=-1;
for(int i=0;i<MAX_SIZE;i++)
{
if(array[i]->key == key)
{
indx = i;
remove_element_from_deque(key);
break;
}
}
if(indx == -1)
{
indx = find_empty_location();
// TODO should write to file if modified is true
// replacment from cache
}
while(ATOMIC_TEST_AND_SET(&(array[indx]->lock),1) == 1);
array[indx]->key = key;
array[indx]->value = value;
array[indx]->valid = TRUE;
array[indx]->modified = TRUE;
insert_into_queue(key);
CLEAR(&(array[indx]->lock),0);
}
char* cache_get(char *key)
{
for(int i=0;i<MAX_SIZE;i++)
{
if(array[i]->key == key)
{
while(ATOMIC_TEST_AND_SET(&(array[i]->lock),1) == 1);
remove_element_from_deque(key);
insert_into_queue(key);
CLEAR(&(array[i]->lock),0);
return array[i]->value;
}
}
return 0;
}
void init_cache() {
for(int j=0;j<MAX_SIZE;j++)
{
array[j] = (KV *)malloc(sizeof(KV));
array[j]->valid = FALSE;
array[j]->lock = 0;
}
}
void print_cache() {
for(int j=0;j<MAX_SIZE;j++)
{
printf("(%s) : (%s)\n", array[j]->key, array[j]->value);
}
}
\ No newline at end of file
#define MAX_SIZE 10
typedef enum{
FALSE,TRUE
}bool;
struct KV{
char *key;
char *value;
bool valid;
bool modified;
unsigned _Atomic lock;
//int lock;
};
typedef struct KV KV;
typedef struct queue queue;
struct queue{
char *key;
struct queue *next;
};
extern struct KV *array[MAX_SIZE];
extern struct queue *qu;
extern struct queue *last;
void remove_element_from_deque(char *key);
void insert_into_queue(char *key);
int find_empty_location();
int cache_del(char *key);
void cache_put(char *key, char *value);
char* cache_get(char *key);
void init_cache();
void print_cache();
\ No newline at end of file
all: KVClient.c KVServer.c KVMessageFormat.h KVClientLibrary.c
gcc -pthread KVServer.c -o server
gcc KVClient.c -o client
\ No newline at end of file
CC = gcc
SFLAGS = -pthread -g
SRV_SRC = LRU.c KVServer.c
SRV_HED = KVMessageFormat.h LRU.h
STARGET = Server
CFLAGS = -g
CLI_SRC = KVClientLibrary.c KVClient.c
CLI_HED = KVMessageFormat.h KVClientLibrary.h
CTARGET = Client
.PHONY: server
.PHONY: client
all: server client
client: $(CLI_SRC) $(CLI_HED)
$(CC) $(CFLAGS) $(CLISRC) -o $(CTARGET)
server: $(SRV_SRV) $(SRV_HED)
$(CC) $(SFLAGS) $(SRVSRV) -o $(STARGET)
clean:
$(RM) -rf $(STARGET) $(CTARGET)
run_client:
./$(STARGET)
run_server:
./$(CTARGET)
File deleted
File deleted
File deleted
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment