Commit 4acc3128 authored by Samarth Joshi's avatar Samarth Joshi

adding working threads using epoll

parent 7610e770
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <string.h> // memset
#define MAX_EVENTS 10
/* /*
KVServer will consist of a main thread that will perform the following steps: KVServer will consist of a main thread that will perform the following steps:
...@@ -57,4 +69,104 @@ error. (Assume each character to be 1 byte in size) ...@@ -57,4 +69,104 @@ error. (Assume each character to be 1 byte in size)
readers single writer locks for efficiency. readers single writer locks for efficiency.
KVClient KVClient
*/ */
\ No newline at end of file
void *worker(void *args) {
struct epoll_event ev, events[MAX_EVENTS];
int read_pipe, conn_sock, nfds, i;
int epollfd;
int *newfd;
read_pipe = ((int *) args)[0];
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = read_pipe;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, read_pipe, &ev) == -1) {
perror("epoll_ctl: read_pipe");
exit(EXIT_FAILURE);
}
while (1) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for ( i= 0; i < nfds; ++i ) {
if (events[i].data.fd == read_pipe) {
printf("Hola1!");
fflush(stdout);
read(read_pipe, newfd, sizeof(newfd));
printf("read %d", *newfd);
printf("Hola2!");
fflush(stdout);
} else {
printf("im doing some work");
}
}
}
}
int main (int argc, int argv) {
int i;
int next = 0; // to decide which worker thread to assign client in round robin fashion
pthread_t *threads; // set of all worker threads
int pool_thread_size = 5; // TODO: get pool thread size from config file
int sockfd, newsockfd, portno, clilen, n;
struct sockaddr_in serv_addr, cli_addr;
//int *pipes = (int*) malloc(pool_thread_size * 2 * sizeof(pipes));
int pipes[5][2];
// initialize thread pool with initial pool size
threads = malloc(pool_thread_size * sizeof(pthread_t));
for( i=0; i < pool_thread_size; i++ ) {
pipe(pipes[i]);
pthread_create( &threads[i], NULL, worker, &pipes[i]);
}
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("ERROR opening socket");
exit(1);
}
memset(&serv_addr, 0, sizeof(serv_addr));
portno = 6969;
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(portno);
serv_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
perror("ERROR on binding");
exit(1);
}
listen(sockfd, 5);
clilen = sizeof(cli_addr);
while(1) {
printf("Hola!");
fflush(stdout);
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0)
perror("ERROR on accept");
write(pipes[next][1], &newsockfd, sizeof(newsockfd));
next = (next+1) % pool_thread_size;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment