Commit d2d47b86 authored by Smit Gangurde's avatar Smit Gangurde

RAMCloud Offload, Smit MTP

parent 33075e8b
*.gch
*.out
*.o
*code-workspace
.vscode
.*~
\ No newline at end of file
#ifndef ERR_MSSG_H
#define ERR_MSSG_H
#define D(x) do{x;}while(0)
#endif
\ No newline at end of file
TRANSPORT_TYPE=RDMA_RC
NUM_THREADS=8
\ No newline at end of file
TRANSPORT_TYPE=TCP
NUM_THREAD=8
\ No newline at end of file
###############_Work in Progress_###############
Things to look out for: cpu speed difference between host and nic can cause an issue while programming in RDMA
Current Issue: Memory leak on exit of connection
Keeps creating buffers for dead connection messages (why is select even returning dead connections?)
Fix to be implemented: Clean closing of connections
Compilation commands:
NIC:
g++ -g integrated_nic.cc include/common.cc transport_api/transport_config.cc config/read_config.cc include/connection_pool.cc include/thread_pool.cc include/threadsafe_queue.cc include/log.cc include/thread_functions.cc include/dispatcher.cc include/client_functions.cc include/hash.cc include/Buffer.cc include/cli_api.cc -libverbs -lpthread -mcmodel=small
SERVER:
g++ -g integrated_server.cc include/common.cc transport_api/transport_config.cc config/read_config.cc include/connection_pool.cc include/thread_pool.cc include/threadsafe_queue.cc include/log.cc include/thread_functions.cc include/cli_api.cc include/dispatcher.cc include/client_functions.cc include/hash.cc include/Buffer.cc -libverbs -lpthread -mcmodel=medium
CLIENT:
g++ -g threaded_client1.cc include/common.cc transport_api/transport_config.cc config/read_config.cc include/connection_pool.cc include/thread_pool.cc include/threadsafe_queue.cc include/log.cc include/thread_functions.cc include/dispatcher.cc include/client_functions.cc include/cli_api.cc include/hash.cc include/Buffer.cc -libverbs -lpthread -mcmodel=medium
class ReplicaManager {
public:
private:
}
\ No newline at end of file
#include <iostream>
#include <sys/time.h>
#include <infiniband/verbs.h>
#include "rdma_states.hpp"
#include "rdma_helper.hpp"
#include "metadata.hpp"
#include "transport_helper.hpp"
const short int MODE = 0;
char *SERVER_HOST = "192.168.200.20";
char *SERVER_NIC = "192.168.200.21";
const int NIC_PORT = 8090;
const double err_fraction = 0.5;
const short int dev_num = 0;
int main(int argc, char * argv[]) {
char *temp = (char *) malloc(128);
struct resource_base *base;
base = (struct resource_base*) malloc(sizeof(struct resource_base));
init_resources(base);
//base->server_name = SERVER_NIC;
base->ib_port = IB_PORT;
base->gid_idx = GID_IDX;
base->port = NIC_PORT;
open_dev(base, dev_num);
allocate_pd(base);
register_mr(base);
init_cq(base);
init_qp(base);
struct ibv_port_attr port_attr;
if(ibv_query_port(base->ctx, base->ib_port, &port_attr))
D(err_msg("ibv_query_gid", true, base));
if(port_attr.state != IBV_PORT_ACTIVE)
D(err_msg("IB PORT NOT ACTIVE", true, base));
base->port_attr = &port_attr;
union ibv_gid my_gid;
if(ibv_query_gid(base->ctx, base->ib_port, base->gid_idx, &my_gid))
D(err_msg("ibv_query_gid", true, base));
memcpy(base->local_conn->gid, &my_gid, 16);
connect_qp(base);
strcpy(base->mr_buf_addr, "yo");
sync_remote_qp(base, "R", temp, 1);
post_send(base, IBV_WR_RDMA_WRITE);
//base->mr_buf_addr = (char *) malloc(base->mr_size);
//strcpy(base->mr_buf_addr, "Hi from client\0");
//sock_connect(base);
//sync_remote_qp(base, base->mr_buf_addr, temp, 15);
union object test_obj;
union object ret_obj;
memset(&test_obj, 0, sizeof(test_obj));
memset(&ret_obj, 0, sizeof(ret_obj));
test_obj.obj.key = 1;
test_obj.obj.value[0] = 'T';
test_obj.obj.version = 1;
test_obj.obj.status = STATUS_OK;
struct timeval temp_time;
double snd_ts, rcvd_ts;
double avg = 0;
int err_cnt = 0;
int succ_cnt = 0;
// for(int i=0; i<1000; i++) {
// gettimeofday(&temp_time, NULL);
// //time in ms
// snd_ts = ((double)temp_time.tv_sec*1000.0) + ((double)temp_time.tv_usec/1000.0);
// test_obj.obj.send_ts = snd_ts;
// sync_remote_qp(base, "W", temp, 1);
// sync_remote_qp(base, (char *)&test_obj, (char *)&ret_obj, sizeof(test_obj));
// gettimeofday(&temp_time, NULL);
// rcvd_ts = ((double)temp_time.tv_sec*1000.0) + ((double)temp_time.tv_usec/1000.0);
// avg += (rcvd_ts - snd_ts);
// }
enum Status ret_status;
for(int i=0; i<cache_meta_size; i++) {
test_obj.obj.key = i;
gettimeofday(&temp_time, NULL);
snd_ts = ((double)temp_time.tv_sec*1000.0) + ((double)temp_time.tv_usec/1000.0);
test_obj.obj.send_ts = snd_ts;
//send_and_check(base, (char *)&test_obj, (char *)&ret_status, sizeof(test_obj), sizeof(ret_status));
memcpy((void*)base->mr_buf_addr, (void*)&test_obj, sizeof(test_obj));
send_obj(base, "R", 1);
sync_remote_qp(base, "T", temp, 1);
read_obj(base, (char *)&ret_status, sizeof(ret_status));
gettimeofday(&temp_time, NULL);
rcvd_ts = ((double)temp_time.tv_sec*1000.0) + ((double)temp_time.tv_usec/1000.0);
avg += (rcvd_ts - snd_ts);
if(ret_status == STATUS_WRONG_VERSION) err_cnt++;
else succ_cnt++;
}
std::cout<<"Errored requests: "<<err_cnt<<std::endl;
std::cout<<"Successful requests: "<<succ_cnt<<std::endl;
std::cout<<"Avg. RTT: "<<avg/(double)cache_meta_size<<" ms"<<std::endl;
// if(ret_obj.obj.status == STATUS_WRONG_VERSION)
// std::cout<<"Returned with wrong version status"<<std::endl;
cleanup(base);
return 0;
}
DEBUG=TRUE
ANALYZE=TRUE
INTERACTIVE_MODE=FALSE
TRANSPORT_TYPE=RDMA_RC
NUM_THREADS=0
CONN_PORT=8888
MAX_PACKET_SIZE_MBYTES=4;
RDMA_MR_SIZE_MBYTES=1
RDMA_MTU_SIZE=512
RDMA_MIN_RNR_TIMER=12
RDMA_TIMEOUT=12
RDMA_CQ_POLL_TIMEOUT_MS=5
RDMA_RETRY_CNT=4
RDMA_IB_PORT=1
RDMA_GID_IDX=1
\ No newline at end of file
#ifndef __CONFIG_PARAMETERS_H__
#define __CONFIG_PARAMETERS_H__
#include <unordered_map>
#include <string>
#include "../transport_api/transport_config.hpp"
enum params {
DEBUG,
ANALYZE,
INTERACTIVE_MODE,
TRANSPORT_TYPE,
NUM_THREADS,
CONN_PORT,
NUM_REPLICAS,
MAX_PACKET_SIZE_BYTES,
MAX_PACKET_SIZE_MBYTES,
RDMA_MR_SIZE_BYTES,
RDMA_MR_SIZE_MBYTES, //max 8
RDMA_MTU_SIZE_BYTES, //one of 256,512,1024,2048,4096
RDMA_MIN_RNR_TIMER,
RDMA_TIMEOUT,
RDMA_CQ_POLL_TIMEOUT_MS,
RDMA_RETRY_CNT,
RDMA_IB_PORT,
RDMA_GID_IDX,
ARRIVAL_RATE
};
const std::string param_strs[] = {
"DEBUG",
"ANALYZE",
"INTERACTIVE_MODE",
"TRANSPORT_TYPE",
"NUM_THREADS",
"CONN_PORT",
"NUM_REPLICAS",
"MAX_PACKET_SIZE_BYTES",
"MAX_PACKET_SIZE_MBYTES",
"RDMA_MR_SIZE_BYTES",
"RDMA_MR_SIZE_MBYTES",
"RDMA_MTU_SIZE_BYTES",
"RDMA_MIN_RNR_TIMER",
"RDMA_TIMEOUT",
"RDMA_CQ_POLL_TIMEOUT_MS",
"RDMA_RETRY_CNT",
"RDMA_IB_PORT",
"RDMA_GID_PORT",
"ARRIVAL_RATE"
};
const int num_params = 17;
#endif
\ No newline at end of file
DEBUG=TRUE
ANALYZE=TRUE
TRANSPORT_TYPE=RDMA_RC
NUM_THREADS=3
CONN_PORT=8888
MAX_PACKET_SIZE_MBYTES=4;
RDMA_MR_SIZE_MBYTES=1
RDMA_MTU_SIZE=512
RDMA_MIN_RNR_TIMER=12
RDMA_TIMEOUT=12
RDMA_CQ_POLL_TIMEOUT_MS=5
RDMA_RETRY_CNT=4
RDMA_IB_PORT=1
RDMA_GID_IDX=1
\ No newline at end of file
#ifndef __READ_CONFIG_CC__
#define __READ_CONFIG_CC__
#include <chrono>
#include <iostream>
#include <string>
#include <iterator>
#include <vector>
#include <fstream>
#include <infiniband/verbs.h>
#include "../transport_api/transport_config.hpp"
#include "read_config.hpp"
std::unordered_map<std::string, enum Transport_Type> transport_type_map = {
{"TCP", TCP_IP_TRANSPORT},
{"UDP", UDP_TRANSPORT},
{"RDMA_RC", RDMA_RC_TRANSPORT},
{"RDMA_UC", RDMA_UC_TRANSPORT},
};
std::unordered_map<std::string, enum params> param_map = {
{"DEBUG", DEBUG},
{"ANALYZE", ANALYZE},
{"INTERACTIVE_MODE", INTERACTIVE_MODE},
{"TRANSPORT_TYPE", TRANSPORT_TYPE},
{"NUM_THREADS", NUM_THREADS},
{"CONN_PORT", CONN_PORT},
{"NUM_REPLICAS", NUM_REPLICAS},
{"MAX_PACKET_SIZE_BYTES", MAX_PACKET_SIZE_BYTES},
{"MAX_PACKET_SIZE_MBYTES", MAX_PACKET_SIZE_MBYTES},
{"RDMA_MR_SIZE_BYTES", RDMA_MR_SIZE_BYTES},
{"RDMA_MR_SIZE_MBYTES", RDMA_MR_SIZE_MBYTES},
{"RDMA_MTU_SIZE_BYTES", RDMA_MTU_SIZE_BYTES},
{"RDMA_MIN_RNR_TIMER", RDMA_MIN_RNR_TIMER},
{"RDMA_TIMEOUT", RDMA_TIMEOUT},
{"RDMA_CQ_POLL_TIMEOUT_MS", RDMA_CQ_POLL_TIMEOUT_MS},
{"RDMA_RETRY_CNT", RDMA_RETRY_CNT},
{"RDMA_IB_PORT", RDMA_IB_PORT},
{"RDMA_GID_IDX", RDMA_GID_IDX},
{"ARRIVAL_RATE", ARRIVAL_RATE},
};
std::string strip_whitespaces(std::string str) {
std::string t;
std::string::iterator it = str.begin();
while(it!=str.end() && (*it)==' ') it++;
while(it!=str.end() && (*it)!=' ') {
t.push_back((*it));
it++;
}
return t;
}
Params::Params() {
//pass
}
Params::Params(std::string f) {
this->filename = f;
this->debug = false;
this->analyze = false;
this->interactive_mode = true;
this->transport_type = TCP_IP_TRANSPORT;
this->num_threads = 0;
this->conn_port = 8080;
this->num_replicas = 0;
this->max_packet_size_bytes = 0;
this->rdma_mr_size_bytes = 0;
this->rdma_mtu_size_bytes = IBV_MTU_512;
this->rdma_min_rnr_timer = 0;
this->rdma_timeout = 0;
this->rdma_cq_poll_timeout_ms = std::chrono::duration<double>{0.0};
this->rdma_retry_cnt = 0;
this->rdma_ib_port = 0;
this->rdma_gid_idx = 0;
this->arrival_rate = 0.0;
}
//debug functions
void Params::print_map() {
for(auto x: this->param_val_map) {
std::cout<<x.first<<" : "<<x.second<<std::endl;
}
return;
}
void Params::print_vals() {
std::cout<<"DEBUG: ";
if(this->debug) std::cout<<"TRUE"<<std::endl;
else std::cout<<"FALSE"<<std::endl;
std::cout<<"ANALYZE: ";
if(this->analyze) std::cout<<"TRUE"<<std::endl;
else std::cout<<"FALSE"<<std::endl;
std::cout<<"MODE: ";
if(this->interactive_mode) std::cout<<"INTERACTIVE MODE"<<std::endl;
else std::cout<<"BATCH MODE"<<std::endl;
std::cout<<"Transport: ";
switch(this->transport_type) {
case TCP_IP_TRANSPORT:
std::cout<<"TCP"<<std::endl;
break;
case UDP_TRANSPORT:
std::cout<<"UDP"<<std::endl;
break;
case RDMA_RC_TRANSPORT:
std::cout<<"RDMA RC"<<std::endl;
break;
case RDMA_UC_TRANSPORT:
std::cout<<"RDMA UC"<<std::endl;
break;
}
std::cout<<"Num thread: "<<this->num_threads<<std::endl;
std::cout<<"Conn Port: "<<this->conn_port<<std::endl;
std::cout<<"Num Replicas: "<<this->num_replicas<<std::endl;
std::cout<<"Max Packet Size (Bytes): "<<this->max_packet_size_bytes<<std::endl;
std::cout<<"RDMA MR size (Bytes): "<<this->rdma_mr_size_bytes<<std::endl;
std::cout<<"RDMA MTU size (Bytes): ";
switch (this->rdma_mtu_size_bytes) {
case IBV_MTU_256:
std::cout<<"256B"<<std::endl;
break;
case IBV_MTU_512:
std::cout<<"512B"<<std::endl;
break;
case IBV_MTU_1024:
std::cout<<"1024B"<<std::endl;
break;
case IBV_MTU_2048:
std::cout<<"2048B"<<std::endl;
break;
case IBV_MTU_4096:
std::cout<<"4096B"<<std::endl;
break;
}
std::cout<<"RDMA min rnr timer: "<<this->rdma_min_rnr_timer<<std::endl;
std::cout<<"RDMA timeout: "<<this->rdma_timeout<<std::endl;
std::cout<<"RDMA CQ poll timeout: "<<this->rdma_cq_poll_timeout_ms.count()<<std::endl;
std::cout<<"RDMA IB Port: "<<this->rdma_ib_port<<std::endl;
std::cout<<"RDMA GID IDX: "<<this->rdma_gid_idx<<std::endl;
std::cout<<"ARRIVAL RATE: "<<this->arrival_rate<<" requests/second"<<std::endl;
return;
}
// Just Reads config file
// to fill an internal map
void Params::read_config_file() {
std::string line;
std::ifstream config_file;
config_file.open(this->filename);
int pos;
while(getline(config_file, line)) {
if(line[0]=='\n' ||
(line[0]=='/' && line[1]=='/')) continue;
pos = line.find_first_of('=');
if(pos==std::string::npos) continue;
this->param_val_map[line.substr(0, pos)] = line.substr(pos+1, line.size()-pos);
}
return;
}
// Fills parameter variables
// using internal parameter map
void Params::fill_params() {
std::string t1, t2;
double tmp;
enum params param_type;
if(this->param_val_map.empty()) {
//error
return;
}
for(auto p: this->param_val_map) {
t1 = strip_whitespaces(p.first);
t2 = strip_whitespaces(p.second);
if(param_map.count(t1)==0) continue;
param_type = param_map[t1];
switch(param_type) {
case DEBUG:
if(t2.compare("TRUE") == 0) {
this->debug = true;
}
else {
this->debug = false;
}
break;
case ANALYZE:
if(t2.compare("TRUE") == 0) {
this->analyze = true;
}
else {
this->analyze = false;
}
break;
case INTERACTIVE_MODE:
if(t2.compare("TRUE") == 0) {
this->interactive_mode = true;
}
else {
this->interactive_mode = false;
}
case TRANSPORT_TYPE:
this->transport_type = transport_type_map[t2];
break;
case NUM_THREADS:
this->num_threads = std::stoi(t2);
break;
case CONN_PORT:
this->conn_port = std::stoi(t2);
break;
case NUM_REPLICAS:
this->num_replicas = std::stoi(t2);
break;
case MAX_PACKET_SIZE_BYTES:
this->max_packet_size_bytes = std::stoi(t2);
break;
case MAX_PACKET_SIZE_MBYTES:
this->max_packet_size_bytes = (int) (std::stoi(t2)*(1<<20));
break;
case RDMA_MR_SIZE_BYTES:
this->rdma_mr_size_bytes = (size_t) (std::stoi(t2));
break;
case RDMA_MR_SIZE_MBYTES:
this->rdma_mr_size_bytes = (size_t) (std::stoi(t2)*(1<<20));
break;
case RDMA_MTU_SIZE_BYTES:
switch(std::stoi(t2)) {
case 256:
this->rdma_mtu_size_bytes = IBV_MTU_256;
break;
case 512:
this->rdma_mtu_size_bytes = IBV_MTU_512;
break;
case 1024:
this->rdma_mtu_size_bytes = IBV_MTU_1024;
break;
case 2048:
this->rdma_mtu_size_bytes = IBV_MTU_2048;
break;
case 4096:
this->rdma_mtu_size_bytes = IBV_MTU_4096;
break;
default:
//error
break;
}
break;
case RDMA_MIN_RNR_TIMER:
this->rdma_min_rnr_timer = std::stoi(t2);
break;
case RDMA_TIMEOUT:
this->rdma_timeout = std::stoi(t2);
break;
case RDMA_CQ_POLL_TIMEOUT_MS:
tmp = std::stod(t2);
tmp *= 1e-3; //convert to ms
this->rdma_cq_poll_timeout_ms = std::chrono::duration<double>{tmp};
break;
case RDMA_RETRY_CNT:
this->rdma_retry_cnt = std::stoi(t2);
break;
case RDMA_IB_PORT:
this->rdma_ib_port = std::stoi(t2);
break;
case RDMA_GID_IDX:
this->rdma_gid_idx = std::stoi(t2);
break;
case ARRIVAL_RATE:
this->arrival_rate = std::stod(t2);
default:
//error
break;
}
}
return;
}
// Reads config file and
// Fills parameter variables
void Params::read_params() {
this->read_config_file();
this->fill_params();
return;
}
#endif
\ No newline at end of file
#ifndef __READ_CONFIG_H__
#define __READ_CONFIG_H__
#include <chrono>
#include <string>
#include <vector>
#include <unordered_map>
#include <infiniband/verbs.h>
#include "config_parameters.hpp"
#include "../transport_api/transport_config.hpp"
class Params {
private:
std::string filename;
std::vector<std::string> param_lines;
std::unordered_map<std::string, std::string> param_val_map;
public:
bool debug;
bool analyze;
bool interactive_mode;
enum Transport_Type transport_type;
int num_threads;
int conn_port;
int num_replicas;
int max_packet_size_bytes;
size_t rdma_mr_size_bytes;
enum ibv_mtu rdma_mtu_size_bytes;
int rdma_min_rnr_timer;
int rdma_timeout;
std::chrono::duration<double> rdma_cq_poll_timeout_ms;
int rdma_retry_cnt;
int rdma_ib_port;
int rdma_gid_idx;
double arrival_rate;
Params();
Params(std::string f);
//debug functions
void print_map();
void print_vals();
//required functions
void read_params();
void read_config_file();
void fill_params();
};
std::vector<std::string> get_param_lines(char *filename);
enum params check_token(std::string token);
#endif
\ No newline at end of file
#include <iostream>
#include "read_config.hpp"
using namespace std;
int main() {
Params p("config.conf");
p.read_params();
p.print_vals();
return 0;
}
\ No newline at end of file
DEBUG=TRUE
ANALYZE=TRUE
TRANSPORT_TYPE=RDMA_RC
NUM_THREADS=3
CONN_PORT=8888
MAX_PACKET_SIZE_MBYTES=4;
RDMA_MR_SIZE_MBYTES=1
RDMA_MTU_SIZE=512
RDMA_MIN_RNR_TIMER=12
RDMA_TIMEOUT=12
RDMA_CQ_POLL_TIMEOUT_MS=5
RDMA_RETRY_CNT=4
RDMA_IB_PORT=1
RDMA_GID_IDX=1
\ No newline at end of file
DEBUG=TRUE
ANALYZE=TRUE
INTERACTIVE_MODE=FALSE
TRANSPORT_TYPE=RDMA_RC
NUM_THREADS=0
CONN_PORT=8888
MAX_PACKET_SIZE_MBYTES=4;
RDMA_MR_SIZE_MBYTES=1
RDMA_MTU_SIZE=512
RDMA_MIN_RNR_TIMER=12
RDMA_TIMEOUT=12
RDMA_CQ_POLL_TIMEOUT_MS=5
RDMA_RETRY_CNT=4
RDMA_IB_PORT=1
RDMA_GID_IDX=1
ARRIVAL_RATE=80
\ No newline at end of file
#include <iostream>
#include <stdio.h>
#include <vector>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <string.h>
using namespace std;
char* NIC_IP = "192.168.200.21";
char* SERVER_IP = "192.168.200.20";
char* CLIENT_IP = "192.168.200.40";
int SERVER_PORT = 8989;
int CLIENT_PORT = 9898;
int send_obj(int cfd, char* obj, int size) {
int op_bytes = 0;
op_bytes = write(cfd, obj, size);
if(op_bytes < size) {
cout<<"write err"<<endl;
return -1;
}
return 0;
}
int read_obj(int cfd, char *obj, int size) {
int ip_bytes = 0;
int tot_bytes = 0;
while(tot_bytes < size) {
ip_bytes = read(cfd, obj, size);
if(ip_bytes == 0) break;
else if(ip_bytes > 0) tot_bytes += ip_bytes;
else break;
}
if(tot_bytes < size) {
cout<<"read err"<<endl;
return -1;
}
return 0;
}
int sock_connect(char* server_name, int port, int* local_fd, int* conn_fd) {
struct sockaddr_in host_addr;
memset(&host_addr, 0, sizeof(host_addr));
host_addr.sin_family = AF_INET;
host_addr.sin_port = htons(port);
host_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int sfd, cfd;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if(sfd < 0) {
cout<<"sfd:socket"<<endl;
return -1;
}
if(server_name==NULL) {
if(bind(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr))) {
close(sfd);
cout<<"bind"<<endl;
}
listen(sfd, 1);
cfd = accept(sfd, NULL, 0);
*local_fd = sfd;
*conn_fd = cfd;
return 0;
}
else {
inet_aton(server_name, &host_addr.sin_addr);
if(connect(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr)) < 0) {
close(sfd);
cout<<"connect"<<endl;
return -1;
}
*conn_fd = sfd;
return 0;
}
}
struct dummy {
long long key;
bool valid;
char val[100];
};
long long arr_size = 900000;
void populate_objs(dummy arr[], long long arr_size) {
for(int i=0; i<arr_size; i++) {
arr[i].key = i;
arr[i].val[0] = 'H';
arr[i].val[1] = 'E';
arr[i].val[2] = 'Y';
arr[i].val[3] = '\0';
}
}
int main() {
//dummy obj[arr_size];
//populate_objs(obj, arr_size);
int nic_sfd, nic_cfd;
sock_connect(NIC_IP, CLIENT_PORT, &nic_sfd, &nic_cfd);
struct timeval t_time;
double s1_time, s2_time, e_time, avg_time;
avg_time = 0.0;
int succ_cnt = 0;
int err_cnt = 0;
char *t = (char*)malloc(1);
int obj_size = sizeof(struct dummy);
dummy obj;
gettimeofday(&t_time, NULL);
s2_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
for(long long i=0; i<arr_size; i++) {
//printf("\r%lld",i);
//fflush(stdout);
gettimeofday(&t_time, NULL);
s1_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
obj.key = i;
send_obj(nic_cfd, (char *)&obj, obj_size);
read_obj(nic_cfd, t, 1);
gettimeofday(&t_time, NULL);
e_time = ((double)t_time.tv_sec*1000.0) +((double)t_time.tv_usec/1000.0);
avg_time += (e_time - s1_time);
//cout<<t<<endl;
if(t[0]=='S') succ_cnt++;
else err_cnt++;
//usleep(10000);
}
gettimeofday(&t_time, NULL);
e_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
double overall_time = (e_time - s2_time)/1000.0;
avg_time = avg_time/(succ_cnt+err_cnt);
cout<<"Success: "<<succ_cnt<<endl;
cout<<"Errored: "<<err_cnt<<endl;
cout<<"Avg. RTT: "<<avg_time<<" ms"<<endl;
cout<<"Overall Time: "<<overall_time<<endl;
cout<<"Througput: "<<(succ_cnt+err_cnt)/overall_time<<endl;
cout<<"Closing connections"<<endl;
close(nic_cfd);
close(nic_sfd);
return 0;
}
#include <iostream>
#include <stdio.h>
#include <vector>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <string.h>
using namespace std;
char* NIC_IP = "192.168.200.21";
char* SERVER_IP = "192.168.200.20";
char* CLIENT_IP = "192.168.200.40";
int SERVER_PORT = 8989;
int CLIENT_PORT = 9898;
int send_obj(int cfd, char* obj, int size) {
int op_bytes = 0;
op_bytes = write(cfd, obj, size);
if(op_bytes < size) {
cout<<"write err"<<endl;
return -1;
}
return 0;
}
int read_obj(int cfd, char *obj, int size) {
int ip_bytes = 0;
int tot_bytes = 0;
while(tot_bytes < size) {
ip_bytes = read(cfd, obj, size);
if(ip_bytes == 0) break;
else if(ip_bytes > 0) tot_bytes += ip_bytes;
else break;
}
if(tot_bytes < size) {
cout<<"read err"<<endl;
return -1;
}
return 0;
}
int sock_connect(char* server_name, int port, int* local_fd, int* conn_fd) {
struct sockaddr_in host_addr;
memset(&host_addr, 0, sizeof(host_addr));
host_addr.sin_family = AF_INET;
host_addr.sin_port = htons(port);
host_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int sfd, cfd;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if(sfd < 0) {
cout<<"sfd:socket"<<endl;
return -1;
}
if(server_name==NULL) {
if(bind(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr))) {
close(sfd);
cout<<"bind"<<endl;
}
listen(sfd, 1);
cfd = accept(sfd, NULL, 0);
*local_fd = sfd;
*conn_fd = cfd;
return 0;
}
else {
inet_aton(server_name, &host_addr.sin_addr);
if(connect(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr)) < 0) {
close(sfd);
cout<<"connect"<<endl;
return -1;
}
*conn_fd = sfd;
return 0;
}
}
struct dummy {
long long key;
bool valid;
char val[100];
};
void populate_objs(dummy arr[], long long arr_size, double err_fraction) {
for(long long i=0; i<arr_size; i++) {
arr[i].key = i;
if((double)rand()/RAND_MAX <= err_fraction) arr[i].valid = false;
else arr[i].valid = true;
}
}
void dummy_function() {
int j=0;
for(int i=0; i<1000; i++) j++;
}
double err_fraction = 0.0;
long long arr_size = 1000000;
int main() {
//dummy arr[arr_size];
//populate_objs(arr, arr_size, err_fraction);
int client_sfd, client_cfd;
sock_connect(NULL, SERVER_PORT, &client_sfd, &client_cfd);
dummy obj;
int obj_size = sizeof(obj);
char *s = "S";
char *e = "E";
int succ_cnt = 0;
int err_cnt = 0;
struct timeval t_time;
double s_time, e_time, overall_time;
gettimeofday(&t_time, NULL);
s_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
for(long long i=0; i<arr_size; i++) {
read_obj(client_cfd, (char*)&obj, obj_size);
if((double)rand()/RAND_MAX <= err_fraction) obj.valid = false;
else obj.valid = true;
if(obj.valid) {
succ_cnt++;
dummy_function();
send_obj(client_cfd, s, 1);
}
else {
err_cnt++;
send_obj(client_cfd, e, 1);
}
}
gettimeofday(&t_time, NULL);
e_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
overall_time = (e_time-s_time)/1000.0;
cout<<"Throughput: "<<(succ_cnt+err_cnt)/overall_time<<endl;
cout<<"Closing connections"<<endl;
//close(nic_cfd);
close(client_sfd);
return 0;
}
#include <iostream>
#include <stdio.h>
#include <vector>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <string.h>
using namespace std;
char* NIC_IP = "192.168.200.21";
char* SERVER_IP = "192.168.200.20";
char* CLIENT_IP = "192.168.200.40";
int SERVER_PORT = 8989;
int CLIENT_PORT = 9898;
int send_obj(int cfd, char* obj, int size) {
int op_bytes = 0;
op_bytes = write(cfd, obj, size);
if(op_bytes < size) {
cout<<"write err"<<endl;
return -1;
}
return 0;
}
int read_obj(int cfd, char *obj, int size) {
int ip_bytes = 0;
int tot_bytes = 0;
while(tot_bytes < size) {
ip_bytes = read(cfd, obj, size);
if(ip_bytes == 0) break;
else if(ip_bytes > 0) tot_bytes += ip_bytes;
else break;
}
if(tot_bytes < size) {
cout<<"read err"<<endl;
return -1;
}
return 0;
}
int sock_connect(char* server_name, int port, int* local_fd, int* conn_fd) {
struct sockaddr_in host_addr;
memset(&host_addr, 0, sizeof(host_addr));
host_addr.sin_family = AF_INET;
host_addr.sin_port = htons(port);
host_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int sfd, cfd;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if(sfd < 0) {
cout<<"sfd:socket"<<endl;
return -1;
}
if(server_name==NULL) {
if(bind(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr))) {
close(sfd);
cout<<"bind"<<endl;
}
listen(sfd, 1);
cfd = accept(sfd, NULL, 0);
*local_fd = sfd;
*conn_fd = cfd;
return 0;
}
else {
inet_aton(server_name, &host_addr.sin_addr);
if(connect(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr)) < 0) {
close(sfd);
cout<<"connect"<<endl;
return -1;
}
*conn_fd = sfd;
return 0;
}
}
struct dummy {
int key;
bool valid;
char val[100];
};
void populate_objs(dummy arr[], int arr_size, double err_fraction) {
for(int i=0; i<arr_size; i++) {
arr[i].key = i;
if((double)rand()/RAND_MAX <= err_fraction) arr[i].valid = false;
else arr[i].valid = true;
}
}
void dummy_function() {
int j=0;
for(int i=0; i<1000; i++) j++;
}
double err_fraction = 0.0;
int arr_size = 1000;
int main() {
dummy arr[arr_size];
populate_objs(arr, arr_size, err_fraction);
int client_sfd, client_cfd;
sock_connect(NULL, SERVER_PORT, &client_sfd, &client_cfd);
dummy obj;
int obj_size = sizeof(obj);
char *s = "S";
char *e = "E";
int succ_cnt = 0;
int err_cnt = 0;
for(int i=0; i<arr_size; i++) {
read_obj(client_cfd, (char*)&obj, obj_size);
if(arr[obj.key].valid) {
succ_cnt++;
dummy_function();
send_obj(client_cfd, s, 1);
}
else {
err_cnt++;
send_obj(client_cfd, e, 1);
}
}
cout<<"Closing connections"<<endl;
//close(nic_cfd);
close(client_sfd);
return 0;
}
#include <iostream>
#include <stdio.h>
#include <vector>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <string.h>
using namespace std;
char* NIC_IP = "192.168.200.21";
char* SERVER_IP = "192.168.200.20";
char* CLIENT_IP = "192.168.200.40";
int SERVER_PORT = 8989;
int CLIENT_PORT = 9898;
int send_obj(int cfd, char* obj, int size) {
int op_bytes = 0;
op_bytes = write(cfd, obj, size);
if(op_bytes < size) {
cout<<"write err"<<endl;
return -1;
}
return 0;
}
int read_obj(int cfd, char *obj, int size) {
int ip_bytes = 0;
int tot_bytes = 0;
while(tot_bytes < size) {
ip_bytes = read(cfd, obj, size);
if(ip_bytes == 0) break;
else if(ip_bytes > 0) tot_bytes += ip_bytes;
else break;
}
if(tot_bytes < size) {
cout<<"read err"<<endl;
return -1;
}
return 0;
}
int sock_connect(char* server_name, int port, int* local_fd, int* conn_fd) {
struct sockaddr_in host_addr;
memset(&host_addr, 0, sizeof(host_addr));
host_addr.sin_family = AF_INET;
host_addr.sin_port = htons(port);
host_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int sfd, cfd;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if(sfd < 0) {
cout<<"sfd:socket"<<endl;
return -1;
}
if(server_name==NULL) {
if(bind(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr))) {
close(sfd);
cout<<"bind"<<endl;
}
listen(sfd, 1);
cfd = accept(sfd, NULL, 0);
*local_fd = sfd;
*conn_fd = cfd;
return 0;
}
else {
inet_aton(server_name, &host_addr.sin_addr);
if(connect(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr)) < 0) {
close(sfd);
cout<<"connect"<<endl;
return -1;
}
*conn_fd = sfd;
return 0;
}
}
struct dummy {
long long key;
bool valid;
char val[100];
};
void populate_objs(dummy arr[], int arr_size, double err_fraction) {
for(int i=0; i<arr_size; i++) {
arr[i].key = i;
if((double)rand()/RAND_MAX <= err_fraction) arr[i].valid = false;
else arr[i].valid = true;
}
}
double err_fraction = 0.6;
long long arr_size = 9000000;
int main() {
//dummy obj[arr_size];
srand(time(NULL));
//populate_objs(obj, arr_size, err_fraction);
int server_sfd, server_cfd, client_sfd, client_cfd;
sock_connect(SERVER_IP, SERVER_PORT, &server_sfd, &server_cfd);
cout<<"Connected to server"<<endl;
sock_connect(NULL, CLIENT_PORT, &client_sfd, &client_cfd);
cout<<"Connected to client"<<endl;
double s_time, e_time, avg_time;
struct timeval temp_time;
cout<<"Connected to server and client"<<endl;
dummy t_obj;
int succ_cnt = 0;
int err_cnt = 0;
int obj_size = sizeof(t_obj);
char *temp_char = (char*)malloc(1);
char *s = "S";
char *e = "E";
memset((void*)&t_obj, 0, obj_size);
for(long long i=0; i<arr_size; i++) {
//cout<<i<<endl;
read_obj(client_cfd, (char*)&t_obj, obj_size);
if((double)rand()/RAND_MAX <= err_fraction) t_obj.valid = false;
else t_obj.valid = true;
if(t_obj.valid) {
succ_cnt++;
//cout<<t_obj.val<<endl;
//dummy_function();
send_obj(server_cfd, (char*)&t_obj, obj_size);
//cout<<"obj_sent"<<endl;
read_obj(server_cfd, temp_char, 1);
send_obj(client_cfd, s, 1);
}
else {
err_cnt++;
send_obj(client_cfd, e, 1);
}
}
cout<<"Error Fraction: "<<err_fraction<<endl;
cout<<"Succcessful: "<<succ_cnt<<endl;
cout<<"Errored: "<<err_cnt<<endl;
cout<<"....Closing connections...."<<endl;
t_obj.val[0]='D';
send_obj(server_cfd, (char*)&t_obj, obj_size);
close(client_cfd);
close(server_cfd);
close(client_sfd);
close(server_sfd);
return 0;
}
#include <iostream>
#include <stdio.h>
#include <vector>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <string.h>
using namespace std;
char* NIC_IP = "192.168.200.21";
char* SERVER_IP = "192.168.200.20";
char* CLIENT_IP = "192.168.200.40";
int SERVER_PORT = 8989;
int CLIENT_PORT = 9898;
int send_obj(int cfd, char* obj, int size) {
int op_bytes = 0;
op_bytes = write(cfd, obj, size);
if(op_bytes < size) {
cout<<"write err"<<endl;
return -1;
}
return 0;
}
int read_obj(int cfd, char *obj, int size) {
int ip_bytes = 0;
int tot_bytes = 0;
while(tot_bytes < size) {
ip_bytes = read(cfd, obj, size);
if(ip_bytes == 0) break;
else if(ip_bytes > 0) tot_bytes += ip_bytes;
else break;
}
if(tot_bytes < size) {
cout<<"read err"<<endl;
return -1;
}
return 0;
}
int sock_connect(char* server_name, int port, int* local_fd, int* conn_fd) {
struct sockaddr_in host_addr;
memset(&host_addr, 0, sizeof(host_addr));
host_addr.sin_family = AF_INET;
host_addr.sin_port = htons(port);
host_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int sfd, cfd;
sfd = socket(AF_INET, SOCK_STREAM, 0);
if(sfd < 0) {
cout<<"sfd:socket"<<endl;
return -1;
}
if(server_name==NULL) {
if(bind(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr))) {
close(sfd);
cout<<"bind"<<endl;
}
listen(sfd, 1);
cfd = accept(sfd, NULL, 0);
*local_fd = sfd;
*conn_fd = cfd;
return 0;
}
else {
inet_aton(server_name, &host_addr.sin_addr);
if(connect(sfd, (struct sockaddr *)&host_addr, sizeof(host_addr)) < 0) {
close(sfd);
cout<<"connect"<<endl;
return -1;
}
*conn_fd = sfd;
return 0;
}
}
struct dummy {
long long key;
bool valid;
char val[100];
};
void populate_objs(dummy arr[], long long arr_size, double err_fraction) {
for(long long i=0; i<arr_size; i++) {
arr[i].key = i;
if((double)rand()/RAND_MAX <= err_fraction) arr[i].valid = false;
else arr[i].valid = true;
}
}
void dummy_function() {
int j=0;
for(int i=0; i<1000; i++) j++;
}
double err_fraction = 0.0;
long long arr_size = 900000;
int main() {
//dummy arr[arr_size];
//populate_objs(arr, arr_size, err_fraction);
int nic_sfd, nic_cfd;
sock_connect(NULL, SERVER_PORT, &nic_sfd, &nic_cfd);
dummy obj;
int obj_size = sizeof(obj);
char *s = (char *)malloc(1);
char *e = "E";
int succ_cnt = 0;
int err_cnt = 0;
int req_cnt = 0;
struct timeval t_time;
double s_time, s1_time, e_time, overall_time, overall_time1;
overall_time1=0.0;
gettimeofday(&t_time, NULL);
s_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
while(true) {
read_obj(nic_cfd, (char*)&obj, obj_size);
if(obj.val[0]=='D') break;
gettimeofday(&t_time, NULL);
s1_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
//if((double)rand()/RAND_MAX <= err_fraction) obj.valid = false;
//else obj.valid = true;
// if(obj.valid) {
// succ_cnt++;
// dummy_function();
// send_obj(client_cfd, s, 1);
// }
// else {
// err_cnt++;
// send_obj(client_cfd, e, 1);
// }
req_cnt++;
dummy_function();
send_obj(nic_cfd, s, 1);
gettimeofday(&t_time, NULL);
e_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
overall_time1 += (e_time-s1_time);
}
gettimeofday(&t_time, NULL);
e_time = ((double)t_time.tv_sec*1000.0) + ((double)t_time.tv_usec/1000.0);
overall_time = (e_time-s_time)/1000.0;
overall_time1 = overall_time1/1000.0;
cout<<"Req count: "<<req_cnt<<endl;
cout<<"Processing time: "<<overall_time1<<"s"<<endl;
cout<<"Throughput: "<<(req_cnt)/overall_time1<<endl;
cout<<"Closing connections"<<endl;
//close(nic_cfd);
close(nic_sfd);
return 0;
}
This diff is collapsed.
// Utility functions for socket servers in C.
//
// Eli Bendersky [http://eli.thegreenplace.net]
// This code is in the public domain.
#include "utils.h"
#include <fcntl.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
// #define _GNU_SOURCE
#include <netdb.h>
#define N_BACKLOG 64
void die(char* fmt, ...) {
va_list args;
va_start(args, fmt);
vfprintf(stderr, fmt, args);
va_end(args);
fprintf(stderr, "\n");
exit(EXIT_FAILURE);
}
void* xmalloc(size_t size) {
void* ptr = malloc(size);
if (!ptr) {
die("malloc failed");
}
return ptr;
}
void perror_die(char* msg) {
perror(msg);
exit(EXIT_FAILURE);
}
void report_peer_connected(const struct sockaddr_in* sa, socklen_t salen) {
char hostbuf[NI_MAXHOST];
char portbuf[NI_MAXSERV];
if (getnameinfo((struct sockaddr*)sa, salen, hostbuf, NI_MAXHOST, portbuf,
NI_MAXSERV, 0) == 0) {
printf("peer (%s, %s) connected\n", hostbuf, portbuf);
} else {
printf("peer (unknonwn) connected\n");
}
}
void report_backup_connected(const struct sockaddr_in* sa, socklen_t salen) {
char hostbuf[NI_MAXHOST];
char portbuf[NI_MAXSERV];
if (getnameinfo((struct sockaddr*)sa, salen, hostbuf, NI_MAXHOST, portbuf,
NI_MAXSERV, 0) == 0) {
printf("backup (%s, %s) connected\n", hostbuf, portbuf);
} else {
printf("backup (unknonwn) connected\n");
}
}
int listen_inet_socket(int portnum) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror_die("ERROR opening socket");
}
// This helps avoid spurious EADDRINUSE when the previous instance of this
// server died.
int opt = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror_die("setsockopt");
}
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(portnum);
if (bind(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
perror_die("ERROR on binding");
}
if (listen(sockfd, N_BACKLOG) < 0) {
perror_die("ERROR on listen");
}
return sockfd;
}
void make_socket_non_blocking(int sockfd) {
int flags = fcntl(sockfd, F_GETFL, 0);
if (flags == -1) {
perror_die("fcntl F_GETFL");
}
if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror_die("fcntl F_SETFL O_NONBLOCK");
}
}
\ No newline at end of file
// Utility functions for socket servers in C.
#ifndef UTILS_H
#define UTILS_H
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
// Dies (exits with a failure status) after printing the given printf-like
// message to stdout.
void die(char* fmt, ...);
// Wraps malloc with error checking: dies if malloc fails.
void* xmalloc(size_t size);
// Dies (exits with a failure status) after printing the current perror status
// prefixed with msg.
void perror_die(char* msg);
// Reports a peer connection to stdout. sa is the data populated by a successful
// accept() call.
void report_peer_connected(const struct sockaddr_in* sa, socklen_t salen);
// Reports a backup connection to stdout. sa is the data populated by a successful
// accept() call.
void report_backup_connected(const struct sockaddr_in* sa, socklen_t salen);
// Creates a bound and listening INET socket on the given port number. Returns
// the socket fd when successful; dies in case of errors.
int listen_inet_socket(int portnum);
// Sets the given socket into non-blocking mode.
void make_socket_non_blocking(int sockfd);
#endif /* UTILS_H */
#ifndef __BUFFER_CC__
#define __BUFFER_CC__
#include <algorithm>
#include <thread>
#include <vector>
#include "Buffer.hpp"
#include "common.hpp"
#include "queue_context.hpp"
Buffer::Buffer(size_t buf_size) {
this->buffer_size = buf_size;
this->buffer.resize(buf_size);
// this->ready_events.resize(buf_size);
for(int i=0; i<buf_size; i++) {
this->buffer[i].fd_id = -1;
this->buffer[i].in_use = false;
this->buffer[i].cr = NULL;
// this->ready_events[i] = -1;
}
if(pthread_mutex_init(&(this->buffer_lock), NULL)) {
perror("Unable to init buffer lock");
exit(-1);
}
}
Buffer::~Buffer() {
for(int i=0; i<this->buffer_size; i++) {
if(this->buffer[i].fd_id==-1) continue;
this->buffer[i].fd_id = -1;
free(this->buffer[i].cr);
this->buffer[i].cr = NULL;
free(this->buffer[i].job);
this->buffer[i].job = NULL;
}
pthread_mutex_destroy(&(this->buffer_lock));
}
int Buffer::add_element(int fd, job_context *job, Common_Request *cr) {
bool found = false;
for(int i=0; i<this->buffer_size; i++) {
if(this->buffer[i].fd_id >= 0) continue;
else {
if(job->transport_type == TCP_IP_TRANSPORT) {
this->buffer[i].fd_id = job->tcp_transport->get_conn_fd();
}
else if(job->transport_type == RDMA_RC_TRANSPORT) {
this->buffer[i].fd_id = job->rdma_transport->get_conn_fd();
}
this->buffer[i].cr = cr;
this->buffer[i].job = job;
// this->ready_events[i] = 1;
found = true;
break;
}
}
return found? 0 : -1;
}
void Buffer::erase_element(int fd) {
for(int i=0; i<this->buffer_size; i++) {
if(this->buffer[i].fd_id != fd) continue;
else {
this->buffer[i].fd_id = -1;
this->buffer[i].in_use = false;
this->buffer[i].cr = NULL;
this->buffer[i].job = NULL;
// this->ready_events[i] = -1;
break;
}
}
return;
}
void Buffer::erase_element_by_pos(int pos) {
this->buffer[pos].fd_id = -1;
this->buffer[pos].in_use = false;
free(this->buffer[pos].cr);
this->buffer[pos].cr = NULL;
free(this->buffer[pos].job);
this->buffer[pos].job = NULL;
// this->ready_events[i] = -1;
return;
}
int Buffer::poll(int fd) {
pthread_mutex_lock(&(this->buffer_lock));
int i;
int j = -1;
for(i=0; i<this->buffer_size; i++) {
if(this->buffer[i].fd_id==fd && !this->buffer[i].in_use) {
this->buffer[i].in_use = true;
j = i;
break;
}
}
pthread_mutex_unlock(&(this->buffer_lock));
return j;
}
Buffer_Element Buffer::get_element(int pos) {
return this->buffer[pos];
}
#endif
\ No newline at end of file
#ifndef __BUFFER_H__
#define __BUFFER_H__
#include <thread>
#include <vector>
#include "common.hpp"
#include "queue_context.hpp"
struct Buffer_Element {
int fd_id; // using fd as an id for threads
bool in_use;
struct Common_Request *cr;
struct job_context *job;
Buffer_Element() {this->fd_id = -1; this->cr = NULL;}
};
class Buffer {
private:
pthread_mutex_t buffer_lock;
public:
// std::vector<int> ready_events;
size_t buffer_size;
std::vector<Buffer_Element> buffer;
Buffer(size_t buf_size);
~Buffer();
int add_element(int fd, job_context *job, Common_Request *cr);
void erase_element(int fd);
void erase_element_by_pos(int pos);
int poll();
int poll(int fd);
Buffer_Element get_element(int pos);
};
#endif
\ No newline at end of file
#include <iostream>
#include <thread>
#include <pthread.h>
#include "queue_context.hpp"
#include "threadsafe_queue.hpp"
#include "../transport_api/transport_config.hpp"
using namespace std;
Thread_Safe_Queue q;
void work(int a) {
struct job_context* t = q.get_job();
if(t==NULL) {
cout << "Queue Empty" <<endl;
return;
}
cout<<"Got job"<<endl;
RDMA_config *config = t->rdma_transport->get_config();
cout<<config->mr.mr_size<<endl;
}
int main() {
RDMA_Transport* t = new RDMA_Transport();
cout<<q.queue_size<<endl;
q.enqueue(t, RDMA_RC_WRITE);
cout<<q.queue_size<<endl;
thread th(work, 0);
thread th1(work, 1);
th.join();
th1.join();
return 0;
}
\ No newline at end of file
This diff is collapsed.
#ifndef __CLI_API_H__
#define __CLI_API_H__
#include <string>
#include <vector>
std::vector<std::string> tokenize(std::string raw_str);
std::string get_key(char* kv);
std::string get_val(char* kv);
std::pair<std::string, std::string> get_kv(char* kv);
char* get_key_ptr(char* blob);
char* get_val_ptr(char* blob);
size_t get_key_size_from_blob(char* blob);
size_t get_val_size_from_blob(char* blob);
size_t get_key_size_from_start_ptr(char* key_ptr);
size_t get_val_size_from_start_ptr(char* val_ptr);
#endif
\ No newline at end of file
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include "cli_api.hpp"
using namespace std;
int main() {
char* ip = (char*) malloc(1024);
vector<string> tokens;
while(true) {
memset(ip, 0, 1024);
cin.getline(ip, 1024);
printf("Got raw cmd: %s\n", ip);
tokens = tokenize(string(ip));
if(tokens[0].compare("exit")==0 || tokens[0].compare("EXIT")==0) {
printf("Got exit: %s\n", tokens[0].c_str());
break;
}
printf("Command\t\tArg1\t\tArg2\n");
printf("%s\t\t%s\t\t%s\n", tokens[0].c_str(), tokens[1].c_str(), tokens[2].c_str());
}
}
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#ifndef __HASH_C__
#define __HASH_C__
#include <stdint.h>
#include "hash.hpp"
uint32_t hash(uint64_t key, int n) {
uint64_t t_n = static_cast<uint64_t>(n);
uint64_t hash_val = key%t_n;
return static_cast<uint32_t>(hash_val);
}
#endif
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment