Commit 09065428 authored by Shah Rinku's avatar Shah Rinku

kvcache read offload: mtcp hashmap code

parent 50f9697a
Makefile
epserver
epwget
log_*
# TODO: Make this Makefile.in pretty
TARGETS = epserver epwget
CC=@CC@ -g -O3 -Wall -Werror -fgnu89-inline
DPDK=@DPDK@
PS=@PSIO@
NETMAP=@NETMAP@
ONVM=@ONVM@
CCP=@CCP@
CFLAGS=@CFLAGS@
LDFLAGS=@LDFLAGS@

# If ARCH is not defined, retrieve from system
ARCH ?= $(shell uname -m)

# Add arch-specific optimization
ifeq ($(ARCH),x86_64)
LIBS += -m64
endif

# mtcp library and header
MTCP_FLD =../../mtcp/
MTCP_INC =-I${MTCP_FLD}/include
MTCP_LIB =-L${MTCP_FLD}/lib
MTCP_TARGET = ${MTCP_LIB}/libmtcp.a

UTIL_FLD = ../../util
UTIL_INC = -I${UTIL_FLD}/include
UTIL_OBJ = ${UTIL_FLD}/http_parsing.o ${UTIL_FLD}/tdate_parse.o ${UTIL_FLD}/netlib.o

# util library and header
INC = -I./include/ ${UTIL_INC} ${MTCP_INC} -I${UTIL_FLD}/include
# FIX: append (+=) instead of assigning, so the arch-specific -m64 added
# above is not silently discarded.
LIBS += ${MTCP_LIB}

# psio-specific variables
ifeq ($(PS),1)
PS_DIR = ../../io_engine/
PS_INC = ${PS_DIR}/include
# FIX: was -I{PS_INC} (missing '$'), which passed the literal string
# "{PS_INC}" to the compiler instead of the include directory.
INC += -I${PS_INC}
LIBS += -lmtcp -L${PS_DIR}/lib -lps -lpthread -lnuma -lrt
endif

# netmap-specific variables
ifeq ($(NETMAP),1)
LIBS += -lmtcp -lpthread -lnuma -lrt
endif

# dpdk-specific variables
ifeq ($(DPDK),1)
# $$ keeps ${RTE_SDK}/${RTE_TARGET} for the shell to expand from the environment
DPDK_MACHINE_LINKER_FLAGS=$${RTE_SDK}/$${RTE_TARGET}/lib/ldflags.txt
DPDK_MACHINE_LDFLAGS=$(shell cat ${DPDK_MACHINE_LINKER_FLAGS})
LIBS += -g -O3 -pthread -lrt -march=native ${MTCP_FLD}/lib/libmtcp.a -lnuma -lmtcp -lpthread -lrt -ldl -lgmp -L${RTE_SDK}/${RTE_TARGET}/lib ${DPDK_MACHINE_LDFLAGS} ${LDFLAGS}
endif

# onvm-specific variables
ifeq ($(ONVM),1)
ifeq ($(RTE_TARGET),)
$(error "Please define RTE_TARGET environment variable")
endif
INC += -I@ONVMLIBPATH@/onvm_nflib
INC += -I@ONVMLIBPATH@/lib
INC += -DENABLE_ONVM
LIBS += @ONVMLIBPATH@/onvm_nflib/$(RTE_TARGET)/libonvm.a
LIBS += @ONVMLIBPATH@/lib/$(RTE_TARGET)/lib/libonvmhelper.a -lm
endif

# quiet build by default; run `make V=1` for full command echo
ifeq ($V,) # no echo
export MSG=@echo
export HIDE=@
else
export MSG=@\#
export HIDE=
endif

ifeq ($(CCP), 1)
# LIBCCP
LIBCCP = $(MTCP_FLD)/src/libccp
LIBS += -L$(LIBCCP) -lccp -lstartccp
INC += -I$(LIBCCP)
endif

# command targets, not files
.PHONY: all clean distclean

all: epserver epwget

epserver.o: epserver.c
	$(MSG) " CC $<"
	$(HIDE) ${CC} -c $< ${CFLAGS} ${INC}

epserver: epserver.o ${MTCP_FLD}/lib/libmtcp.a
	$(MSG) " LD $<"
	$(HIDE) ${CC} $< ${LIBS} ${UTIL_OBJ} -o $@

epwget.o: epwget.c
	$(MSG) " CC $<"
	$(HIDE) ${CC} -c $< ${CFLAGS} ${INC}

epwget: epwget.o ${MTCP_FLD}/lib/libmtcp.a
	$(MSG) " LD $<"
	$(HIDE) ${CC} $< ${LIBS} ${UTIL_OBJ} -o $@

clean:
	$(MSG) " CLEAN $(TARGETS)"
	$(HIDE) rm -f *~ *.o ${TARGETS} log_*

# Makefile is generated by configure from Makefile.in, so it may be removed
distclean: clean
	rm -rf Makefile
TARGETS = epserver epwget
CC = gcc -g -O3
DPDK=0
PS=0

# DPDK LIBRARY and HEADER
DPDK_INC=../../dpdk/include
DPDK_LIB=../../dpdk/lib/

# mtcp library and header
MTCP_FLD =../../mtcp/
MTCP_INC =-I${MTCP_FLD}/include
MTCP_LIB =-L${MTCP_FLD}/lib
MTCP_TARGET = ${MTCP_LIB}/libmtcp.a

UTIL_FLD = ../../util
UTIL_INC = -I${UTIL_FLD}/include
UTIL_OBJ = ${UTIL_FLD}/http_parsing.o ${UTIL_FLD}/tdate_parse.o

PS_DIR = ../../io_engine/
PS_INC = ${PS_DIR}/include

INC = -I./include/ ${UTIL_INC} ${MTCP_INC} -I${UTIL_FLD}/include
LIBS = ${MTCP_LIB}

ifeq ($(PS),1)
# FIX: was -I{PS_INC} (missing '$'), which passed the literal string
# "{PS_INC}" to the compiler instead of the include directory.
INC += -I${PS_INC}
LIBS += -lmtcp -L${PS_DIR}/lib -lps -lpthread -lnuma -lrt
endif

# CFLAGS for DPDK-related compilation
INC += ${MTCP_INC}
ifeq ($(DPDK),1)
INC += -DENABLE_DPDK -DRTE_MACHINE_CPUFLAG_SSE -DRTE_MACHINE_CPUFLAG_SSE2 -DRTE_MACHINE_CPUFLAG_SSE3 \
	 -DRTE_MACHINE_CPUFLAG_SSSE3 -DRTE_MACHINE_CPUFLAG_SSE4_1 -DRTE_MACHINE_CPUFLAG_SSE4_2 \
	 -DRTE_MACHINE_CPUFLAG_AES -DRTE_MACHINE_CPUFLAG_PCLMULQDQ -DRTE_MACHINE_CPUFLAG_AVX \
	 -DRTE_COMPILE_TIME_CPUFLAGS=RTE_CPUFLAG_SSE,RTE_CPUFLAG_SSE2,RTE_CPUFLAG_SSE3,RTE_CPUFLAG_SSSE3,RTE_CPUFLAG_SSE4_1,RTE_CPUFLAG_SSE4_2,RTE_CPUFLAG_AES,RTE_CPUFLAG_PCLMULQDQ,RTE_CPUFLAG_AVX -I${DPDK_INC} \
	 -include ${DPDK_INC}/rte_config.h
endif

ifeq ($(DPDK),1)
LIBS += -m64 -g -O3 -pthread -lrt -march=native -Wl,-export-dynamic ${MTCP_FLD}/lib/libmtcp.a -L../../dpdk/lib -Wl,-lnuma -Wl,-lmtcp -Wl,-lpthread -Wl,-lrt -Wl,-ldl -Wl,--whole-archive -Wl,-lrte_distributor -Wl,-lrte_kni -Wl,-lrte_pipeline -Wl,-lrte_table -Wl,-lrte_port -Wl,-lrte_timer -Wl,-lrte_hash -Wl,-lrte_lpm -Wl,-lrte_power -Wl,-lrte_acl -Wl,-lrte_meter -Wl,-lrte_sched -Wl,-lm -Wl,-lrt -Wl,--start-group -Wl,-lrte_kvargs -Wl,-lrte_mbuf -Wl,-lrte_ip_frag -Wl,-lethdev -Wl,-lrte_malloc -Wl,-lrte_mempool -Wl,-lrte_ring -Wl,-lrte_eal -Wl,-lrte_cmdline -Wl,-lrte_cfgfile -Wl,-lrte_pmd_bond -Wl,-lrte_pmd_vmxnet3_uio -Wl,-lrte_pmd_i40e -Wl,-lrte_pmd_ixgbe -Wl,-lrte_pmd_e1000 -Wl,-lrte_pmd_ring -Wl,-lrt -Wl,-lm -Wl,-ldl -Wl,--end-group -Wl,--no-whole-archive
endif

# command targets, not files
.PHONY: all clean

all: epserver epwget

epserver.o: epserver.c
	${CC} -c $< ${CFLAGS} ${INC}

epserver: epserver.o
	${CC} $< ${LIBS} ${UTIL_OBJ} -o $@

epwget.o: epwget.c
	${CC} -c $< ${CFLAGS} ${INC}

epwget: epwget.o
	${CC} $< ${LIBS} ${UTIL_OBJ} -o $@

clean:
	rm -f *~ *.o ${TARGETS} log_*
========================================================================
USAGE OF EXAMPLE APPLICATIONS
========================================================================
epserver: a simple mtcp-epoll-based web server
Single-Process, Multi-threaded Usage:
./epserver -p www_home -f epserver.conf [-N #cores]
ex) ./epserver -p /home/notav/www -f epserver.conf -N 8
Multi-Process, Single-threaded Usage [DPDK-only]
(Master runs on core 0 by default, Slave processes on core 1~N)
ex) ./epserver -p /home/notav/www -f epserver-multiprocess.conf -c 0
for i in {1..7}
do
./epserver -p /home/notav/www -f epserver-multiprocess.conf -c $i
done
options:
www_home: the directory to serve. # max files are limited to
MAX_FILES in epserver.c:36
-N: number of CPU cores to use. default: all existing cores
-p: path to www/ files
-f: path to mtcp configuration file
-c: the core_id on which the process should run
[only works for multi-process mode]
========================================================================
epwget: simple mtcp-epoll-based http request generator
Single-Process, Multi-threaded Usage:
usage: ./epwget URL #requests [-N #cores] [-c concurrency] -f $mtcp_conf
ex) ./epwget 10.0.0.43/example.txt 10000000 -N 8 -c 8000 -f epwget.conf
Multi-Process, Single-threaded Usage [DPDK-only]
(Master runs on core 0 by default, Slave processes on core 1~N)
usage: ./epwget URL #requests -n $core [-c concurrency] -f $mtcp_conf
ex) ./epwget 10.0.0.43/example.txt 10000000 -n 0 -c 1000 -f epwget-multiprocess.conf
for i in {1..7}
do
./epwget 10.0.0.43/example.txt 1000000 -n $i -c 1000 -f epwget-multiprocess.conf
done
options:
URL: url of the content to download.
#requests: number of requests to generate
-N: number of CPU cores to use. default: min(# cores, # requests)
-c: number of maximum concurrent connections. default: 100
-f: path to mtcp configuration file
-n: the core_id on which the process should run
[only works for multi-process mode]
notes:
- epwget can use a range of IP addresses to support more concurrent
connections than a single IP address allows. You can set the range
in epwget.c:33.
- epwget overrides some part of the settings in epwget.conf and uses
mtcp_setconf() internally to apply the input arguments to the
configuration.
========================================================================
ONVM setups:
The config file provides simple onvm mtcp setups such as:
- simple endpoint server
- Enable `io = onvm`, and `onvm_serv = 1` in epserver.conf
- Run onvm
$ onvm/go.sh 1,2,3 1 -s stdout
- Run epserver
$ sudo ./epserver -p /path/to/www -f epserver.conf -N 1
- local client/server setup
- Enable `io = onvm`, `onvm_serv` = 1, and `onvm_dest = 2` in epserver.conf
- Enable `io = onvm`, `onvm_serv` = 2, and `onvm_dest = 1` in epwget.conf
- Run onvm
$ onvm/go.sh 1,2,3 1 -s stdout
- Run epserver
$ sudo ./epserver -p /path/to/www -f epserver.conf -N 1
- Run epwget
$ sudo ./epwget $SERVER_IP/foo.html 10000000 -N 1 -c 1024 -f epwget.conf
PLEASE NOTE THAT YOU WILL HAVE TO ADD STATIC ARP TABLE ENTRIES TO RUN
MTCP IN ONVM MODE.
========================================================================
Contact: mtcp-user at list.ndsl.kaist.edu
April 2, 2014.
EunYoung Jeong <notav at ndsl.kaist.edu>
M. Asim Jamshed <ajamshed at ndsl.kaist.edu>
# This file is to configure static arp tables.
# Rename this file to arp.conf and set the appropriate values.
# Please save this file as config/arp.conf. Put the config/
# directory in the same directory where the binary lies.
#
# (Destination IP address/IP_prefix) (Destination MAC address)
ARP_ENTRY 5
192.168.220.35/32 0c:42:a1:df:ac:48
192.168.220.34/32 0c:42:a1:df:ac:49
192.168.220.33/32 ee:cc:b5:55:cc:dc
192.168.220.60/32 0c:42:a1:df:ac:42
192.168.220.61/32 0c:42:a1:df:ac:43
# This file is routing table example of our testbed machine
# Copy this file to route.conf and give appropriate routes
# Please save this file as config/route.conf. Put the config/
# directory in the same directory where the binary lies.
#
# (Destination address)/(Prefix) (Device name)
#
#
# Add entry for default gateway route as:
# w.x.y.z/0 dpdk0
# Always put the default gateway route as the last entry.
# Make sure that the mask (Prefix) is set to 0. For example,
# if the default gateway IP address is 10.0.0.10, then the
# entry will be:
# 10.0.0.10/0 dpdk0
#
ROUTES 5
192.168.220.34/24 ens259f0
192.168.220.35/24 ens259f1
192.168.220.33/24 ens259f1
192.168.220.60/24 ens259f1
192.168.220.61/24 ens259f0
############### mtcp configuration file ###############
# The underlying I/O module you want to use. Please
# enable only one out of the three.
#io = psio
#io = netmap
io = dpdk
# No. of cores setting (enabling this option will override
# the `cpu' config for those applications that accept
# num_cores as command line arguments)
#
# e.g. in case ./epserver is executed with `-N 4', the
# mtcp core will still invoke 8 mTCP threads if the
# following line is uncommented.
#num_cores = 8
# Number of memory channels per processor socket (dpdk-only)
num_mem_ch = 4
# Enable multi-process support
multiprocess = 1
# Used port (please adjust accordingly)
#------ PSIO ports -------#
#port = xge0 xge1
#port = xge1
#------ DPDK ports -------#
port = dpdk0
#port = dpdk1
#port = dpdk0 dpdk1
# Maximum concurrency per core (default = 10000)
#max_concurrency = 10000
# Maximum number of socket buffers per core (default = 10000)
# Set this to small value if there are many idle connections
#max_num_buffers = 10000
# Receive buffer size of sockets; if not set: rcvbuf = sndbuf
rcvbuf = 8192
# Send buffer size of sockets; if not set: sndbuf = rcvbuf
sndbuf = 8192
# if sndbuf & rcvbuf not set: sndbuf = rcvbuf = 8192
# TCP timeout seconds
# (tcp_timeout = -1 can disable the timeout check)
tcp_timeout = 30
# TCP timewait seconds
tcp_timewait = 0
# Interface to print stats (please adjust accordingly)
# You can enable multiple ports in separate lines
#------ PSIO ports -------#
#stat_print = xge0
#stat_print = xge1
#------ DPDK ports -------#
stat_print = dpdk0
#stat_print = dpdk1
#######################################################
\ No newline at end of file
#define _LARGEFILE64_SOURCE
#include <assert.h>
#include <arpa/inet.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <mtcp_api.h>
#include <mtcp_epoll.h>
#include "cpu.h"
#include "http_parsing.h"
#include "netlib.h"
#include "debug.h"
#define MAX_FLOW_NUM (10000)          /* max simultaneous sockets per thread */
#define RCVBUF_SIZE (2*1024)
#define SNDBUF_SIZE (8*1024)          /* max bytes per mtcp_write() in SendUntilAvailable() */
#define MAX_EVENTS (MAX_FLOW_NUM * 3) /* epoll event array size */
#define HTTP_HEADER_LEN 1024          /* request/response buffer size */
#define URL_LEN 128
#define MAX_FILES 30                  /* capacity of the in-memory file cache */
#define NAME_LIMIT 256
#define FULLNAME_LIMIT 512
#ifndef TRUE
#define TRUE (1)
#endif
#ifndef FALSE
#define FALSE (0)
#endif
#ifndef ERROR
#define ERROR (-1)
#endif
/* when TRUE, app threads are pinned to hyper-thread siblings (see InitializeServerThread) */
#define HT_SUPPORT FALSE
#ifndef MAX_CPUS
#define MAX_CPUS 16
#endif
#define CAPACITY 50000 // Size of the Hash Table
/*----------------------------------------------------------------------------*/
/* One entry of the preloaded file cache served to clients. */
struct file_cache
{
	char name[NAME_LIMIT];          /* file name (relative; presumably set when the cache is filled in main — not visible here) */
	char fullname[FULLNAME_LIMIT];  /* full path; compared against www_main + url in HandleReadEvent() */
	uint64_t size;                  /* file size in bytes */
	char *file;                     /* file contents held in memory; streamed by SendUntilAvailable() */
};
/*----------------------------------------------------------------------------*/
/* Per-connection state, indexed by mtcp socket id (see AcceptConnection). */
struct server_vars
{
	char request[HTTP_HEADER_LEN];  /* accumulated HTTP request bytes */
	int recv_len;                   /* total bytes received so far */
	int request_len;                /* header length once find_http_header() succeeds */
	long int total_read, total_sent;
	uint8_t done;                   /* TRUE once the whole file has been sent */
	uint8_t rspheader_sent;         /* TRUE once the response header went out */
	uint8_t keep_alive;             /* honor HTTP keep-alive on this connection */
	int fidx;						// file cache index
	char fname[NAME_LIMIT];				// file name
	long int fsize;					// file size
	char key[7];                    /* hashmap key echoed in the response (K: line) */
	char value[7];                  /* hashmap value echoed in the response (V: line) */
};
/*----------------------------------------------------------------------------*/
/* Per-core application context created by InitializeServerThread(). */
struct thread_context
{
	mctx_t mctx;                /* mtcp context for this core */
	int ep;                     /* mtcp epoll descriptor */
	struct server_vars *svars;  /* per-socket state array (MAX_FLOW_NUM entries) */
};
/*----------------------------------------------------------------------------*/
static int num_cores;                        /* CPUs detected via GetNumCPUs() */
static int core_limit;                       /* worker threads to run (-N) */
static pthread_t app_thread[MAX_CPUS];       /* one application thread per core */
static int done[MAX_CPUS];                   /* per-core shutdown flags (set by SignalHandler) */
static char *conf_file = NULL;               /* mtcp config path — presumably set in main (-f); not visible in this chunk */
static int backlog = -1;                     /* backlog passed to mtcp_listen() */
/*----------------------------------------------------------------------------*/
const char *www_main;                        /* root directory being served (-p) */
static struct file_cache fcache[MAX_FILES];  /* preloaded file cache */
static int nfiles;                           /* entries in use in fcache */
/*----------------------------------------------------------------------------*/
static int finished;                         /* count of fully-sent responses */
/*----------------------------------------------------------------------------*/
// Hash a NUL-terminated key: sum its bytes and reduce modulo CAPACITY.
unsigned long hash_function(char* str) {
    unsigned long sum = 0;
    const char* p = str;
    while (*p)
        sum += *p++;
    return sum % CAPACITY;
}
typedef struct Ht_item Ht_item;
// Define the Hash Table Item here
// One key/value pair; both strings are heap-allocated copies owned by the item.
struct Ht_item {
    char* key;
    char* value;
};

typedef struct LinkedList LinkedList;
// Define the Linkedlist here
// Singly linked collision-chain node; owns the Ht_item it points to.
struct LinkedList {
    Ht_item* item;
    LinkedList* next;
};

typedef struct HashTable HashTable;
// Define the Hash Table here
struct HashTable {
    // Contains an array of pointers
    // to items
    Ht_item** items;               // direct slots, one per hash index
    LinkedList** overflow_buckets; // collision chains, parallel to items
    int size;                      // number of slots (CAPACITY)
    int count;                     // direct slots currently occupied
};

// Global table: created in main(), read via return_search() in HandleReadEvent().
// NOTE(review): accessed from worker threads without locking — confirm it is
// read-only after initialization.
struct HashTable* ht;
static LinkedList* allocate_list () {
// Allocates memory for a Linkedlist pointer
LinkedList* list = (LinkedList*) malloc (sizeof(LinkedList));
return list;
}
// Appends `item` at the tail of the list and returns the (possibly new) head.
// Simplified: the original duplicated the tail-append logic in two redundant
// special-case branches (empty list, single node); one general path suffices.
static LinkedList* linkedlist_insert(LinkedList* list, Ht_item* item) {
    LinkedList* node = allocate_list();
    node->item = item;
    node->next = NULL;

    if (!list)
        return node;            // new node becomes the head

    LinkedList* tail = list;
    while (tail->next)
        tail = tail->next;
    tail->next = node;
    return list;
}
/*static Ht_item* linkedlist_remove(LinkedList* list) {
// Removes the head from the linked list
// and returns the item of the popped element
if (!list)
return NULL;
if (!list->next)
return NULL;
LinkedList* node = list->next;
LinkedList* temp = list;
temp->next = NULL;
list = node;
Ht_item* it = NULL;
memcpy(temp->item, it, sizeof(Ht_item));
free(temp->item->key);
free(temp->item->value);
free(temp->item);
free(temp);
return it;
}*/
// Releases an entire collision chain: each node plus the item it owns
// (including the item's key and value strings).
static void free_linkedlist(LinkedList* list) {
    while (list != NULL) {
        LinkedList* next = list->next;
        free(list->item->key);
        free(list->item->value);
        free(list->item);
        free(list);
        list = next;
    }
}
// Create the overflow buckets: one (initially empty) chain per table slot.
static LinkedList** create_overflow_buckets(HashTable* table) {
    LinkedList** buckets = (LinkedList**) calloc (table->size, sizeof(LinkedList*));
    int slot = 0;
    while (slot < table->size) {
        buckets[slot] = NULL;   // explicit NULL (calloc zero bits != NULL in theory)
        slot++;
    }
    return buckets;
}
// Release every collision chain, then the bucket array itself.
static void free_overflow_buckets(HashTable* table) {
    int slot;
    for (slot = 0; slot < table->size; slot++)
        free_linkedlist(table->overflow_buckets[slot]);
    free(table->overflow_buckets);
}
// Creates a hash table item that owns private copies of `key` and `value`.
// FIX: the original never checked malloc results and would strcpy into NULL
// on allocation failure; now returns NULL if any allocation fails.
Ht_item* create_item(char* key, char* value) {
    Ht_item* item = (Ht_item*) malloc (sizeof(Ht_item));
    if (item == NULL)
        return NULL;
    item->key = (char*) malloc (strlen(key) + 1);
    item->value = (char*) malloc (strlen(value) + 1);
    if (item->key == NULL || item->value == NULL) {
        free(item->key);
        free(item->value);
        free(item);
        return NULL;
    }
    strcpy(item->key, key);
    strcpy(item->value, value);
    return item;
}
// Builds an empty HashTable with `size` direct slots plus overflow chains.
HashTable* create_table(int size) {
    HashTable* table = (HashTable*) malloc (sizeof(HashTable));
    table->size = size;
    table->count = 0;
    table->items = (Ht_item**) calloc (table->size, sizeof(Ht_item*));

    int slot = 0;
    while (slot < table->size) {
        table->items[slot] = NULL;  // explicit NULL for portability
        slot++;
    }

    table->overflow_buckets = create_overflow_buckets(table);
    return table;
}
// Releases an item together with the key/value strings it owns.
void free_item(Ht_item* item) {
    free(item->value);
    free(item->key);
    free(item);
}
// Tears the table down: direct items first, then overflow chains,
// then the slot array and the table itself.
void free_table(HashTable* table) {
    int slot;
    for (slot = 0; slot < table->size; slot++) {
        Ht_item* item = table->items[slot];
        if (item != NULL)
            free_item(item);
    }
    free_overflow_buckets(table);
    free(table->items);
    free(table);
}
// Chains `item` into the overflow bucket for `index`.
// FIX: when a new chain head was created, head->next was never set, leaving
// it uninitialized; traversals (linkedlist_insert, ht_search) then followed
// a garbage pointer.
void handle_collision(HashTable* table, unsigned long index, Ht_item* item) {
    LinkedList* head = table->overflow_buckets[index];

    if (head == NULL) {
        // We need to create the list
        head = allocate_list();
        head->item = item;
        head->next = NULL;  /* was missing: terminate the new chain */
        table->overflow_buckets[index] = head;
        return;
    }
    else {
        // Insert to the list
        table->overflow_buckets[index] = linkedlist_insert(head, item);
        return;
    }
}
// Inserts (or updates) a key/value pair.
// FIXES vs. original:
//  - update path leaked the Ht_item that was unconditionally created up front;
//  - update path strcpy'd into the existing value buffer, overflowing it
//    whenever the new value was longer than the old one.
void ht_insert(HashTable* table, char* key, char* value) {
    // Compute the index
    unsigned long index = hash_function(key);
    Ht_item* current_item = table->items[index];

    if (current_item == NULL) {
        // Key does not exist.
        if (table->count == table->size) {
            // Hash Table Full
            printf("Insert Error: Hash Table is full\n");
            return;
        }
        // Insert directly
        table->items[index] = create_item(key, value);
        table->count++;
        return;
    }

    if (strcmp(current_item->key, key) == 0) {
        // Scenario 1: We only need to update value.
        // Allocate a fresh buffer so a longer value cannot overflow.
        char* copy = (char*) malloc (strlen(value) + 1);
        if (copy == NULL)
            return;             // out of memory: keep the old value
        strcpy(copy, value);
        free(current_item->value);
        current_item->value = copy;
        return;
    }

    // Scenario 2: Collision — different key hashed to the same index.
    handle_collision(table, index, create_item(key, value));
}
// Searches the key in the hashtable
// and returns NULL if it doesn't exist.
// Checks the direct slot first, then walks the overflow chain for that index;
// `item` and `head` advance in lock-step (head always one step ahead of item).
char* ht_search(HashTable* table, char* key) {
    int index = hash_function(key);
    Ht_item* item = table->items[index];
    LinkedList* head = table->overflow_buckets[index];

    // Ensure that we move to items which are not NULL
    while (item != NULL) {
        if (strcmp(item->key, key) == 0)
            return item->value;
        if (head == NULL)       // chain exhausted: key not present
            return NULL;
        item = head->item;      // step to the next chained pair
        head = head->next;
    }
    return NULL;
}
// Deletes an item from the table.
// FIXES vs. original:
//  - when the first chain element matched, the ENTIRE chain was freed,
//    discarding every other entry stored at that index;
//  - the traversal updated `prev` AFTER advancing `curr` (prev == curr),
//    so unlinking a middle node corrupted the chain.
void ht_delete(HashTable* table, char* key) {
    unsigned long index = hash_function(key);
    Ht_item* item = table->items[index];
    LinkedList* head = table->overflow_buckets[index];

    if (item == NULL) {
        // Does not exist. Return
        return;
    }

    if (head == NULL && strcmp(item->key, key) == 0) {
        // No collision chain. Remove the item
        // and set table index to NULL
        table->items[index] = NULL;
        free_item(item);
        table->count--;
        return;
    }

    if (head != NULL) {
        // Collision Chain exists
        if (strcmp(item->key, key) == 0) {
            // Direct item matches: promote the first chained pair
            // into the direct slot.
            free_item(item);
            LinkedList* node = head;
            head = head->next;
            node->next = NULL;
            table->items[index] = create_item(node->item->key, node->item->value);
            free_linkedlist(node);
            table->overflow_buckets[index] = head;
            return;
        }

        // Otherwise look for the key inside the chain and unlink
        // exactly the matching node.
        LinkedList* prev = NULL;
        LinkedList* curr = head;
        while (curr) {
            if (strcmp(curr->item->key, key) == 0) {
                if (prev == NULL)
                    table->overflow_buckets[index] = curr->next;
                else
                    prev->next = curr->next;
                curr->next = NULL;      // isolate before freeing
                free_linkedlist(curr);  // frees just this node + its item
                return;
            }
            prev = curr;                // FIX: remember curr before advancing
            curr = curr->next;
        }
    }
}
// Looks the key up and reports the result on stdout.
void print_search(HashTable* table, char* key) {
    char* val = ht_search(table, key);
    if (val == NULL) {
        printf("%s does not exist\n", key);
    } else {
        printf("Key:%s, Value:%s\n", key, val);
    }
}
// Like ht_search(), but yields the literal string "0" for a missing key
// so callers always get a printable value.
char* return_search(HashTable* table, char* key) {
    char* val = ht_search(table, key);
    return (val == NULL) ? "0" : val;
}
// Dumps every occupied slot — and its overflow chain, if any — to stdout.
void print_table(HashTable* table) {
    int i;
    printf("\n-------------------\n");
    for (i = 0; i < table->size; i++) {
        Ht_item* item = table->items[i];
        if (!item)
            continue;
        printf("Index:%d, Key:%s, Value:%s", i, item->key, item->value);
        LinkedList* head = table->overflow_buckets[i];
        if (head) {
            printf(" => Overflow Bucket => ");
            for (; head; head = head->next)
                printf("Key:%s, Value:%s ", head->item->key, head->item->value);
        }
        printf("\n");
    }
    printf("-------------------\n");
}
/*----------------------------------------------------------------------------*/
/* Map an HTTP status code to its reason phrase; NULL for unknown codes. */
static char *
StatusCodeToString(int scode)
{
	if (scode == 200)
		return "OK";
	if (scode == 404)
		return "Not Found";
	return NULL;
}
/*----------------------------------------------------------------------------*/
/* Reset the per-connection bookkeeping so the slot can serve a new request
 * (used after accept and after a keep-alive response completes). */
void
CleanServerVariable(struct server_vars *sv)
{
	sv->done = 0;
	sv->rspheader_sent = 0;
	sv->keep_alive = 0;
	sv->recv_len = 0;
	sv->request_len = 0;
	sv->total_read = 0;
	sv->total_sent = 0;
}
/*----------------------------------------------------------------------------*/
void
CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv)
{
mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_DEL, sockid, NULL);
mtcp_close(ctx->mctx, sockid);
}
/*----------------------------------------------------------------------------*/
/* Stream the cached file for this connection until the mtcp send buffer
 * rejects more data or the whole file has been written. Returns the number
 * of bytes sent on this call. When the file is fully sent, either re-arms
 * EPOLLIN (keep-alive) or closes the connection. */
static int
SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv)
{
	int ret;
	int sent;
	int len;

	/* nothing to do before the header went out, or after we finished */
	if (sv->done || !sv->rspheader_sent) {
		return 0;
	}

	sent = 0;
	ret = 1;
	/* Disabled experiment: write the key/value pair as a separate payload.
	 * (Kept verbatim; note tmp_str was never initialized before strcat.) */
	/*
	//@rinku: key value generation
	char k[7]; // v[6];
	int r = rand() % CAPACITY;
	sprintf(k, "%d", r);
	char* v = return_search(ht, k);
	//int tmp_val = atoi(v);
	char tmp_val[7];
	sprintf(tmp_val, "%s", v);
	//data->val = htons(tmp_val);
	//printf("K: %s, V: %s\n", k, tmp_val);
	memcpy(sv->key, k, sizeof(sv->key));
	memcpy(sv->value, tmp_val, sizeof(sv->value));
	char tmp_str[14];
	strcat(tmp_str, sv->key);
	strcat(tmp_str, sv->value);
	ret = mtcp_write(ctx->mctx, sockid,
	tmp_str, strlen(tmp_str));
	if (ret < 0) {
	TRACE_APP("Connection closed with client.\n");
	//break;
	}
	*/
	while (ret > 0) {
		/* send at most SNDBUF_SIZE bytes per mtcp_write() call */
		len = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent);
		if (len <= 0) {
			break;
		}
		ret = mtcp_write(ctx->mctx, sockid,
				fcache[sv->fidx].file + sv->total_sent, len);
		if (ret < 0) {
			TRACE_APP("Connection closed with client.\n");
			break;
		}
		TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, len, ret);
		sent += ret;
		sv->total_sent += ret;
	}

	if (sv->total_sent >= fcache[sv->fidx].size) {
		/* whole file delivered */
		struct mtcp_epoll_event ev;
		sv->done = TRUE;
		finished++;

		if (sv->keep_alive) {
			/* if keep-alive connection, wait for the incoming request */
			ev.events = MTCP_EPOLLIN;
			ev.data.sockid = sockid;
			mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_MOD, sockid, &ev);
			CleanServerVariable(sv);
		} else {
			/* else, close connection */
			CloseConnection(ctx, sockid, sv);
		}
	}

	return sent;
}
/*----------------------------------------------------------------------------*/
/* Read and parse an HTTP request from `sockid`, look the requested file up
 * in fcache, attach a random key/value pair from the global hashmap, send the
 * response header, and kick off the file transfer. Returns the value of
 * mtcp_read() (bytes read; 0 = peer closed; <0 = error/EAGAIN). */
static int
HandleReadEvent(struct thread_context *ctx, int sockid, struct server_vars *sv)
{
	struct mtcp_epoll_event ev;
	char buf[HTTP_HEADER_LEN];
	char url[URL_LEN];
	char response[HTTP_HEADER_LEN];
	int scode;			// status code
	time_t t_now;
	char t_str[128];
	char keepalive_str[128];
	int rd;
	int i;
	int len;
	int sent;

	/* HTTP request handling */
	rd = mtcp_read(ctx->mctx, sockid, buf, HTTP_HEADER_LEN);
	if (rd <= 0) {
		return rd;
	}
	/* NOTE(review): recv_len keeps growing while the copy is clamped to the
	 * buffer size, so the strlen/printf below can read past valid data on
	 * oversized requests — confirm intended. */
	memcpy(sv->request + sv->recv_len,
			(char *)buf, MIN(rd, HTTP_HEADER_LEN - sv->recv_len));
	sv->recv_len += rd;
	//sv->request[rd] = '\0';
	//fprintf(stderr, "HTTP Request: \n%s", request);
	sv->request_len = find_http_header(sv->request, sv->recv_len);
	if (sv->request_len <= 0) {
		/* header incomplete or malformed: keep the connection for more data */
		TRACE_ERROR("Socket %d: Failed to parse HTTP request header.\n"
				"read bytes: %d, recv_len: %d, "
				"request_len: %d, strlen: %ld, request: \n%s\n",
				sockid, rd, sv->recv_len,
				sv->request_len, strlen(sv->request), sv->request);
		return rd;
	}

	http_get_url(sv->request, sv->request_len, url, URL_LEN);
	TRACE_APP("Socket %d URL: %s\n", sockid, url);
	sprintf(sv->fname, "%s%s", www_main, url);
	TRACE_APP("Socket %d File name: %s\n", sockid, sv->fname);

	sv->keep_alive = FALSE;
	if (http_header_str_val(sv->request, "Connection: ",
			strlen("Connection: "), keepalive_str, 128)) {
		if (strstr(keepalive_str, "Keep-Alive")) {
			sv->keep_alive = TRUE;
		} else if (strstr(keepalive_str, "Close")) {
			sv->keep_alive = FALSE;
		}
	}

	/* Find file in cache */
	scode = 404;
	for (i = 0; i < nfiles; i++) {
		if (strcmp(sv->fname, fcache[i].fullname) == 0) {
			sv->fsize = fcache[i].size;
			sv->fidx = i;
			scode = 200;
			break;
		}
	}
	TRACE_APP("Socket %d File size: %ld (%ldMB)\n",
			sockid, sv->fsize, sv->fsize / 1024 / 1024);

	//@rinku key value generation
	/* Pick a random key, look it up in the global hashmap, and stash the
	 * pair in sv so it can be echoed in the response below. */
	char k[7]; // v[6];
	int r = rand() % CAPACITY;
	sprintf(k, "%d", r);
	char* v = return_search(ht, k);
	//int tmp_val = atoi(v);
	/* NOTE(review): if a stored value exceeds 6 chars this sprintf overflows
	 * tmp_val — confirm the insert side bounds value length. */
	char tmp_val[7];
	sprintf(tmp_val, "%s", v);
	//data->val = htons(tmp_val);
	//printf("K: %s, V: %s\n", k, tmp_val);
	memcpy(sv->key, k, sizeof(sv->key));
	memcpy(sv->value, tmp_val, sizeof(sv->value));

	/* Response header handling */
	time(&t_now);
	strftime(t_str, 128, "%a, %d %b %Y %X GMT", gmtime(&t_now));
	if (sv->keep_alive)
		sprintf(keepalive_str, "Keep-Alive");
	else
		sprintf(keepalive_str, "Close");

	/*sprintf(response, "HTTP/1.1 %d %s\r\n"
			"Date: %s\r\n"
			"Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
			"Content-Length: %ld\r\n"
			"Connection: %s\r\n\r\n",
			scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str);
	*/
	/*sprintf(response, "HTTP/1.1 %d %s\r\n"
			"Date: %s\r\n"
			"Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
			"Content-Length: %ld\r\n"
			"Connection: %s\r\n\r\n",
			scode, StatusCodeToString(scode), t_str, sv->fsize+strlen(sv->key)+strlen(sv->key), keepalive_str);
	*/
	/* NOTE(review): the K:/V: lines come after the blank line that ends the
	 * HTTP header, so they are body bytes NOT counted in Content-Length —
	 * confirm clients tolerate this framing. */
	sprintf(response, "HTTP/1.1 %d %s\r\n"
			"Date: %s\r\n"
			"Server: Webserver on Middlebox TCP (Ubuntu)\r\n"
			"Content-Length: %ld\r\n"
			"Connection: %s\r\n\r\n"
			"K: %s\r\n"
			"V: %s\r\n",
			scode, StatusCodeToString(scode), t_str, sv->fsize, keepalive_str, sv->key, sv->value);
	len = strlen(response); //-strlen(sv->key)-strlen(sv->value)-2;
	//len = sizeof(response);
	//printf("len: %d\n", len);
	TRACE_APP("Socket %d HTTP Response: \n%s", sockid, response);
	sent = mtcp_write(ctx->mctx, sockid, response, len);
	TRACE_APP("Socket %d Sent response header: try: %d, sent: %d\n",
			sockid, len, sent);
	assert(sent == len);
	sv->rspheader_sent = TRUE;

	/* also watch for writability so SendUntilAvailable() can stream the file */
	ev.events = MTCP_EPOLLIN | MTCP_EPOLLOUT;
	ev.data.sockid = sockid;
	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_MOD, sockid, &ev);

	SendUntilAvailable(ctx, sockid, sv);
	return rd;
}
/*----------------------------------------------------------------------------*/
/* Accept one pending connection on `listener`, reset its per-socket state,
 * and register it (nonblocking) for EPOLLIN. Returns the accepted socket id,
 * or a negative value when none is pending or the id is out of range. */
int
AcceptConnection(struct thread_context *ctx, int listener)
{
	mctx_t mctx = ctx->mctx;
	struct server_vars *sv;
	struct mtcp_epoll_event ev;
	int c;

	c = mtcp_accept(mctx, listener, NULL, NULL);

	if (c >= 0) {
		if (c >= MAX_FLOW_NUM) {
			TRACE_ERROR("Invalid socket id %d.\n", c);
			return -1;
		}

		/* socket ids index directly into the per-thread svars array */
		sv = &ctx->svars[c];
		CleanServerVariable(sv);
		TRACE_APP("New connection %d accepted.\n", c);
		ev.events = MTCP_EPOLLIN;
		ev.data.sockid = c;
		mtcp_setsock_nonblock(ctx->mctx, c);
		mtcp_epoll_ctl(mctx, ctx->ep, MTCP_EPOLL_CTL_ADD, c, &ev);
		TRACE_APP("Socket %d registered.\n", c);

	} else {
		/* EAGAIN just means no pending connection; anything else is an error */
		if (errno != EAGAIN) {
			TRACE_ERROR("mtcp_accept() error %s\n",
					strerror(errno));
		}
	}

	return c;
}
/*----------------------------------------------------------------------------*/
/* Build the per-core application context: pin this thread to a core, spawn
 * the mtcp thread for it, create the epoll descriptor, and allocate the
 * per-socket state array. Returns NULL (after cleaning up) on any failure. */
struct thread_context *
InitializeServerThread(int core)
{
	struct thread_context *ctx;

	/* affinitize application thread to a CPU core */
#if HT_SUPPORT
	/* with hyper-threading, place app threads on the sibling logical cores */
	mtcp_core_affinitize(core + (num_cores / 2));
#else
	mtcp_core_affinitize(core);
#endif /* HT_SUPPORT */

	ctx = (struct thread_context *)calloc(1, sizeof(struct thread_context));
	if (!ctx) {
		TRACE_ERROR("Failed to create thread context!\n");
		return NULL;
	}

	/* create mtcp context: this will spawn an mtcp thread */
	ctx->mctx = mtcp_create_context(core);
	if (!ctx->mctx) {
		TRACE_ERROR("Failed to create mtcp context!\n");
		free(ctx);
		return NULL;
	}

	/* create epoll descriptor */
	ctx->ep = mtcp_epoll_create(ctx->mctx, MAX_EVENTS);
	if (ctx->ep < 0) {
		mtcp_destroy_context(ctx->mctx);
		free(ctx);
		TRACE_ERROR("Failed to create epoll descriptor!\n");
		return NULL;
	}

	/* allocate memory for server variables */
	ctx->svars = (struct server_vars *)
			calloc(MAX_FLOW_NUM, sizeof(struct server_vars));
	if (!ctx->svars) {
		mtcp_close(ctx->mctx, ctx->ep);
		mtcp_destroy_context(ctx->mctx);
		free(ctx);
		TRACE_ERROR("Failed to create server_vars struct!\n");
		return NULL;
	}

	return ctx;
}
/*----------------------------------------------------------------------------*/
/* Create a nonblocking mtcp socket, bind it to port 80 on all interfaces,
 * start listening with the configured backlog, and register it with this
 * thread's epoll set. Returns the listener socket id, or -1 on failure. */
int
CreateListeningSocket(struct thread_context *ctx)
{
	int listener;
	struct mtcp_epoll_event ev;
	struct sockaddr_in saddr;
	int ret;

	/* create socket and set it as nonblocking */
	listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0);
	if (listener < 0) {
		TRACE_ERROR("Failed to create listening socket!\n");
		return -1;
	}
	ret = mtcp_setsock_nonblock(ctx->mctx, listener);
	if (ret < 0) {
		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
		return -1;
	}

	/* bind to port 80 */
	saddr.sin_family = AF_INET;
	saddr.sin_addr.s_addr = INADDR_ANY;
	saddr.sin_port = htons(80);
	ret = mtcp_bind(ctx->mctx, listener,
			(struct sockaddr *)&saddr, sizeof(struct sockaddr_in));
	if (ret < 0) {
		TRACE_ERROR("Failed to bind to the listening socket!\n");
		return -1;
	}

	/* listen (backlog: can be configured) */
	ret = mtcp_listen(ctx->mctx, listener, backlog);
	if (ret < 0) {
		TRACE_ERROR("mtcp_listen() failed!\n");
		return -1;
	}

	/* wait for incoming accept events */
	ev.events = MTCP_EPOLLIN;
	ev.data.sockid = listener;
	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_ADD, listener, &ev);

	return listener;
}
/*----------------------------------------------------------------------------*/
/* Per-core server loop (pthread entry point). Sets up the thread context and
 * listening socket, then services epoll events — accept, error, read, write —
 * until done[core] is set by SignalHandler(). Always returns NULL. */
void *
RunServerThread(void *arg)
{
	int core = *(int *)arg;
	struct thread_context *ctx;
	mctx_t mctx;
	int listener;
	int ep;
	struct mtcp_epoll_event *events;
	int nevents;
	int i, ret;
	int do_accept;

	/* initialization */
	ctx = InitializeServerThread(core);
	if (!ctx) {
		TRACE_ERROR("Failed to initialize server thread.\n");
		return NULL;
	}
	mctx = ctx->mctx;
	ep = ctx->ep;

	events = (struct mtcp_epoll_event *)
			calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event));
	if (!events) {
		TRACE_ERROR("Failed to create event struct!\n");
		exit(-1);
	}

	listener = CreateListeningSocket(ctx);
	if (listener < 0) {
		TRACE_ERROR("Failed to create listening socket.\n");
		exit(-1);
	}

	while (!done[core]) {
		/* block until at least one event arrives (timeout -1 = forever) */
		nevents = mtcp_epoll_wait(mctx, ep, events, MAX_EVENTS, -1);
		if (nevents < 0) {
			if (errno != EINTR)
				perror("mtcp_epoll_wait");
			break;
		}

		do_accept = FALSE;
		for (i = 0; i < nevents; i++) {

			if (events[i].data.sockid == listener) {
				/* if the event is for the listener, accept connection */
				do_accept = TRUE;

			} else if (events[i].events & MTCP_EPOLLERR) {
				int err;
				socklen_t len = sizeof(err);

				/* error on the connection */
				TRACE_APP("[CPU %d] Error on socket %d\n",
						core, events[i].data.sockid);
				if (mtcp_getsockopt(mctx, events[i].data.sockid,
						SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
					if (err != ETIMEDOUT) {
						fprintf(stderr, "Error on socket %d: %s\n",
								events[i].data.sockid, strerror(err));
					}
				} else {
					perror("mtcp_getsockopt");
				}
				CloseConnection(ctx, events[i].data.sockid,
						&ctx->svars[events[i].data.sockid]);

			} else if (events[i].events & MTCP_EPOLLIN) {
				ret = HandleReadEvent(ctx, events[i].data.sockid,
						&ctx->svars[events[i].data.sockid]);

				if (ret == 0) {
					/* connection closed by remote host */
					CloseConnection(ctx, events[i].data.sockid,
							&ctx->svars[events[i].data.sockid]);
				} else if (ret < 0) {
					/* if not EAGAIN, it's an error */
					if (errno != EAGAIN) {
						CloseConnection(ctx, events[i].data.sockid,
								&ctx->svars[events[i].data.sockid]);
					}
				}

			} else if (events[i].events & MTCP_EPOLLOUT) {
				/* socket writable again: continue streaming the file */
				struct server_vars *sv = &ctx->svars[events[i].data.sockid];
				if (sv->rspheader_sent) {
					SendUntilAvailable(ctx, events[i].data.sockid, sv);
				} else {
					TRACE_APP("Socket %d: Response header not sent yet.\n",
							events[i].data.sockid);
				}

			} else {
				/* unexpected event mask */
				assert(0);
			}
		}

		/* if do_accept flag is set, accept connections */
		if (do_accept) {
			/* drain the accept queue until mtcp_accept() reports no more */
			while (1) {
				ret = AcceptConnection(ctx, listener);
				if (ret < 0)
					break;
			}
		}
	}

	/* destroy mtcp context: this will kill the mtcp thread */
	mtcp_destroy_context(mctx);
	pthread_exit(NULL);

	return NULL;
}
/*----------------------------------------------------------------------------*/
/*
 * SIGINT handler for the server: mark the thread that received the signal
 * as done, and relay the signal to every other still-running app thread so
 * they shut down as well.
 */
void
SignalHandler(int signum)
{
	pthread_t self = pthread_self();
	int cpu;

	for (cpu = 0; cpu < core_limit; cpu++) {
		if (pthread_equal(app_thread[cpu], self)) {
			//TRACE_INFO("Server thread %d got SIGINT\n", cpu);
			done[cpu] = TRUE;
			continue;
		}
		if (done[cpu])
			continue;
		pthread_kill(app_thread[cpu], signum);
	}
}
/*----------------------------------------------------------------------------*/
/*
 * Print the epserver usage line and exit successfully.
 *
 * -p  path to the www/ directory to serve
 * -f  mTCP startup configuration file
 * -N  number of CPU cores to use
 * -c  per-process starting core id
 * -h  show this help
 */
static void
printHelp(const char *prog_name)
{
	TRACE_CONFIG("%s -p <path_to_www/> -f <mtcp_conf_file> "
		     "[-N num_cores] [-c <per-process core_id>] [-h]\n",
		     prog_name);
	exit(EXIT_SUCCESS);	/* asking for help is a normal exit */
}
/*----------------------------------------------------------------------------*/
/*
 * epserver entry point: parse options, populate the kv-cache hash table,
 * preload every file of the www directory into the in-memory file cache,
 * initialize mtcp, and run one RunServerThread per core.
 */
int
main(int argc, char **argv)
{
	DIR *dir;
	struct dirent *ent;
	int fd;
	int ret;
	uint64_t total_read;
	struct mtcp_conf mcfg;
	int cores[MAX_CPUS];
	int process_cpu;
	int i, o;

	/* global kv-cache hash table shared by the worker threads */
	ht = create_table(CAPACITY);

	num_cores = GetNumCPUs();
	core_limit = num_cores;
	process_cpu = -1;
	dir = NULL;

	if (argc < 2) {
		TRACE_CONFIG("$%s directory_to_service\n", argv[0]);
		return FALSE;
	}

	while (-1 != (o = getopt(argc, argv, "N:f:p:c:b:h"))) {
		switch (o) {
		case 'p':
			/* open the directory to serve */
			www_main = optarg;
			dir = opendir(www_main);
			if (!dir) {
				TRACE_CONFIG("Failed to open %s.\n", www_main);
				perror("opendir");
				return FALSE;
			}
			break;
		case 'N':
			core_limit = mystrtol(optarg, 10);
			if (core_limit > num_cores) {
				TRACE_CONFIG("CPU limit should be smaller than the "
					     "number of CPUs: %d\n", num_cores);
				return FALSE;
			}
			/**
			 * it is important that core limit is set
			 * before mtcp_init() is called. You can
			 * not set core_limit after mtcp_init()
			 */
			mtcp_getconf(&mcfg);
			mcfg.num_cores = core_limit;
			mtcp_setconf(&mcfg);
			break;
		case 'f':
			conf_file = optarg;
			break;
		case 'c':
			process_cpu = mystrtol(optarg, 10);
			if (process_cpu > core_limit) {
				TRACE_CONFIG("Starting CPU is way off limits!\n");
				return FALSE;
			}
			break;
		case 'b':
			backlog = mystrtol(optarg, 10);
			break;
		case 'h':
			printHelp(argv[0]);
			break;
		}
	}

	if (dir == NULL) {
		TRACE_CONFIG("You did not pass a valid www_path!\n");
		exit(EXIT_FAILURE);
	}

	/*
	 * Populate the kv-cache hash table exactly once.  This used to run
	 * inside the per-file loop below, redundantly re-inserting all
	 * CAPACITY entries for every file served.  Key i maps to value i+1.
	 */
	for (i = 0; i < CAPACITY; i++) {
		char k[12], v[12];	/* fix: k[6]/v[6] overflowed for i >= 100000 */
		snprintf(k, sizeof(k), "%d", i);
		snprintf(v, sizeof(v), "%d", i + 1);
		ht_insert(ht, k, v);
	}
	print_search(ht, "45678");	/* sanity probe of the populated table */

	/* preload every regular directory entry into the file cache */
	nfiles = 0;
	while ((ent = readdir(dir)) != NULL) {
		if (strcmp(ent->d_name, ".") == 0)
			continue;
		else if (strcmp(ent->d_name, "..") == 0)
			continue;

		snprintf(fcache[nfiles].name, NAME_LIMIT, "%s", ent->d_name);
		snprintf(fcache[nfiles].fullname, FULLNAME_LIMIT, "%s/%s",
			 www_main, ent->d_name);
		fd = open(fcache[nfiles].fullname, O_RDONLY);
		if (fd < 0) {
			perror("open");
			continue;
		}
		/* file size = offset of EOF */
		fcache[nfiles].size = lseek64(fd, 0, SEEK_END);
		lseek64(fd, 0, SEEK_SET);

		fcache[nfiles].file = (char *)malloc(fcache[nfiles].size);
		if (!fcache[nfiles].file) {
			TRACE_CONFIG("Failed to allocate memory for file %s\n",
				     fcache[nfiles].name);
			perror("malloc");
			close(fd);	/* fix: fd was leaked on this path */
			continue;
		}
		TRACE_INFO("Reading %s (%lu bytes)\n",
			   fcache[nfiles].name, fcache[nfiles].size);

		/* read() may return short counts; loop until EOF or error
		 * (the old `ret == 0` branch after `ret < 1` was dead code) */
		total_read = 0;
		while (total_read < fcache[nfiles].size) {
			ret = read(fd, fcache[nfiles].file + total_read,
				   fcache[nfiles].size - total_read);
			if (ret <= 0)	/* 0 = EOF, < 0 = error */
				break;
			total_read += ret;
		}
		close(fd);
		if (total_read < fcache[nfiles].size) {
			/* short read: drop the partially loaded file */
			free(fcache[nfiles].file);
			continue;
		}
		nfiles++;
		if (nfiles >= MAX_FILES)
			break;
	}

	finished = 0;

	/* initialize mtcp */
	if (conf_file == NULL) {
		TRACE_CONFIG("You forgot to pass the mTCP startup config file!\n");
		exit(EXIT_FAILURE);
	}
	ret = mtcp_init(conf_file);
	if (ret) {
		TRACE_CONFIG("Failed to initialize mtcp\n");
		exit(EXIT_FAILURE);
	}
	mtcp_getconf(&mcfg);
	if (backlog > mcfg.max_concurrency) {
		TRACE_CONFIG("backlog can not be set larger than CONFIG.max_concurrency\n");
		return FALSE;
	}
	/* if backlog is not specified, set it to 4K */
	if (backlog == -1) {
		backlog = 4096;
	}

	/* register signal handler to mtcp */
	mtcp_register_signal(SIGINT, SignalHandler);

	TRACE_INFO("Application initialization finished.\n");

	/* spawn one server thread per core (single thread in -c mode) */
	for (i = ((process_cpu == -1) ? 0 : process_cpu); i < core_limit; i++) {
		cores[i] = i;
		done[i] = FALSE;
		if (pthread_create(&app_thread[i],
				   NULL, RunServerThread, (void *)&cores[i])) {
			perror("pthread_create");
			TRACE_CONFIG("Failed to create server thread.\n");
			exit(EXIT_FAILURE);
		}
		if (process_cpu != -1)
			break;
	}
	for (i = ((process_cpu == -1) ? 0 : process_cpu); i < core_limit; i++) {
		pthread_join(app_thread[i], NULL);
		if (process_cpu != -1)
			break;
	}

	mtcp_destroy();
	closedir(dir);
	free_table(ht);	/* fix: was guarded by `i == 0`, which never held here */
	return 0;
}
############### mtcp configuration file ###############
# The underlying I/O module you want to use. Please
# enable only one out of the four.
#io = psio
#io = netmap
#io = onvm
io = dpdk
# No. of cores setting (enabling this option will override
# the `cpu' config for those applications that accept
# num_cores as command line arguments)
#
# e.g. if ./epserver is executed with `-N 8', the mtcp core
# will still invoke only 4 mTCP threads, because the uncommented
# line below overrides the command-line value.
num_cores = 4
# Core mask
#core_mask = 0000000F0
core_mask = 0F
# number of TX descriptor ring size, the default is 128 (dpdk-only)
# e.g. in case of VMXNET3 PMD, the min TX ring size is 512
# this allows mTCP app runs on VMware ESXi VM
num_tx_desc = 512
# number of RX descriptor ring size, the default is 128 (dpdk-only)
num_rx_desc = 128
# Number of memory channels per processor socket (dpdk-only)
num_mem_ch = 4
#--- ONVM specific args ---#
# Service id (required)
#onvm_serv = 1
# Instance id (optional)
#onvm_inst = 1
# Destination id (will forward to another NF)
# If not set will send packets out
#onvm_dest = 2
# Sample ONVM configurations
# Single node epserver <-> epwget
#onvm_serv = 1
#onvm_dest = 2
# Simple endpoint server multi node setup
#onvm_serv = 1
#--------------------------#
# Enable multi-process support
#multiprocess = 1
# Used port (please adjust accordingly)
#------ PSIO ports -------#
#port = xge0 xge1
#port = xge1
#------ DPDK ports -------#
port = ens259f1 #ens259f0
#port = dpdk1
#port = dpdk0 dpdk1
# Congestion control algorithm
# (only available when configured with --enable-ccp)
# cc = reno
# cc = cubic
# Maximum concurrency per core (default = 10000)
#max_concurrency = 10000
# Maximum number of socket buffers per core (default = 10000)
# Set this to small value if there are many idle connections
#max_num_buffers = 10000
# Receive buffer size of sockets; if not set: rcvbuf = sndbuf
rcvbuf = 8192
# Send buffer size of sockets; if not set: sndbuf = rcvbuf
sndbuf = 8192
# if sndbuf & rcvbuf not set: sndbuf = rcvbuf = 8192
# TCP timeout seconds
# (tcp_timeout = -1 can disable the timeout check)
tcp_timeout = 30
# TCP timewait seconds
tcp_timewait = 0
# Interface to print stats (please adjust accordingly)
# You can enable multiple ports in a line
#------ PSIO ports -------#
#stat_print = xge0
#stat_print = xge1
#------ DPDK ports -------#
stat_print = ens259f1 #ens259f0
#stat_print = dpdk0 dpdk1
#######################################################
############### mtcp configuration file ###############
# The underlying I/O module you want to use. Please
# enable only one out of the three.
#io = psio
#io = netmap
io = dpdk
# No. of cores setting (enabling this option will override
# the `cpu' config for those applications that accept
# num_cores as command line arguments)
#
# e.g. in case ./epserver is executed with `-N 4', the
# mtcp core will still invoke 8 mTCP threads if the
# following line is uncommented.
#num_cores = 8
# Number of memory channels per processor socket (dpdk-only)
num_mem_ch = 4
# Enable multi-process support
multiprocess = 1
# Used port (please adjust accordingly)
#------ PSIO ports -------#
#port = xge0 xge1
#port = xge1
#------ DPDK ports -------#
port = dpdk0
#port = dpdk1
#port = dpdk0 dpdk1
# Maximum concurrency per core (default = 10000)
#max_concurrency = 10000
# Maximum number of socket buffers per core (default = 10000)
# Set this to small value if there are many idle connections
#max_num_buffers = 10000
# Receive buffer size of sockets; if not set: rcvbuf = sndbuf
rcvbuf = 8192
# Send buffer size of sockets; if not set: sndbuf = rcvbuf
sndbuf = 8192
# if sndbuf & rcvbuf not set: sndbuf = rcvbuf = 8192
# TCP timeout seconds
# (tcp_timeout = -1 can disable the timeout check)
tcp_timeout = 30
# TCP timewait seconds
tcp_timewait = 0
# Interface to print stats (please adjust accordingly)
# You can enable multiple ports in separate lines
#------ PSIO ports -------#
#stat_print = xge0
#stat_print = xge1
#------ DPDK ports -------#
stat_print = dpdk0
#stat_print = dpdk1
#######################################################
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/queue.h>
#include <assert.h>
#include <limits.h>
#include <mtcp_api.h>
#include <mtcp_epoll.h>
#include "cpu.h"
#include "rss.h"
#include "http_parsing.h"
#include "netlib.h"
#include "debug.h"
#define MAX_URL_LEN 128
#define FILE_LEN 128
#define FILE_IDX 10
#define MAX_FILE_LEN (FILE_LEN + FILE_IDX)
#define HTTP_HEADER_LEN 1024
#define IP_RANGE 1
#define MAX_IP_STR_LEN 16
#define BUF_SIZE (8*1024)
#define CALC_MD5SUM FALSE
#define TIMEVAL_TO_MSEC(t) ((t.tv_sec * 1000) + (t.tv_usec / 1000))
#define TIMEVAL_TO_USEC(t) ((t.tv_sec * 1000000) + (t.tv_usec))
#define TS_GT(a,b) ((int64_t)((a)-(b)) > 0)
#ifndef TRUE
#define TRUE (1)
#endif
#ifndef FALSE
#define FALSE (0)
#endif
#ifndef ERROR
#define ERROR (-1)
#endif
#ifndef MAX_CPUS
#define MAX_CPUS 16
#endif
/*----------------------------------------------------------------------------*/
/* per-core worker thread handles, mtcp contexts and shutdown flags */
static pthread_t app_thread[MAX_CPUS];
static mctx_t g_mctx[MAX_CPUS];
static int done[MAX_CPUS];          /* per-core shutdown flags */
/*----------------------------------------------------------------------------*/
static int num_cores;               /* CPUs detected on this machine */
static int core_limit;              /* CPUs actually used (-N option) */
/*----------------------------------------------------------------------------*/
static int fio = FALSE;             /* TRUE when responses are mirrored to disk (-o) */
static char outfile[FILE_LEN + 1];  /* base name for -o output files */
/*----------------------------------------------------------------------------*/
static char host[MAX_IP_STR_LEN + 1] = {'\0'};  /* server host part of the URL */
static char url[MAX_URL_LEN + 1] = {'\0'};      /* request path part of the URL */
static in_addr_t daddr;             /* destination address (network byte order) */
static in_port_t dport;             /* destination port (network byte order) */
static in_addr_t saddr;             /* local source address */
/*----------------------------------------------------------------------------*/
static int total_flows;             /* total connections requested on the CLI */
static int flows[MAX_CPUS];         /* per-core share of total_flows */
static int flowcnt = 0;             /* global flow counter used for -o file suffixes */
static int concurrency;             /* per-core concurrent-connection target */
static int max_fds;                 /* fd table size (3x concurrency) */
static uint64_t response_size = 0;  /* size of the first completed response */
/*----------------------------------------------------------------------------*/
/* Per-thread download statistics, aggregated and reset by PrintStats(). */
struct wget_stat
{
	uint64_t waits;         /* epoll_wait calls */
	uint64_t events;        /* events returned by epoll_wait */
	uint64_t connects;      /* connections initiated */
	uint64_t reads;         /* response bytes read */
	uint64_t writes;        /* request bytes written */
	uint64_t completes;     /* fully downloaded responses */
	uint64_t errors;        /* failed flows */
	uint64_t timedout;      /* errors that were ETIMEDOUT */
	uint64_t sum_resp_time; /* sum of response times (us) */
	uint64_t max_resp_time; /* worst response time seen (us) */
};
/*----------------------------------------------------------------------------*/
/* Per-core worker state for the wget client threads. */
struct thread_context
{
	int core;                /* CPU core this context is pinned to */
	mctx_t mctx;             /* mtcp context for this core */
	int ep;                  /* mtcp epoll descriptor */
	struct wget_vars *wvars; /* per-socket download state, indexed by sockid */
	int target;              /* flows this thread must complete */
	int started;             /* flows initiated so far */
	int errors;              /* failed flows */
	int incompletes;         /* flows closed before the full body arrived */
	int done;                /* flows finished (success or failure) */
	int pending;             /* flows currently in flight */
	struct wget_stat stat;   /* counters reported by PrintStats() */
};
typedef struct thread_context* thread_context_t;
/*----------------------------------------------------------------------------*/
/* Per-connection progress state for one HTTP fetch. */
struct wget_vars
{
	int request_sent;                /* TRUE once the GET has been written */
	char response[HTTP_HEADER_LEN];  /* accumulated response header bytes */
	int resp_len;                    /* bytes currently held in response[] */
	int headerset;                   /* TRUE once the header was parsed */
	uint32_t header_len;             /* parsed header length in bytes */
	uint64_t file_len;               /* Content-Length of the body */
	uint64_t recv;                   /* total payload bytes received */
	uint64_t write;                  /* bytes mirrored to the -o output file */
	struct timeval t_start;          /* request-sent timestamp */
	struct timeval t_end;            /* download-complete timestamp */
	int fd;                          /* output file descriptor (fio mode) */
};
/*----------------------------------------------------------------------------*/
static struct thread_context *g_ctx[MAX_CPUS] = {0};  /* per-core contexts */
static struct wget_stat *g_stat[MAX_CPUS] = {0};      /* per-core stats; NULL after teardown */
/*----------------------------------------------------------------------------*/
thread_context_t
CreateContext(int core)
{
thread_context_t ctx;
ctx = (thread_context_t)calloc(1, sizeof(struct thread_context));
if (!ctx) {
perror("malloc");
TRACE_ERROR("Failed to allocate memory for thread context.\n");
return NULL;
}
ctx->core = core;
ctx->mctx = mtcp_create_context(core);
if (!ctx->mctx) {
TRACE_ERROR("Failed to create mtcp context.\n");
free(ctx);
return NULL;
}
g_mctx[core] = ctx->mctx;
return ctx;
}
/*----------------------------------------------------------------------------*/
/*
 * Tear down a worker context: unpublish its stats pointer first (so
 * PrintStats() cannot touch freed memory), destroy the mtcp context,
 * then release the allocation itself.
 */
void
DestroyContext(thread_context_t tc)
{
	g_stat[tc->core] = NULL;
	mtcp_destroy_context(tc->mctx);
	free(tc);
}
/*----------------------------------------------------------------------------*/
/*
 * Open one non-blocking connection to daddr:dport and register it with
 * this thread's epoll set with EPOLLOUT interest (writable = connect done).
 * Returns the new socket id, or -1 on failure.
 */
static inline int
CreateConnection(thread_context_t ctx)
{
	mctx_t mctx = ctx->mctx;
	struct mtcp_epoll_event ev;
	struct sockaddr_in addr;
	int sockid;
	int ret;

	sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
	if (sockid < 0) {
		TRACE_INFO("Failed to create socket!\n");
		return -1;
	}
	/* reset state left over from a previous flow that used this sockid */
	memset(&ctx->wvars[sockid], 0, sizeof(struct wget_vars));
	ret = mtcp_setsock_nonblock(mctx, sockid);
	if (ret < 0) {
		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
		exit(-1);
	}

	/* fix: zero the whole sockaddr — sin_zero was previously left as
	 * uninitialized stack garbage */
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = daddr;
	addr.sin_port = dport;

	ret = mtcp_connect(mctx, sockid,
			   (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
	if (ret < 0) {
		/* EINPROGRESS is the normal non-blocking connect outcome */
		if (errno != EINPROGRESS) {
			perror("mtcp_connect");
			mtcp_close(mctx, sockid);
			return -1;
		}
	}

	ctx->started++;
	ctx->pending++;
	ctx->stat.connects++;

	ev.events = MTCP_EPOLLOUT;
	ev.data.sockid = sockid;
	mtcp_epoll_ctl(mctx, ctx->ep, MTCP_EPOLL_CTL_ADD, sockid, &ev);

	return sockid;
}
/*----------------------------------------------------------------------------*/
/*
 * Close a finished flow, update the done/pending counters, and immediately
 * refill the pipeline with new connections up to the concurrency target.
 * If a new connection cannot be created, the core is marked done.
 */
static inline void
CloseConnection(thread_context_t ctx, int sockid)
{
	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_DEL, sockid, NULL);
	mtcp_close(ctx->mctx, sockid);

	ctx->done++;
	ctx->pending--;
	assert(ctx->pending >= 0);

	/* keep `concurrency` flows in flight until this thread's target is met */
	for (; ctx->pending < concurrency && ctx->started < ctx->target; ) {
		if (CreateConnection(ctx) >= 0)
			continue;
		done[ctx->core] = TRUE;
		break;
	}
}
/*----------------------------------------------------------------------------*/
/*
 * Compose and send the HTTP/1.0 GET for `url` on a freshly connected
 * socket, switch its epoll interest to EPOLLIN, start the response timer,
 * and (in -o mode) open the per-flow output file.  Always returns 0.
 */
static inline int
SendHTTPRequest(thread_context_t ctx, int sockid, struct wget_vars *wv)
{
	char request[HTTP_HEADER_LEN];
	char fname[MAX_FILE_LEN + 1];
	struct mtcp_epoll_event ev;
	int wr;
	int len;

	/* reset per-flow parsing state before the response arrives */
	wv->headerset = FALSE;
	wv->recv = 0;
	wv->header_len = wv->file_len = 0;

	snprintf(request, HTTP_HEADER_LEN, "GET %s HTTP/1.0\r\n"
		 "User-Agent: Wget/1.12 (linux-gnu)\r\n"
		 "Accept: */*\r\n"
		 "Host: %s\r\n"
//		 "Connection: Keep-Alive\r\n\r\n",
		 "Connection: Close\r\n\r\n",
		 url, host);
	len = strlen(request);

	wr = mtcp_write(ctx->mctx, sockid, request, len);
	if (wr < len) {
		TRACE_ERROR("Socket %d: Sending HTTP request failed. "
			    "try: %d, sent: %d\n", sockid, len, wr);
	}
	if (wr > 0)	/* fix: don't fold a -1 error return into the byte counter */
		ctx->stat.writes += wr;
	TRACE_APP("Socket %d HTTP Request of %d bytes. sent.\n", sockid, wr);
	wv->request_sent = TRUE;

	/* from now on we only care about response data */
	ev.events = MTCP_EPOLLIN;
	ev.data.sockid = sockid;
	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_MOD, sockid, &ev);

	gettimeofday(&wv->t_start, NULL);

	if (fio) {
		/* one output file per flow: <outfile>.<flow#> */
		snprintf(fname, MAX_FILE_LEN, "%s.%d", outfile, flowcnt++);
		wv->fd = open(fname, O_WRONLY | O_CREAT | O_TRUNC, 0644);
		if (wv->fd < 0) {
			TRACE_APP("Failed to open file descriptor for %s\n", fname);
			exit(1);
		}
	}

	return 0;
}
/*----------------------------------------------------------------------------*/
/*
 * Bookkeeping for a fully received response: stamp the end time, close the
 * connection (which also refills the connection pipeline), validate the
 * received size against the first-seen response size, and record
 * response-time statistics.  Always returns 0.
 */
static inline int
DownloadComplete(thread_context_t ctx, int sockid, struct wget_vars *wv)
{
#ifdef APP
	mctx_t mctx = ctx->mctx;
#endif
	uint64_t tdiff;

	TRACE_APP("Socket %d File download complete!\n", sockid);
	gettimeofday(&wv->t_end, NULL);
	CloseConnection(ctx, sockid);
	ctx->stat.completes++;

	/* the first completed flow defines the expected response size */
	if (response_size == 0) {
		response_size = wv->recv;
		fprintf(stderr, "Response size set to %lu\n", response_size);
	} else {
		if (wv->recv != response_size) {
			fprintf(stderr, "Response size mismatch! mine: %lu, theirs: %lu\n",
				wv->recv, response_size);
		}
	}

	/* response time in microseconds */
	tdiff = (wv->t_end.tv_sec - wv->t_start.tv_sec) * 1000000 +
		(wv->t_end.tv_usec - wv->t_start.tv_usec);
	TRACE_APP("Socket %d Total received bytes: %lu (%luMB)\n",
		  sockid, wv->recv, wv->recv / 1000000);
	TRACE_APP("Socket %d Total spent time: %lu us\n", sockid, tdiff);
	if (tdiff > 0) {
		TRACE_APP("Socket %d Average bandwidth: %lf[MB/s]\n",
			  sockid, (double)wv->recv / tdiff);
	}
	ctx->stat.sum_resp_time += tdiff;
	if (tdiff > ctx->stat.max_resp_time)
		ctx->stat.max_resp_time = tdiff;

	/* NOTE(review): fd 0 (stdin) would be treated as "not open" here */
	if (fio && wv->fd > 0)
		close(wv->fd);

	return 0;
}
/*----------------------------------------------------------------------------*/
/*
 * Drain all available response bytes from `sockid`.  The first reads
 * accumulate the HTTP header into wv->response until it parses; after
 * that, payload bytes are counted (and optionally mirrored into the -o
 * output file) until header_len + file_len bytes have arrived.  Always
 * returns 0; connection teardown happens inline on error/EOF/completion.
 */
static inline int
HandleReadEvent(thread_context_t ctx, int sockid, struct wget_vars *wv)
{
	mctx_t mctx = ctx->mctx;
	char buf[BUF_SIZE];
	char *pbuf;
	int rd, copy_len;

	rd = 1;
	while (rd > 0) {
		rd = mtcp_read(mctx, sockid, buf, BUF_SIZE);
		if (rd <= 0)
			break;
		ctx->stat.reads += rd;

		TRACE_APP("Socket %d: mtcp_read ret: %d, total_recv: %lu, "
			  "header_set: %d, header_len: %u, file_len: %lu\n",
			  sockid, rd, wv->recv + rd,
			  wv->headerset, wv->header_len, wv->file_len);

		pbuf = buf;
		if (!wv->headerset) {
			/* still collecting the response header */
			copy_len = MIN(rd, HTTP_HEADER_LEN - wv->resp_len);
			memcpy(wv->response + wv->resp_len, buf, copy_len);
			wv->resp_len += copy_len;
			wv->header_len = find_http_header(wv->response, wv->resp_len);
			if (wv->header_len > 0) {
				int64_t file_len;

				wv->response[wv->header_len] = '\0';
				/* fix: keep the parsed value signed; storing it
				 * straight into the uint64_t field made the
				 * `< 0` failure check below always false */
				file_len = http_header_long_val(wv->response,
						CONTENT_LENGTH_HDR, sizeof(CONTENT_LENGTH_HDR) - 1);
				if (file_len < 0) {
					/* failed to find the Content-Length field */
					wv->recv += rd;
					rd = 0;
					CloseConnection(ctx, sockid);
					return 0;
				}
				wv->file_len = file_len;

				TRACE_APP("Socket %d Parsed response header. "
					  "Header length: %u, File length: %lu (%luMB)\n",
					  sockid, wv->header_len,
					  wv->file_len, wv->file_len / 1024 / 1024);
				wv->headerset = TRUE;
				/* account only the body bytes that trailed the
				 * header inside this same read */
				wv->recv += (rd - (wv->resp_len - wv->header_len));
				pbuf += (rd - (wv->resp_len - wv->header_len));
				rd = (wv->resp_len - wv->header_len);
			} else {
				/* failed to parse response header */
				wv->recv += rd;
				rd = 0;
				ctx->stat.errors++;
				ctx->errors++;
				CloseConnection(ctx, sockid);
				return 0;
			}
		}
		wv->recv += rd;

		if (fio && wv->fd > 0) {
			/* mirror the payload into the -o output file */
			int wr = 0;
			while (wr < rd) {
				int _wr = write(wv->fd, pbuf + wr, rd - wr);
				/* fix: check the error return BEFORE asserting a
				 * full write; the old order made this error
				 * branch unreachable */
				if (_wr < 0) {
					perror("write");
					TRACE_ERROR("Failed to write.\n");
					assert(0);
					break;
				}
				assert(_wr == rd - wr);
				wr += _wr;
				wv->write += _wr;
			}
		}

		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
			break;
		}
	}

	if (rd > 0) {
		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
			TRACE_APP("Socket %d Done Write: "
				  "header: %u file: %lu recv: %lu write: %lu\n",
				  sockid, wv->header_len, wv->file_len,
				  wv->recv - wv->header_len, wv->write);
			DownloadComplete(ctx, sockid, wv);
			return 0;
		}
	} else if (rd == 0) {
		/* connection closed by remote host */
		TRACE_DBG("Socket %d connection closed with server.\n", sockid);
		if (wv->header_len && (wv->recv >= wv->header_len + wv->file_len)) {
			DownloadComplete(ctx, sockid, wv);
		} else {
			ctx->stat.errors++;
			ctx->incompletes++;
			CloseConnection(ctx, sockid);
		}
	} else if (rd < 0) {
		/* EAGAIN just means the socket is drained for now */
		if (errno != EAGAIN) {
			TRACE_DBG("Socket %d: mtcp_read() error %s\n",
				  sockid, strerror(errno));
			ctx->stat.errors++;
			ctx->errors++;
			CloseConnection(ctx, sockid);
		}
	}
	return 0;
}
/*----------------------------------------------------------------------------*/
#if 0
/* Legacy per-second transaction printer, compiled out.
 * NOTE(review): references g_trans[], which is not declared anywhere in
 * this file's visible scope — this block would not compile if re-enabled. */
void
PrintStats()
{
#define LINE_LEN 2048
	char line[LINE_LEN];
	int total_trans;
	int i;

	total_trans = 0;
	line[0] = '\0';
	//sprintf(line, "Trans/s: ");
	for (i = 0; i < core_limit; i++) {
		//sprintf(line + strlen(line), "%6d ", g_trans[i]);
		sprintf(line + strlen(line), "[CPU%2d] %7d trans/s ", i, g_trans[i]);
		total_trans += g_trans[i];
		g_trans[i] = 0;
		if (i % 4 == 3)
			sprintf(line + strlen(line), "\n");
	}
	fprintf(stderr, "%s", line);
	fprintf(stderr, "[ ALL ] %7d trans/s\n", total_trans);
	//sprintf(line + strlen(line), "total: %6d", total_trans);
	//printf("%s\n", line);
	//fprintf(stderr, "Transactions/s: %d\n", total_trans);
	fflush(stderr);
}
#endif
/*----------------------------------------------------------------------------*/
/*
 * Aggregate and print the per-core wget statistics (called once per second
 * from core 0's event loop).  Each core's counter block is zeroed after it
 * is folded into the totals, so every report covers one interval.
 */
static void
PrintStats()
{
	struct wget_stat total = {0};
	struct wget_stat *st;
	uint64_t avg_resp_time;
	uint64_t total_resp_time = 0;
	int i;

	for (i = 0; i < core_limit; i++) {
		st = g_stat[i];
		/* NULL once DestroyContext() has torn this core down */
		if (st == NULL) continue;
		avg_resp_time = st->completes? st->sum_resp_time / st->completes : 0;
#if 0
		fprintf(stderr, "[CPU%2d] epoll_wait: %5lu, event: %7lu, "
			"connect: %7lu, read: %4lu MB, write: %4lu MB, "
			"completes: %7lu (resp_time avg: %4lu, max: %6lu us), "
			"errors: %2lu (timedout: %2lu)\n",
			i, st->waits, st->events, st->connects,
			st->reads / 1024 / 1024, st->writes / 1024 / 1024,
			st->completes, avg_resp_time, st->max_resp_time,
			st->errors, st->timedout);
#endif
		total.waits += st->waits;
		total.events += st->events;
		total.connects += st->connects;
		total.reads += st->reads;
		total.writes += st->writes;
		total.completes += st->completes;
		/* NOTE(review): summing per-core averages and dividing by
		 * core_limit below yields an average of averages, not a
		 * flow-weighted mean — confirm this is intended */
		total_resp_time += avg_resp_time;
		if (st->max_resp_time > total.max_resp_time)
			total.max_resp_time = st->max_resp_time;
		total.errors += st->errors;
		total.timedout += st->timedout;
		/* reset so the next one-second interval starts from zero */
		memset(st, 0, sizeof(struct wget_stat));
	}
	fprintf(stderr, "[ ALL ] connect: %7lu, read: %4lu MB, write: %4lu MB, "
		"completes: %7lu (resp_time avg: %4lu, max: %6lu us)\n",
		total.connects,
		total.reads / 1024 / 1024, total.writes / 1024 / 1024,
		total.completes, total_resp_time / core_limit, total.max_resp_time);
#if 0
	fprintf(stderr, "[ ALL ] epoll_wait: %5lu, event: %7lu, "
		"connect: %7lu, read: %4lu MB, write: %4lu MB, "
		"completes: %7lu (resp_time avg: %4lu, max: %6lu us), "
		"errors: %2lu (timedout: %2lu)\n",
		total.waits, total.events, total.connects,
		total.reads / 1024 / 1024, total.writes / 1024 / 1024,
		total.completes, total_resp_time / core_limit, total.max_resp_time,
		total.errors, total.timedout);
#endif
}
/*----------------------------------------------------------------------------*/
/*
 * Per-core wget worker.  Pins itself to `core`, creates the mtcp context
 * and epoll set, keeps the connection pipeline full up to the concurrency
 * target, and runs the event loop until the per-thread flow target is met
 * or shutdown is requested.  arg points at the core id (int).
 */
void *
RunWgetMain(void *arg)
{
	thread_context_t ctx;
	mctx_t mctx;
	int core = *(int *)arg;
	struct in_addr daddr_in;
	int n, maxevents;
	int ep;
	struct mtcp_epoll_event *events;
	int nevents;
	struct wget_vars *wvars;
	int i;
	struct timeval cur_tv, prev_tv;

	mtcp_core_affinitize(core);

	ctx = CreateContext(core);
	if (!ctx) {
		return NULL;
	}
	mctx = ctx->mctx;
	g_ctx[core] = ctx;
	g_stat[core] = &ctx->stat;
	srand(time(NULL));

	mtcp_init_rss(mctx, saddr, IP_RANGE, daddr, dport);

	n = flows[core];
	if (n == 0) {
		TRACE_DBG("Application thread %d finished.\n", core);
		pthread_exit(NULL);
		return NULL;
	}
	ctx->target = n;

	daddr_in.s_addr = daddr;
	fprintf(stderr, "Thread %d handles %d flows. connecting to %s:%u\n",
		core, n, inet_ntoa(daddr_in), ntohs(dport));

	/* Initialization */
	maxevents = max_fds * 3;
	ep = mtcp_epoll_create(mctx, maxevents);
	if (ep < 0) {
		TRACE_ERROR("Failed to create epoll struct!\n");	/* fix: broken "!n" escape */
		exit(EXIT_FAILURE);
	}
	events = (struct mtcp_epoll_event *)
		calloc(maxevents, sizeof(struct mtcp_epoll_event));
	if (!events) {
		TRACE_ERROR("Failed to allocate events!\n");
		exit(EXIT_FAILURE);
	}
	ctx->ep = ep;

	wvars = (struct wget_vars *)calloc(max_fds, sizeof(struct wget_vars));
	if (!wvars) {
		TRACE_ERROR("Failed to create wget variables!\n");
		exit(EXIT_FAILURE);
	}
	ctx->wvars = wvars;

	ctx->started = ctx->done = ctx->pending = 0;
	ctx->errors = ctx->incompletes = 0;

	gettimeofday(&cur_tv, NULL);
	prev_tv = cur_tv;

	while (!done[core]) {
		gettimeofday(&cur_tv, NULL);

		/* print statistics every second (core 0 only) */
		if (core == 0 && cur_tv.tv_sec > prev_tv.tv_sec) {
			PrintStats();
			prev_tv = cur_tv;
		}

		/* top up the pipeline to the per-core concurrency target */
		while (ctx->pending < concurrency && ctx->started < ctx->target) {
			if (CreateConnection(ctx) < 0) {
				done[core] = TRUE;
				break;
			}
		}

		nevents = mtcp_epoll_wait(mctx, ep, events, maxevents, -1);
		ctx->stat.waits++;

		if (nevents < 0) {
			if (errno != EINTR) {
				TRACE_ERROR("mtcp_epoll_wait failed! ret: %d\n", nevents);
			}
			done[core] = TRUE;
			break;
		} else {
			ctx->stat.events += nevents;
		}

		for (i = 0; i < nevents; i++) {
			if (events[i].events & MTCP_EPOLLERR) {
				int err;
				socklen_t len = sizeof(err);

				TRACE_APP("[CPU %d] Error on socket %d\n",
					  core, events[i].data.sockid);
				ctx->stat.errors++;
				ctx->errors++;
				if (mtcp_getsockopt(mctx, events[i].data.sockid,
						SOL_SOCKET, SO_ERROR, (void *)&err, &len) == 0) {
					if (err == ETIMEDOUT)
						ctx->stat.timedout++;
				}
				CloseConnection(ctx, events[i].data.sockid);
			} else if (events[i].events & MTCP_EPOLLIN) {
				HandleReadEvent(ctx,
						events[i].data.sockid, &wvars[events[i].data.sockid]);
			} else if (events[i].events == MTCP_EPOLLOUT) {
				/* connect completed: send the request exactly once */
				struct wget_vars *wv = &wvars[events[i].data.sockid];
				if (!wv->request_sent) {
					SendHTTPRequest(ctx, events[i].data.sockid, wv);
				} else {
					//TRACE_DBG("Request already sent.\n");
				}
			} else {
				TRACE_ERROR("Socket %d: event: %s\n",
					    events[i].data.sockid, EventToString(events[i].events));
				assert(0);
			}
		}

		if (ctx->done >= ctx->target) {
			fprintf(stdout, "[CPU %d] Completed %d connections, "
				"errors: %d incompletes: %d\n",
				ctx->core, ctx->done, ctx->errors, ctx->incompletes);
			break;
		}
	}

	TRACE_INFO("Wget thread %d waiting for mtcp to be destroyed.\n", core);
	/* fix: release the per-thread buffers that were previously leaked */
	free(events);
	free(wvars);
	ctx->wvars = NULL;
	DestroyContext(ctx);
	TRACE_DBG("Wget thread %d finished.\n", core);
	pthread_exit(NULL);
	return NULL;
}
/*----------------------------------------------------------------------------*/
/* SIGINT handler for the wget client: request shutdown on every core. */
void
SignalHandler(int signum)
{
	int cpu = 0;

	while (cpu < core_limit)
		done[cpu++] = TRUE;
}
/*----------------------------------------------------------------------------*/
/*
 * epwget entry point: split "<host>[/path]" into host and URL, parse the
 * flow count and options, initialize mtcp, then spawn one RunWgetMain
 * worker per core and wait for them all to finish.
 */
int
main(int argc, char **argv)
{
	struct mtcp_conf mcfg;
	char *conf_file;
	int cores[MAX_CPUS];
	int flow_per_thread;
	int flow_remainder_cnt;
	int total_concurrency = 0;
	int ret;
	int i, o;
	int process_cpu;

	if (argc < 3) {
		TRACE_CONFIG("Too few arguments!\n");
		TRACE_CONFIG("Usage: %s url #flows [output]\n", argv[0]);
		return FALSE;
	}

	if (strlen(argv[1]) > MAX_URL_LEN) {
		TRACE_CONFIG("Length of URL should be smaller than %d!\n", MAX_URL_LEN);
		return FALSE;
	}

	/* split argv[1] into the host part and the request path */
	char* slash_p = strchr(argv[1], '/');
	if (slash_p) {
		/* fix: clamp the copy length — the host part may be longer
		 * than host[] (MAX_IP_STR_LEN + 1); the old unbounded
		 * strncpy could overflow the buffer and leave it without
		 * a terminating NUL */
		size_t host_len = (size_t)(slash_p - argv[1]);
		if (host_len > MAX_IP_STR_LEN)
			host_len = MAX_IP_STR_LEN;
		strncpy(host, argv[1], host_len);
		host[host_len] = '\0';
		strncpy(url, slash_p, MAX_URL_LEN);
	} else {
		strncpy(host, argv[1], MAX_IP_STR_LEN);
		strncpy(url, "/", 2);
	}
	conf_file = NULL;
	process_cpu = -1;
	daddr = inet_addr(host);
	dport = htons(80);	/* plain HTTP only */
	saddr = INADDR_ANY;

	total_flows = mystrtol(argv[2], 10);
	if (total_flows <= 0) {
		TRACE_CONFIG("Number of flows should be large than 0.\n");
		return FALSE;
	}

	num_cores = GetNumCPUs();
	core_limit = num_cores;
	concurrency = 100;	/* default per-core concurrency */

	while (-1 != (o = getopt(argc, argv, "N:c:o:n:f:"))) {
		switch(o) {
		case 'N':
			core_limit = mystrtol(optarg, 10);
			if (core_limit > num_cores) {
				TRACE_CONFIG("CPU limit should be smaller than the "
					     "number of CPUS: %d\n", num_cores);
				return FALSE;
			} else if (core_limit < 1) {
				TRACE_CONFIG("CPU limit should be greater than 0\n");
				return FALSE;
			}
			/**
			 * it is important that core limit is set
			 * before mtcp_init() is called. You can
			 * not set core_limit after mtcp_init()
			 */
			mtcp_getconf(&mcfg);
			mcfg.num_cores = core_limit;
			mtcp_setconf(&mcfg);
			break;
		case 'c':
			total_concurrency = mystrtol(optarg, 10);
			break;
		case 'o':
			if (strlen(optarg) > MAX_FILE_LEN) {
				TRACE_CONFIG("Output file length should be smaller than %d!\n",
					     MAX_FILE_LEN);
				return FALSE;
			}
			fio = TRUE;
			strncpy(outfile, optarg, FILE_LEN);
			break;
		case 'n':
			process_cpu = mystrtol(optarg, 10);
			if (process_cpu > core_limit) {
				TRACE_CONFIG("Starting CPU is way off limits!\n");
				return FALSE;
			}
			break;
		case 'f':
			conf_file = optarg;
			break;
		}
	}

	/* never use more cores than there are flows */
	if (total_flows < core_limit) {
		core_limit = total_flows;
	}

	/* per-core concurrency = total_concurrency / # cores */
	if (total_concurrency > 0)
		concurrency = total_concurrency / core_limit;

	/* set the max number of fds 3x larger than concurrency */
	max_fds = concurrency * 3;

	TRACE_CONFIG("Application configuration:\n");
	TRACE_CONFIG("URL: %s\n", url);
	TRACE_CONFIG("# of total_flows: %d\n", total_flows);
	TRACE_CONFIG("# of cores: %d\n", core_limit);
	TRACE_CONFIG("Concurrency: %d\n", total_concurrency);
	if (fio) {
		TRACE_CONFIG("Output file: %s\n", outfile);
	}

	if (conf_file == NULL) {
		TRACE_ERROR("mTCP configuration file is not set!\n");
		exit(EXIT_FAILURE);
	}

	ret = mtcp_init(conf_file);
	if (ret) {
		TRACE_ERROR("Failed to initialize mtcp.\n");
		exit(EXIT_FAILURE);
	}
	mtcp_getconf(&mcfg);
	mcfg.max_concurrency = max_fds;
	mcfg.max_num_buffers = max_fds;
	mtcp_setconf(&mcfg);

	mtcp_register_signal(SIGINT, SignalHandler);

	/* distribute flows across cores, spreading the remainder one-per-core */
	flow_per_thread = total_flows / core_limit;
	flow_remainder_cnt = total_flows % core_limit;
	for (i = ((process_cpu == -1) ? 0 : process_cpu); i < core_limit; i++) {
		cores[i] = i;
		done[i] = FALSE;
		flows[i] = flow_per_thread;
		if (flow_remainder_cnt-- > 0)
			flows[i]++;
		if (flows[i] == 0)
			continue;
		if (pthread_create(&app_thread[i],
				   NULL, RunWgetMain, (void *)&cores[i])) {
			perror("pthread_create");
			TRACE_ERROR("Failed to create wget thread.\n");
			exit(-1);
		}
		if (process_cpu != -1)
			break;
	}
	for (i = ((process_cpu == -1) ? 0 : process_cpu); i < core_limit; i++) {
		pthread_join(app_thread[i], NULL);
		TRACE_INFO("Wget thread %d joined.\n", i);
		if (process_cpu != -1)
			break;
	}

	mtcp_destroy();
	return 0;
}
/*----------------------------------------------------------------------------*/
############### mtcp configuration file ###############
# The underlying I/O module you want to use. Please
# enable only one out of the four.
#io = psio
#io = onvm
#io = netmap
io = dpdk
# No. of cores setting (enabling this option will override
# the `cpu' config for those applications that accept
# num_cores as command line arguments)
#
# e.g. if ./epwget is executed with `-N 8', the mtcp core
# will still invoke only 4 mTCP threads, because the uncommented
# line below overrides the command-line value.
num_cores = 4
# number of TX descriptor ring size, the default is 128 (dpdk-only)
# e.g. in case of VMXNET3 PMD, the min TX ring size is 512
# this allows mTCP app runs on VMware ESXi VM
num_tx_desc = 512
# number of RX descriptor ring size, the default is 128 (dpdk-only)
num_rx_desc = 128
# Number of memory channels per processor socket (dpdk-only)
num_mem_ch = 4
#--- ONVM specific args ---#
# Service id (required)
#onvm_serv = 2
# Dest id (used to forward traffic to specific NF)
#onvm_dest = 1
# Sample ONVM configurations
# Single node epserver <-> epwget
#onvm_serv = 2
#onvm_dest = 1
# Simple client for multi node setup
#onvm_serv = 1
#--------------------------#
# Used port (please adjust accordingly)
#------ PSIO ports -------#
#port = xge0 xge1
#port = xge1
#------ DPDK ports -------#
port = ens259f1 #ens259f0
#port = dpdk1
#port = dpdk0 dpdk1
# Enable multi-process support
#multiprocess = 1
# Congestion control algorithm
# (only available when configured with --enable-ccp)
# cc = reno
# cc = cubic
# Receive buffer size of sockets; if not set: rcvbuf = sndbuf
rcvbuf = 8192
# Send buffer size of sockets; if not set: sndbuf = rcvbuf
sndbuf = 8192
# if sndbuf & rcvbuf not set: sndbuf = rcvbuf = 8192
# Maximum concurrency per core (default = 10000)
#max_concurrency = 10000
# Maximum number of socket buffers per core (default = 10000)
# Set this to small value if there are many idle connections
#max_num_buffers = 10000
# TCP timeout seconds
# (tcp_timeout = -1 can disable the timeout check)
tcp_timeout = 30
# TCP timewait seconds
tcp_timewait = 0
# Interface to print stats (please adjust accordingly)
# You can enable multiple ports in a line
#------ PSIO ports -------#
#stat_print = xge0
#stat_print = xge1
#------ DPDK ports -------#
stat_print = ens259f1 #ens259f0
#stat_print = dpdk0 dpdk1
#######################################################
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment