Commit 83c4a0ff authored by Nilesh Jagdish's avatar Nilesh Jagdish

Server with epoll added

parent aeb6e014
#include <stdio.h>
#include "server.h"
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include "cache.h"
using namespace std;
pthread_mutex_t lock;
pthread_mutex_t listLock;
pthread_cond_t pendingClient;
list<int> connectionQueue;
void initVals(char *ip, char *portNo, int *nThreads) {
FILE *ptr = fopen("config.txt", "r");
fscanf(ptr, "%s\n%s\n%d\n", ip, portNo, nThreads);
fclose(ptr);
}
void *respondToClient(void *args) {
int threadId = *((int *)args);
cout << "Thread at work : " << threadId << "..." << endl;
static struct epoll_event events[2];
static int epfd = epoll_create(4);
int flag = 0;
pthread_mutex_lock(&listLock);
while(1) {
if(connectionQueue.size() > 0) {
int clientFd = connectionQueue.front();
cout << "Thread " << threadId << " processing client " << clientFd << endl;
connectionQueue.pop_front();
pthread_mutex_unlock(&listLock);
static struct epoll_event ev;
ev.data.fd = clientFd;
ev.events = EPOLLIN;
int res = epoll_ctl(epfd, EPOLL_CTL_ADD, clientFd, &ev);
flag = 1;
}
else if(!flag) {
cout << "Thread "<< threadId << " Still waiting...." << endl;
pthread_cond_wait(&pendingClient, &listLock);
cout << "Thread "<< threadId << " Wait finished...." << endl;
}
int nfds = epoll_wait(epfd, events, 2, 1000);
cout << "NFDS Value " << nfds << " for thread " << threadId << endl;
for(int k = 0; k < nfds; k++) {
int fd = events[k].data.fd;
char *buffer = (char *)malloc(513*sizeof(char));
int len = read(fd, buffer, 513);
buffer[len] = '\0';
char key[257], value[257], converted_output[257];
string output;
memset(key, '\0', 257);
memset(value, '\0', 257);
char todo = buffer[0];
int i, j;
if(todo != '4') {
for(i = 1; buffer[i] == '0'; i++);
memcpy(key, buffer + i, 257 - i);
for(i = 257; buffer[i] == '0'; i++);
memcpy(value, buffer + i, 513 - i);
}
string key_string, value_string;
switch(todo) {
case '1':
cout << "GET Request received" << endl;
key_string = key;
cout << key_string << endl;
output = GET(key_string);
strcpy(converted_output, output.c_str());
write(fd, converted_output, output.length());
break;
case '2':
cout << "PUT Request received" << endl;
key_string = key;
value_string = value;
cout << key_string << endl;
cout << value_string << endl;
output = PUT(key_string, value_string);
strcpy(converted_output, output.c_str());
write(fd, converted_output, output.length());
break;
case '3':
cout << "DEL Request received" << endl;
key_string = key;
output = DEL(key_string);
strcpy(converted_output, output.c_str());
write(fd, converted_output, output.length());
break;
case '4':
write(fd, "Connection terminated\0", 22);
return NULL;
}
}
}
}
int acceptConnections(char *addr, char *portNo, int nThreads) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
pthread_t clientThreads[nThreads];
int threadIDs[nThreads];
int clientNo = 0;
struct addrinfo hints, *result;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
for(int i = 0; i < nThreads; i++) {
threadIDs[i] = i;
pthread_create(&clientThreads[i], NULL, respondToClient, (void *)&threadIDs[i]);
}
int s = getaddrinfo(addr, portNo, &hints, &result);
if(s != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
return -1;
}
int b = bind(sockfd, result->ai_addr, result->ai_addrlen);
if(b != 0) {
printf("Binding error\n");
return -1;
}
int l = listen(sockfd, nThreads);
if(l != 0) {
printf("Error while listening...\n");
return -1;
}
printf("Waiting for connection...\n");
while(1) {
pthread_mutex_lock(&lock);
int* clientFd = (int *)malloc(sizeof(int));
*clientFd = accept(sockfd, NULL, NULL);
clientNo++;
pthread_mutex_lock(&listLock);
connectionQueue.push_back(*clientFd);
pthread_cond_broadcast(&pendingClient);
pthread_mutex_unlock(&listLock);
// pthread_create(&clientThreads[clientNo], NULL, respondToClient, (void *)clientFd);
pthread_mutex_unlock(&lock);
}
return 0;
}
int main(int argc, char **argv) {
printf("Server is running...\n");
char ip[20], portNo[10];
int nThreads;
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&pendingClient, NULL);
pthread_mutex_init(&listLock, NULL);
initializeCache();
initVals(ip, portNo, &nThreads);
printf("IP address: %s\n", ip);
printf("Port No: %s\n", portNo);
printf("No of threads: %d\n", nThreads);
acceptConnections(ip, portNo, nThreads);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment