Commit be13de7c authored by Nilanjan Daw

Integrated netcache with hash router

-- Enabled caching at switch using netcache
-- Integrated cache with Key partition router
-- Tested partition routing and caching with 2 replica sets
-- Tested that the KV store keeps working even when a server is down
parent f09261c0
......@@ -8,3 +8,5 @@ controller/hot.txt
generator/hot.txt
generator/kv.txt
generator/query.txt
log*
......@@ -13,7 +13,7 @@ path_query = "query.txt"
query_rate = 1000
len_key = 16
print NC_PORT
counter = 0
def counting():
last_counter = 0
......@@ -32,7 +32,7 @@ for line in f.readlines():
key_header = int(line[1])
key_body = line[2:]
op_field = struct.pack("B", 8)
op_field = struct.pack("B", 0)
key_field = struct.pack(">I", key_header)
for i in range(len(key_body)):
......
......@@ -2,13 +2,21 @@ import socket
import struct
import time
import thread
import argparse
from nc_config import *
NC_PORT = 8889
NC_PORT = 8888
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--server-ip', help='IP of server',
type=str, action="store", required=True)
parser.add_argument('--controller-ip', help='IP of controller',
type=str, action="store", required=True)
args = parser.parse_args()
CLIENT_IP = "10.0.0.1"
SERVER_IP = "10.0.0.2"
CONTROLLER_IP = "10.0.0.3"
SERVER_IP = args.server_ip
CONTROLLER_IP = args.controller_ip
path_hot = "hot.txt"
path_log = "controller_log.txt"
......@@ -18,52 +26,52 @@ len_val = 128
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CONTROLLER_IP, NC_PORT))
## Initiate the switch
# op = NC_UPDATE_REQUEST
# op_field = struct.pack("B", op)
# f = open(path_hot, "r")
# for line in f.readlines():
# line = line.split()
# key_header = line[0]
# key_body = line[1:]
# Initiate the switch
op = NC_UPDATE_REQUEST
op_field = struct.pack("B", op)
f = open(path_hot, "r")
for line in f.readlines():
line = line.split()
key_header = line[0]
key_body = line[1:]
# key_header = int(key_header)
# for i in range(len(key_body)):
# key_body[i] = int(key_body[i], 16)
key_header = int(key_header)
for i in range(len(key_body)):
key_body[i] = int(key_body[i], 16)
# key_field = ""
# key_field += struct.pack(">I", key_header)
# for i in range(len(key_body)):
# key_field += struct.pack("B", key_body[i])
key_field = ""
key_field += struct.pack(">I", key_header)
for i in range(len(key_body)):
key_field += struct.pack("B", key_body[i])
# packet = op_field + key_field
# s.sendto(packet, (SERVER_IP, NC_PORT))
# time.sleep(0.001)
# f.close()
packet = op_field + key_field
s.sendto(packet, (SERVER_IP, NC_PORT))
time.sleep(0.001)
f.close()
## Listen hot report
#f = open(path_log, "w")
# Listen hot report
f = open(path_log, "w")
while True:
packet, addr = s.recvfrom(2048)
# print(packet, addr)
op_field = packet[0]
key_field = packet[1:len_key + 1]
# load_field = packet[len_key + 1:]
load_field = packet[len_key + 1:]
# op = struct.unpack("B", op_field)[0]
# if (op != NC_HOT_READ_REQUEST):
# continue
op = struct.unpack("B", op_field)[0]
if (op != NC_HOT_READ_REQUEST):
continue
key_header = struct.unpack(">I", key_field[:4])[0]
print(op_field, key_header)
# load = struct.unpack(">IIII", load_field)
load = struct.unpack(">IIII", load_field)
# counter = counter + 1
# print "\tHot Item:", key_header, load
counter = counter + 1
print "\tHot Item:", key_header, load
#f.write(str(key_header) + ' ')
#f.write(str(load) + ' ')
#f.write("\n")
#f.flush()
#f.close()
f.write(str(key_header) + ' ')
f.write(str(load) + ' ')
f.write("\n")
f.flush()
f.close()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -107,15 +107,28 @@ def main():
h.setMAC("aa:bb:cc:dd:ee:0%d" % (n + 1))
for i in range(nb_hosts):
if (i != n):
print("setting", "10.0.0.%d" % (i + 1), "aa:bb:cc:dd:ee:0%d" % (i + 1))
h.setARP("10.0.0.%d" % (i + 1), "aa:bb:cc:dd:ee:0%d" % (i + 1))
net.get('s1').setMAC("aa:bb:cc:dd:ee:1%d" % (n + 1), "s1-eth%d" % (n + 1))
ctr = 1
for i in range(3):
for j in range(3):
# if (n + 1) == 1:
print("setting s%d" % (i + 1), "aa:bb:cc:dd:ee:1%d" % (ctr), "s%d-eth%d" % (i + 1, j + 1))
net.get('s%d' % (i + 1)).setMAC("aa:bb:cc:dd:ee:1%d" % (ctr), "s%d-eth%d" % (i + 1, j + 1))
# elif (n + 1) < 4:
# print("setting s2", "aa:bb:cc:dd:ee:1%d" % (n + 1), "s2-eth%d" % (n + 1))
# net.get('s2').setMAC("aa:bb:cc:dd:ee:1%d" % (n + 1), "s2-eth%d" % (n + 1))
# else:
# print("setting s3", "aa:bb:cc:dd:ee:1%d" % (n + 1), "s3-eth%d" % (n + 1 - 2))
# net.get('s3').setMAC("aa:bb:cc:dd:ee:1%d" % (n + 1), "s3-eth%d" % (n + 1 - 2))
ctr += 1
sleep(1)
for i in range(nb_switches):
#cmd = [args.cli, "--json", args.json, "--thrift-port", str(_THRIFT_BASE_PORT + i)]
cmd = [args.cli, args.json, str(_THRIFT_BASE_PORT + i)]
with open("c.txt", "r") as f:
with open("c%d.txt" % (i + 1), "r") as f:
print " ".join(cmd)
try:
output = subprocess.check_output(cmd, stdin = f)
......
switches 1
hosts 3
switches 3
hosts 5
h1 s1
h2 s1
h3 s1
s2 s1
s3 s1
h2 s2
h3 s2
h4 s3
h5 s3
// Per-packet metadata produced by the cache lookup stage.
header_type nc_cache_md_t {
    fields {
        cache_exist: 1;   // 1 iff the key hit an entry in check_cache_exist
        cache_index: 14;  // register slot assigned to the key (up to 2^14 slots)
        cache_valid: 1;   // validity bit read back from cache_valid_reg
    }
}
metadata nc_cache_md_t nc_cache_md;

// Mark the packet's key as cached and record which register slot holds it.
action check_cache_exist_act(index) {
    modify_field (nc_cache_md.cache_exist, 1);
    modify_field (nc_cache_md.cache_index, index);
}

// Exact-match lookup on the NetCache key; entries (key -> slot index) are
// installed by the control plane, at most NUM_CACHE of them.
table check_cache_exist {
    reads {
        nc_hdr.key: exact;
    }
    actions {
        check_cache_exist_act;
    }
    size: NUM_CACHE;
}
// One validity bit per cache slot; 0 means the slot's value must not be served.
register cache_valid_reg {
    width: 1;
    instance_count: NUM_CACHE;
}

// Read the validity bit for the slot found by check_cache_exist.
action check_cache_valid_act() {
    register_read(nc_cache_md.cache_valid, cache_valid_reg, nc_cache_md.cache_index);
}
table check_cache_valid {
    actions {
        check_cache_valid_act;
    }
    //default_action: check_cache_valid_act;
}

// Mark the slot valid; used when an update reply carries a fresh value.
action set_cache_valid_act() {
    register_write(cache_valid_reg, nc_cache_md.cache_index, 1);
}
table set_cache_valid {
    actions {
        set_cache_valid_act;
    }
    //default_action: set_cache_valid_act;
}
// Cache pipeline: look the key up, then on a hit either check validity
// (read requests) or re-validate the slot (update replies refresh the value).
control process_cache {
    apply (check_cache_exist);
    if (nc_cache_md.cache_exist == 1) {
        if (nc_hdr.op == NC_READ_REQUEST) {
            apply (check_cache_valid);
        }
        else if (nc_hdr.op == NC_UPDATE_REPLY) {
            apply (set_cache_valid);
        }
    }
}
// Sizing for the heavy-hitter machinery: a 4-row Count-Min sketch plus a
// 3-hash Bloom filter that suppresses duplicate hot reports.
#define HH_LOAD_WIDTH 32      // bits per sketch counter
#define HH_LOAD_NUM 256       // counters per sketch row
#define HH_LOAD_HASH_WIDTH 8  // log2(HH_LOAD_NUM)
#define HH_THRESHOLD 128      // every row must exceed this to count as hot
#define HH_BF_NUM 512         // Bloom filter bits
#define HH_BF_HASH_WIDTH 9    // log2(HH_BF_NUM)

// Per-packet scratch: one index and one counter value per sketch row.
header_type nc_load_md_t {
    fields {
        index_1: 16;
        index_2: 16;
        index_3: 16;
        index_4: 16;
        load_1: 32;
        load_2: 32;
        load_3: 32;
        load_4: 32;
    }
}
metadata nc_load_md_t nc_load_md;

// All sketch and Bloom-filter hashes are computed over the request key only.
field_list hh_hash_fields {
    nc_hdr.key;
}
// Count-Min sketch row 1 (hash: crc32).
register hh_load_1_reg {
    width: HH_LOAD_WIDTH;
    instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_1_hash {
    input {
        hh_hash_fields;
    }
    algorithm : crc32;
    output_width : HH_LOAD_HASH_WIDTH;
}
// Hash the key into row 1, read the counter, and write it back incremented.
action hh_load_1_count_act() {
    modify_field_with_hash_based_offset(nc_load_md.index_1, 0, hh_load_1_hash, HH_LOAD_NUM);
    register_read(nc_load_md.load_1, hh_load_1_reg, nc_load_md.index_1);
    register_write(hh_load_1_reg, nc_load_md.index_1, nc_load_md.load_1 + 1);
}
table hh_load_1_count {
    actions {
        hh_load_1_count_act;
    }
}
// Count-Min sketch row 2 (hash: csum16).
register hh_load_2_reg {
    width: HH_LOAD_WIDTH;
    instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_2_hash {
    input {
        hh_hash_fields;
    }
    algorithm : csum16;
    output_width : HH_LOAD_HASH_WIDTH;
}
// Hash the key into row 2, read the counter, and write it back incremented.
action hh_load_2_count_act() {
    modify_field_with_hash_based_offset(nc_load_md.index_2, 0, hh_load_2_hash, HH_LOAD_NUM);
    register_read(nc_load_md.load_2, hh_load_2_reg, nc_load_md.index_2);
    register_write(hh_load_2_reg, nc_load_md.index_2, nc_load_md.load_2 + 1);
}
table hh_load_2_count {
    actions {
        hh_load_2_count_act;
    }
}
// Count-Min sketch row 3 (hash: crc16).
register hh_load_3_reg {
    width: HH_LOAD_WIDTH;
    instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_3_hash {
    input {
        hh_hash_fields;
    }
    algorithm : crc16;
    output_width : HH_LOAD_HASH_WIDTH;
}
// Hash the key into row 3, read the counter, and write it back incremented.
action hh_load_3_count_act() {
    modify_field_with_hash_based_offset(nc_load_md.index_3, 0, hh_load_3_hash, HH_LOAD_NUM);
    register_read(nc_load_md.load_3, hh_load_3_reg, nc_load_md.index_3);
    register_write(hh_load_3_reg, nc_load_md.index_3, nc_load_md.load_3 + 1);
}
table hh_load_3_count {
    actions {
        hh_load_3_count_act;
    }
}
// Count-Min sketch row 4.
// NOTE(review): the algorithm here is crc32, the same as row 1
// (hh_load_1_hash), so rows 1 and 4 always compute identical indices and
// row 4 contributes no independent information to the sketch. Confirm
// whether a distinct algorithm was intended for this row.
register hh_load_4_reg {
    width: HH_LOAD_WIDTH;
    instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_4_hash {
    input {
        hh_hash_fields;
    }
    algorithm : crc32;
    output_width : HH_LOAD_HASH_WIDTH;
}
// Hash the key into row 4, read the counter, and write it back incremented.
action hh_load_4_count_act() {
    modify_field_with_hash_based_offset(nc_load_md.index_4, 0, hh_load_4_hash, HH_LOAD_NUM);
    register_read(nc_load_md.load_4, hh_load_4_reg, nc_load_md.index_4);
    register_write(hh_load_4_reg, nc_load_md.index_4, nc_load_md.load_4 + 1);
}
table hh_load_4_count {
    actions {
        hh_load_4_count_act;
    }
}
// Update all four Count-Min rows for the current key; after this runs,
// nc_load_md.load_1..4 hold the pre-increment counts of each row.
control count_min {
    apply (hh_load_1_count);
    apply (hh_load_2_count);
    apply (hh_load_3_count);
    apply (hh_load_4_count);
}
// Per-packet scratch for the Bloom filter: three indices and the three
// bits read back (all 1 => key was already reported as hot).
header_type hh_bf_md_t {
    fields {
        index_1: 16;
        index_2: 16;
        index_3: 16;
        bf_1: 1;
        bf_2: 1;
        bf_3: 1;
    }
}
metadata hh_bf_md_t hh_bf_md;
// Bloom filter bit-array 1 (hash: crc32).
register hh_bf_1_reg {
    width: 1;
    instance_count: HH_BF_NUM;
}
field_list_calculation hh_bf_1_hash {
    input {
        hh_hash_fields;
    }
    algorithm : crc32;
    output_width : HH_BF_HASH_WIDTH;
}
// Read the key's bit (was it seen before?) and then set it.
action hh_bf_1_act() {
    modify_field_with_hash_based_offset(hh_bf_md.index_1, 0, hh_bf_1_hash, HH_BF_NUM);
    register_read(hh_bf_md.bf_1, hh_bf_1_reg, hh_bf_md.index_1);
    register_write(hh_bf_1_reg, hh_bf_md.index_1, 1);
}
table hh_bf_1 {
    actions {
        hh_bf_1_act;
    }
}
// Bloom filter bit-array 2 (hash: csum16).
register hh_bf_2_reg {
    width: 1;
    instance_count: HH_BF_NUM;
}
field_list_calculation hh_bf_2_hash {
    input {
        hh_hash_fields;
    }
    algorithm : csum16;
    output_width : HH_BF_HASH_WIDTH;
}
// Read the key's bit (was it seen before?) and then set it.
action hh_bf_2_act() {
    modify_field_with_hash_based_offset(hh_bf_md.index_2, 0, hh_bf_2_hash, HH_BF_NUM);
    register_read(hh_bf_md.bf_2, hh_bf_2_reg, hh_bf_md.index_2);
    register_write(hh_bf_2_reg, hh_bf_md.index_2, 1);
}
table hh_bf_2 {
    actions {
        hh_bf_2_act;
    }
}
// Bloom filter bit-array 3 (hash: crc16).
register hh_bf_3_reg {
    width: 1;
    instance_count: HH_BF_NUM;
}
field_list_calculation hh_bf_3_hash {
    input {
        hh_hash_fields;
    }
    algorithm : crc16;
    output_width : HH_BF_HASH_WIDTH;
}
// Read the key's bit (was it seen before?) and then set it.
action hh_bf_3_act() {
    modify_field_with_hash_based_offset(hh_bf_md.index_3, 0, hh_bf_3_hash, HH_BF_NUM);
    register_read(hh_bf_md.bf_3, hh_bf_3_reg, hh_bf_md.index_3);
    register_write(hh_bf_3_reg, hh_bf_md.index_3, 1);
}
table hh_bf_3 {
    actions {
        hh_bf_3_act;
    }
}
// Query-and-set all three Bloom arrays; afterwards hh_bf_md.bf_1..3 tell
// whether the key had already been inserted (all 1) or is new (any 0).
control bloom_filter {
    apply (hh_bf_1);
    apply (hh_bf_2);
    apply (hh_bf_3);
}
// Metadata carried along with the cloned packet: the four sketch counters,
// so the clone can embed them in the hot report.
field_list mirror_list {
    nc_load_md.load_1;
    nc_load_md.load_2;
    nc_load_md.load_3;
    nc_load_md.load_4;
}
// Mirror-session id; the corresponding session must be configured by the
// control plane for the clone to be emitted.
#define CONTROLLER_MIRROR_DSET 3
action clone_to_controller_act() {
    clone_egress_pkt_to_egress(CONTROLLER_MIRROR_DSET, mirror_list);
}
table clone_to_controller {
    actions {
        clone_to_controller_act;
    }
}
// Step 1 of hot reporting: clone the packet; the clone is rewritten into
// the actual report by report_hot_step_2.
control report_hot_step_1 {
    apply (clone_to_controller);
}
// 10.0.0.3 — hard-coded controller address the hot report is sent to.
#define CONTROLLER_IP 0x0a000003
// Rewrite the cloned packet into a hot report: flip the opcode, append the
// nc_load header carrying the four sketch counters, grow the IP/UDP lengths
// by the 16 bytes that header adds, and redirect to the controller.
action report_hot_act() {
    modify_field (nc_hdr.op, NC_HOT_READ_REQUEST);
    add_header (nc_load);
    add_to_field(ipv4.totalLen, 16);
    add_to_field(udp.len, 16);
    modify_field (nc_load.load_1, nc_load_md.load_1);
    modify_field (nc_load.load_2, nc_load_md.load_2);
    modify_field (nc_load.load_3, nc_load_md.load_3);
    modify_field (nc_load.load_4, nc_load_md.load_4);
    modify_field (ipv4.dstAddr, CONTROLLER_IP);
}
table report_hot {
    actions {
        report_hot_act;
    }
}
control report_hot_step_2 {
    apply (report_hot);
}
// Heavy-hitter detection:
//  - original packets (instance_type == 0) update the Count-Min sketch; when
//    all four rows exceed HH_THRESHOLD and the key is not yet in the Bloom
//    filter (any bit still 0), the packet is cloned (report_hot_step_1);
//  - the clone (instance_type != 0) is rewritten into a hot report addressed
//    to the controller (report_hot_step_2).
control heavy_hitter {
    if (standard_metadata.instance_type == 0) {
        count_min();
        if (nc_load_md.load_1 > HH_THRESHOLD) {
            if (nc_load_md.load_2 > HH_THRESHOLD) {
                if (nc_load_md.load_3 > HH_THRESHOLD) {
                    if (nc_load_md.load_4 > HH_THRESHOLD) {
                        bloom_filter();
                        if (hh_bf_md.bf_1 == 0 or hh_bf_md.bf_2 == 0 or hh_bf_md.bf_3 == 0){
                            report_hot_step_1();
                        }
                    }
                }
            }
        }
    }
    else {
        report_hot_step_2();
    }
}
......@@ -5,12 +5,55 @@
#include "ethernet.p4"
#include "ipv4.p4"
#include "mds.p4"
#include "cache.p4"
#include "heavy_hitter.p4"
#include "value.p4"
#define PKT_INSTANCE_TYPE_NORMAL 0
#define PKT_INSTANCE_TYPE_INGRESS_CLONE 1
#define PKT_INSTANCE_TYPE_EGRESS_CLONE 2
#define PKT_INSTANCE_TYPE_COALESCED 3
#define PKT_INSTANCE_TYPE_INGRESS_RECIRC 4
#define PKT_INSTANCE_TYPE_REPLICATION 5
#define PKT_INSTANCE_TYPE_RESUBMIT 6
// Metadata preserved across a resubmit pass (the field_list below is the
// set of fields the target carries back to ingress with the packet).
header_type resubmit_meta_t {
    fields {
        resubmit_key: 8;
    }
}
metadata resubmit_meta_t resubmit_meta;
field_list resubmit_metadata {
    resubmit_meta.resubmit_key;
}
// Send the packet through ingress a second time; on the second pass its
// instance_type is no longer PKT_INSTANCE_TYPE_NORMAL, which the ingress
// control uses to switch to cache/value processing.
action _resubmit_act() {
    resubmit(resubmit_metadata);
}
table _resubmit{
    actions {
        _resubmit_act;
    }
}
// Ingress: MDS (hash-router) traffic is routed to a replica; other traffic
// is resubmitted once so that cache + value processing run on the second
// pass; everything then goes through L3 routing.
// NOTE(review): routePacket() is invoked unconditionally here AND again in
// the MDS branch below. This text comes from a diff view with +/- markers
// stripped, so the first call is very likely the removed pre-change line —
// confirm that only the conditional call is intended.
control ingress {
    routePacket();
    if (udp.dstPort == MDS_PORT) {
        routePacket();
    } else if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_NORMAL) {
        apply(_resubmit);
    } else {
        process_cache();
        process_value();
    }
    apply(ipv4_route);
}
// Egress: run heavy-hitter detection only for read requests that missed the
// cache (cached keys are already hot), then rewrite MACs for forwarding.
control egress {
    if (nc_hdr.op == NC_READ_REQUEST and nc_cache_md.cache_exist != 1) {
        heavy_hitter();
    }
    apply(ethernet_set_mac);
}
\ No newline at end of file
// Protocol constants shared across the P4 sources.
#define NC_PORT 8888      // UDP port for NetCache key-value traffic
#define SILLY_PORT 8089
#define MDS_PORT 8889     // UDP port for hash-router (router_hdr) traffic
#define NUM_CACHE 128     // number of cache slots per switch
#define CLUSTER_COUNT 2   // number of server replica sets the router hashes over
#define NC_READ_REQUEST 0 // opcodes carried in nc_hdr.op
#define NC_READ_REPLY 1
......
......@@ -52,12 +52,20 @@ header_type udp_t {
}
header udp_t udp;
header_type nc_hdr_t {
// Compact header used by the hash router: opcode plus a 32-bit key
// (hashed by router_hash to pick a replica set).
header_type router_hdr_t {
    fields {
        op: 8;
        key: 32;
    }
}
header router_hdr_t router_hdr;

// Full NetCache header: opcode plus the 128-bit key matched by the cache.
header_type nc_hdr_t {
    fields {
        op: 8;
        key: 128;
    }
}
header nc_hdr_t nc_hdr;
header_type nc_load_t {
......
......@@ -30,20 +30,39 @@ parser parse_tcp {
parser parse_udp {
extract (udp);
return select (latest.dstPort) {
// NC_PORT: drop;
SILLY_PORT: parse_nc_hdr;
default: parse_nc_hdr;
NC_PORT: parse_nc_hdr;
MDS_PORT: parse_router_hdr;
default: ingress;
}
}
// Router header: extract it and hand off to ingress (no further payload).
parser parse_router_hdr {
    extract (router_hdr);
    return ingress;
}

// NetCache header: branch on the opcode to pull in the optional payloads.
parser parse_nc_hdr {
    extract (nc_hdr);
    return select(latest.op) {
        NC_READ_REQUEST: ingress;
        NC_READ_REPLY: parse_value;          // replies carry the value headers
        NC_HOT_READ_REQUEST: parse_nc_load;  // hot reports carry sketch counters
        NC_UPDATE_REQUEST: ingress;
        NC_UPDATE_REPLY: parse_value;
        default: ingress;
    }
}

// Sketch-counter payload of a hot report.
parser parse_nc_load {
    extract (nc_load);
    return ingress;
}

// Entry point into the chained value parsers (defined in value.p4).
parser parse_value {
    return parse_nc_value_1;
}
/*
The parsers for value headers are defined in value.p4
k = 1, 2, ..., 8
......
#define CLUSTER_COUNT 2
#define ROUTER_HASH_WIDTH 8
header_type router_t {
......@@ -11,6 +10,7 @@ metadata router_t router;
// Point the packet at the chosen replica's IP and switch the UDP
// destination from the router port to the NetCache port.
action update_dst_act(dstAddr) {
    modify_field (ipv4.dstAddr, dstAddr);
    modify_field (udp.dstPort, NC_PORT);
}
table update_dst {
......@@ -23,7 +23,7 @@ table update_dst {
}
field_list router_hash_fields {
nc_hdr.key;
router_hdr.key;
}
field_list_calculation router_hash {
......
#include "includes/defines.p4"
#include "includes/headers.p4"
#include "includes/parsers.p4"
#include "includes/checksum.p4"
#include "cache.p4"
#include "heavy_hitter.p4"
#include "value.p4"
#include "ipv4.p4"
#include "ethernet.p4"

// Ingress: cache lookup, then value read/write, then L3 routing.
control ingress {
    process_cache();
    process_value();
    apply (ipv4_route);
}

// Egress: heavy-hitter detection only for read requests that missed the
// cache, then MAC rewrite for forwarding.
control egress {
    if (nc_hdr.op == NC_READ_REQUEST and nc_cache_md.cache_exist != 1) {
        heavy_hitter();
    }
    apply (ethernet_set_mac);
}
// The cached value is split across 8 generated headers (i = 1..8), each
// holding four 32-bit slices (value_i_1..value_i_4). The macros below are
// instantiated once per i (and per slice j) elsewhere in value.p4.

// Declare value header i with its four 32-bit slices.
#define HEADER_VALUE(i) \
header_type nc_value_##i##_t { \
    fields { \
        value_##i##_1: 32; \
        value_##i##_2: 32; \
        value_##i##_3: 32; \
        value_##i##_4: 32; \
    } \
} \
header nc_value_##i##_t nc_value_##i;

// Parser for value header i, chaining to header i+1 (ip1).
#define PARSER_VALUE(i, ip1) \
parser parse_nc_value_##i { \
    extract (nc_value_##i); \
    return parse_nc_value_##ip1; \
}

// One register array per (header, slice), NUM_CACHE entries each —
// indexed by nc_cache_md.cache_index.
#define REGISTER_VALUE_SLICE(i, j) \
register value_##i##_##j##_reg { \
    width: 32; \
    instance_count: NUM_CACHE; \
}
#define REGISTER_VALUE(i) \
REGISTER_VALUE_SLICE(i, 1) \
REGISTER_VALUE_SLICE(i, 2) \
REGISTER_VALUE_SLICE(i, 3) \
REGISTER_VALUE_SLICE(i, 4)

// Read slice j of header i from its register into the packet header.
#define ACTION_READ_VALUE_SLICE(i, j) \
action read_value_##i##_##j##_act() { \
    register_read(nc_value_##i.value_##i##_##j, value_##i##_##j##_reg, nc_cache_md.cache_index); \
}
#define ACTION_READ_VALUE(i) \
ACTION_READ_VALUE_SLICE(i, 1) \
ACTION_READ_VALUE_SLICE(i, 2) \
ACTION_READ_VALUE_SLICE(i, 3) \
ACTION_READ_VALUE_SLICE(i, 4)

// One single-action table per read action so the control flow can apply it.
#define TABLE_READ_VALUE_SLICE(i, j) \
table read_value_##i##_##j { \
    actions { \
        read_value_##i##_##j##_act; \
    } \
}
#define TABLE_READ_VALUE(i) \
TABLE_READ_VALUE_SLICE(i, 1) \
TABLE_READ_VALUE_SLICE(i, 2) \
TABLE_READ_VALUE_SLICE(i, 3) \
TABLE_READ_VALUE_SLICE(i, 4)

// Append value header i to the packet, growing IP/UDP lengths by its
// 16 bytes (4 x 32-bit slices).
#define ACTION_ADD_VALUE_HEADER(i) \
action add_value_header_##i##_act() { \
    add_to_field(ipv4.totalLen, 16);\
    add_to_field(udp.len, 16);\
    add_header(nc_value_##i); \
}
#define TABLE_ADD_VALUE_HEADER(i) \
table add_value_header_##i { \
    actions { \
        add_value_header_##i##_act; \
    } \
}
#define ACTION_WRITE_VALUE_SLICE(i, j) \