Commit c8834a73 authored by Zhihao Bai

NetCache

parent 735f0f1f
================================================= 0 Introduction ==================================================
In this repository, I implemented a simple NetCache with standard P4 language and designed an experiment with the behavioral model to show the efficiency of NetCache.
I create a network with Mininet, containing 1 switch and 3 hosts. One host is the server, which handles READ queries. One host is the client, which sends READ queries. The last host simulates the controller of the programmable switch, because the behavioral model does not provide such an interface.
The experiment runs in the following steps. First, the switch starts and some table entries are added. Second, the server starts and loads pre-generated key-value items. Third, the controller starts. The controller sends UPDATE queries to the server to fetch the values of some pre-determined hot items and inserts them into the switch. Finally, the client starts. The client sends READ queries to the server and receives replies. If a query hits the cache in the switch, the switch replies to the query directly without forwarding it to the server.
If an uncached item is detected as hot by the heavy-hitter detector, the switch sends a HOT_READ report to the controller.
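For concreteness, here is a minimal sketch (not part of the original scripts) of how a query is encoded. As in the client and server programs shown later in this page, a query is a UDP payload sent to port 8888 that consists of a 1-byte op code followed by a 16-byte key (a 4-byte big-endian key header plus a 12-byte key body, which is all zeros in this experiment); replies additionally carry a 128-byte value.

import struct

NC_READ_REQUEST = 0   # op code, as defined in nc_config.py
len_key = 16          # 16-byte key: 4-byte header + 12-byte body

def build_read_request(key_header, key_body=None):
    # 1-byte op code, then the key header in big-endian, then the key body.
    if key_body is None:
        key_body = [0] * (len_key - 4)
    packet = struct.pack("B", NC_READ_REQUEST)
    packet += struct.pack(">I", key_header)
    for b in key_body:
        packet += struct.pack("B", b)
    return packet

print len(build_read_request(42))   # 17 bytes: 1 op byte + 16 key bytes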
================================================= 1 Obtaining required software ==================================================
It is recommended to do the following steps in the directory "NetCache/".
Firstly, you need to get the P4 compiler from GitHub and install the required dependencies.
git clone https://github.com/p4lang/p4c-bm.git p4c-bmv2
cd p4c-bmv2
sudo pip install -r requirements.txt
Secondly, you need to get the behavioral model of P4 from GitHub, install its dependencies, and compile it.
git clone https://github.com/p4lang/behavioral-model.git bmv2
cd bmv2
./install_deps.sh
./autogen.sh
./configure
make
Finally, you need to install some other tools which are used in this simulation.
sudo apt-get install mininet python-ipaddr
sudo pip install scapy thrift networkx
If you do not do the above things in "NetCache/", you need to modify the path to p4c-bmv2 and bmv2 in NetCache/mininet/run_demo.sh.
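For reference, these are the two path variables at the top of "NetCache/mininet/run_demo.sh" (the full script appears later in this page) that need to point at your p4c-bmv2 and bmv2 checkouts:

BMV2_PATH=../bmv2
P4C_BM_PATH=../p4c-bmv2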
================================================= 2 Content ==================================================
NetCache/generator: Python programs that generate key-value items and queries.
NetCache/client: Two Python programs for the client. "send.py" reads queries from the "query.txt" file and sends them to the server. "receive.py" receives replies from the server and the switch. In addition, both programs print the current READ throughput to the screen.
NetCache/server: One Python program for the server. "server.py" reads key-value items from the "kv.txt" file, and replies to UPDATE queries from the controller and READ queries from the client. In addition, this program prints the current READ throughput to the screen.
NetCache/p4src: Code for NetCache in standard P4 language.
NetCache/controller: One Python program for the controller. "controller.py" reads hot keys from the "hot.txt" file and sends UPDATE requests to the server. The switch then inserts the values into its cache when it detects the UPDATE replies from the server. After updating the cache of the switch, the controller waits for "HOT_READ" packets, which indicate that a key has been detected as hot. In addition, this program prints HOT_READ reports with heavy-hitter information to the screen.
NetCache/mininet: Scripts to run the experiments.
================================================= 3 Run Simulation ==================================================
Experiment configuration: IP address "10.0.0.1" is for the client, "10.0.0.2" is for the server, and "10.0.0.3" is for the controller. There are 1000 key-value items in total, and queries follow a zipf-0.90 distribution. Items whose keys are 1, 3, 5, ..., 99 will be inserted into the cache of the switch, and items whose keys are 2, 4, ..., 100 will be detected as hot items and reported to the controller after the experiment has run for several seconds. If an uncached item is accessed 128 times, it is reported to the controller; this parameter can be changed in "NetCache/p4src/heavy_hitter.p4", as shown below.
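For reference, the reporting threshold is the HH_THRESHOLD macro at the top of "NetCache/p4src/heavy_hitter.p4" (the file is reproduced later in this page):

#define HH_THRESHOLD 128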
Before the experiment starts, you need to generate some files. Run "python gen_kv.py" in "NetCache/generator", and you will get "kv.txt" and "hot.txt". Copy "kv.txt" to "NetCache/server" and copy "hot.txt" to "NetCache/controller". Then run "python gen_query_zipf.py" in "NetCache/generator", and you will get "query.txt". It takes several minutes. Then copy "query.txt" to "NetCache/client".
To initialize the environment, open a terminal in "NetCache/mininet" and run "./run_demo.sh". After you see "Ready !" and the "mininet>" prompt appears, you can begin to run the experiment. In the following description, I will call this terminal the "mininet terminal".
Firstly, in the mininet terminal, run "xterm h2" to open a terminal for the server. In the new terminal, enter "NetCache/server" by running "cd ../server" and run "python server.py". Then the server starts. You can see two numbers: the number of READ queries handled in the last second and the total number of READ queries handled so far.
Secondly, in the mininet terminal, run "xterm h3" to open a terminal for the controller. In the new terminal, enter "NetCache/controller" by running "cd ../controller" and run "python controller.py". Then the controller starts. When the controller receives a HOT_READ report, the detected key and the heavy-hitter counter values are displayed.
Thirdly, in the mininet terminal, run "xterm h1" to open a terminal for "receive.py" of the client. In the new terminal, enter "NetCache/client" by running "cd ../client" and run "python receive.py". Then you can see the number of READ replies received per second and the total number of READ replies received so far.
Finally, in the mininet terminal, run "xterm h1" to open another terminal for "send.py" of the client. In the new terminal, enter "NetCache/client" by running "cd ../client" and run "python send.py". Then you can see the number of READ queries sent per second and the total number of READ queries sent so far. At the same time, the numbers displayed by the server and by "receive.py" will change along with "send.py", and after several seconds the controller will show the detected hot keys.
In addition, the number of READ replies received by "receive.py" is larger than the number of READ requests handled by the server. This is because some queries are answered directly by the switch (a rough estimate of this fraction is sketched below).
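As a rough back-of-the-envelope estimate (this calculation is not part of the repository), the fraction of READ queries answered by the switch can be predicted from the stated workload: 1000 keys with Zipf-skewed popularity and the odd keys 1, 3, ..., 99 cached. The README states zipf-0.90 while gen_query_zipf.py uses 0.99, so both skews are shown.

def cache_hit_ratio(zipf, max_key=1000, cached=range(1, 100, 2)):
    # Probability mass of the cached keys under a Zipf(zipf) popularity
    # distribution over keys 1..max_key.
    total = sum(1.0 / pow(k, zipf) for k in range(1, max_key + 1))
    hit = sum(1.0 / pow(k, zipf) for k in cached)
    return hit / total

for s in (0.90, 0.99):
    print "zipf-%.2f: about %.0f%% of READ queries would hit the switch cache" % (s, 100 * cache_hit_ratio(s))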
NC_READ_REQUEST = 0
NC_READ_REPLY = 1
NC_HOT_READ_REQUEST = 2
NC_WRITE_REQUEST = 4
NC_WRITE_REPLY = 5
NC_UPDATE_REQUEST = 8
NC_UPDATE_REPLY = 9
import socket
import struct
import time
import thread

from nc_config import *

NC_PORT = 8888
CLIENT_IP = "10.0.0.1"
SERVER_IP = "10.0.0.2"
CONTROLLER_IP = "10.0.0.3"

path_reply = "reply.txt"
len_key = 16

counter = 0

def counting():
    last_counter = 0
    while True:
        print (counter - last_counter), counter
        last_counter = counter
        time.sleep(1)

thread.start_new_thread(counting, ())

#f = open(path_reply, "w")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, NC_PORT))
while True:
    packet, addr = s.recvfrom(1024)
    counter = counter + 1
    #op = struct.unpack("B", packet[0])
    #key_header = struct.unpack(">I", packet[1:5])[0]
    #f.write(str(op) + ' ')
    #f.write(str(key_header) + '\n')
    #f.flush()
    #print counter
#f.close()
import socket
import struct
import time
import thread

from nc_config import *

NC_PORT = 8888
CLIENT_IP = "10.0.0.1"
SERVER_IP = "10.0.0.2"
CONTROLLER_IP = "10.0.0.3"

path_query = "query.txt"
query_rate = 1000
len_key = 16

counter = 0

def counting():
    last_counter = 0
    while True:
        print (counter - last_counter), counter
        last_counter = counter
        time.sleep(1)

thread.start_new_thread(counting, ())

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

f = open(path_query, "r")
interval = 1.0 / (query_rate + 1)
for line in f.readlines():
    line = line.split()
    op = line[0]
    key_header = int(line[1])
    key_body = line[2:]
    op_field = struct.pack("B", NC_READ_REQUEST)
    key_field = struct.pack(">I", key_header)
    for i in range(len(key_body)):
        key_field += struct.pack("B", int(key_body[i], 16))
    packet = op_field + key_field
    s.sendto(packet, (SERVER_IP, NC_PORT))
    counter = counter + 1
    time.sleep(interval)
f.close()
import socket
import struct
import time
import thread

from nc_config import *

NC_PORT = 8888
CLIENT_IP = "10.0.0.1"
SERVER_IP = "10.0.0.2"
CONTROLLER_IP = "10.0.0.3"

path_hot = "hot.txt"
path_log = "controller_log.txt"
len_key = 16
len_val = 128

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CONTROLLER_IP, NC_PORT))

## Initialize the switch cache by requesting the pre-determined hot items
op = NC_UPDATE_REQUEST
op_field = struct.pack("B", op)
f = open(path_hot, "r")
for line in f.readlines():
    line = line.split()
    key_header = line[0]
    key_body = line[1:]
    key_header = int(key_header)
    for i in range(len(key_body)):
        key_body[i] = int(key_body[i], 16)
    key_field = ""
    key_field += struct.pack(">I", key_header)
    for i in range(len(key_body)):
        key_field += struct.pack("B", key_body[i])
    packet = op_field + key_field
    s.sendto(packet, (SERVER_IP, NC_PORT))
    time.sleep(0.001)
f.close()

## Listen for hot reports
counter = 0  # counter was not initialized in the original listing
#f = open(path_log, "w")
while True:
    packet, addr = s.recvfrom(2048)
    op_field = packet[0]
    key_field = packet[1:len_key + 1]
    load_field = packet[len_key + 1:]
    op = struct.unpack("B", op_field)[0]
    if (op != NC_HOT_READ_REQUEST):
        continue
    key_header = struct.unpack(">I", key_field[:4])[0]
    load = struct.unpack(">IIII", load_field)
    counter = counter + 1
    print "\tHot Item:", key_header, load
    #f.write(str(key_header) + ' ')
    #f.write(str(load) + ' ')
    #f.write("\n")
    #f.flush()
#f.close()
NC_READ_REQUEST = 0
NC_READ_REPLY = 1
NC_HOT_READ_REQUEST = 2
NC_WRITE_REQUEST = 4
NC_WRITE_REPLY = 5
NC_UPDATE_REQUEST = 8
NC_UPDATE_REPLY = 9
import random

path_kv = "kv.txt"    # The path to save generated keys and values
path_hot = "hot.txt"  # The path to save the hot keys

len_key = 16    # The number of bytes in the key
len_val = 128   # The number of bytes in the value
max_key = 1000  # The number of keys
max_hot = 100   # The number of hot keys

f = open(path_kv, "w")
f_hot = open(path_hot, "w")
f.write(str(max_key) + "\n\n")
for i in range(1, max_key + 1):
    ## Generate a key-value item
    # Select a key
    key_header = i
    key_body = [0] * (len_key - 4)
    # Select a value
    val = [1] * len_val  # The value
    ###################################################################################################
    ## Output the key and the value to the file
    f.write(str(key_header) + " ")
    for j in range(len(key_body)):
        f.write(hex(key_body[j]) + " ")
    f.write("\n")
    for j in range(len(val)):
        f.write(hex(val[j]) + " ")
    f.write("\n\n")
    ###################################################################################################
    ## Output the hot key to the file
    if (key_header <= max_hot and key_header % 2 == 1):
        f_hot.write(str(key_header) + " ")
        for j in range(len(key_body)):
            f_hot.write(hex(key_body[j]) + " ")
        f_hot.write("\n")
    ###################################################################################################
f.flush()
f.close()
f_hot.flush()
f_hot.close()
import random

path_query = "query.txt"
num_query = 1000000
len_key = 16
len_val = 128
max_key = 1000

f = open(path_query, "w")
for i in range(num_query):
    # Randomly select a key
    key_header = random.randint(1, max_key)
    key_body = [0] * (len_key - 4)
    # Save the generated query to the file
    f.write("get ")
    f.write(str(key_header) + ' ')
    for j in range(len(key_body)):
        f.write(hex(key_body[j]) + ' ')
    f.write('\n')
f.flush()
f.close()
import random
import math

path_query = "query.txt"
num_query = 1000000
zipf = 0.99
len_key = 16
len_val = 128
max_key = 1000

# Zipf
zeta = [0.0]
for i in range(1, max_key + 1):
    zeta.append(zeta[i - 1] + 1 / pow(i, zipf))
field = [0] * (num_query + 1)
k = 1
for i in range(1, num_query + 1):
    if (i > num_query * zeta[k] / zeta[max_key]):
        k = k + 1
    field[i] = k

# Generate queries
f = open(path_query, "w")
for i in range(num_query):
    # Randomly select a key in zipf distribution
    r = random.randint(1, num_query)
    key_header = field[r]
    key_body = [0] * (len_key - 4)
    # Save the generated query to the file
    f.write("get ")
    f.write(str(key_header) + ' ')
    for j in range(len_key - 4):
        f.write(hex(key_body[j]) + ' ')
    f.write('\n')
f.flush()
f.close()
path_to_cmd = "commands_cache.txt"
max_hot = 100
len_key = 16
f = open(path_to_cmd, "w")
for i in range(1, max_hot + 1, 2):
x = i << ((len_key - 4) * 8)
f.write("table_add check_cache_exist check_cache_exist_act %d => %d\n" % (x, i))
f.flush()
f.close()
path_to_cmd = "commands_value.txt"
f = open(path_to_cmd, "w")
for i in range(1, 9):
for j in range(1, 5):
f.write("table_set_default read_value_%d_%d read_value_%d_%d_act\n" % (i, j, i, j))
f.write("table_set_default add_value_header_%d add_value_header_%d_act\n" % (i, i))
f.write("table_set_default write_value_%d_%d write_value_%d_%d_act\n" % (i, j, i, j))
f.write("table_set_default remove_value_header_%d remove_value_header_%d_act\n" % (i, i))
f.flush()
f.close()
# Copyright 2013-present Barefoot Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from mininet.net import Mininet
from mininet.node import Switch, Host
from mininet.log import setLogLevel, info, error, debug
from mininet.moduledeps import pathCheck
from sys import exit
import os
import tempfile
import socket

class P4Host(Host):
    def config(self, **params):
        r = super(Host, self).config(**params)

        self.defaultIntf().rename("eth0")

        for off in ["rx", "tx", "sg"]:
            cmd = "/sbin/ethtool --offload eth0 %s off" % off
            self.cmd(cmd)

        # disable IPv6
        self.cmd("sysctl -w net.ipv6.conf.all.disable_ipv6=1")
        self.cmd("sysctl -w net.ipv6.conf.default.disable_ipv6=1")
        self.cmd("sysctl -w net.ipv6.conf.lo.disable_ipv6=1")

        return r

    def describe(self):
        print "**********"
        print self.name
        print "default interface: %s\t%s\t%s" % (
            self.defaultIntf().name,
            self.defaultIntf().IP(),
            self.defaultIntf().MAC()
        )
        print "**********"

class P4Switch(Switch):
    """P4 virtual switch"""
    device_id = 0

    def __init__(self, name, sw_path = None, json_path = None,
                 thrift_port = None,
                 pcap_dump = False,
                 log_console = False,
                 verbose = False,
                 device_id = None,
                 enable_debugger = False,
                 **kwargs):
        Switch.__init__(self, name, **kwargs)
        assert(sw_path)
        assert(json_path)
        # make sure that the provided sw_path is valid
        pathCheck(sw_path)
        # make sure that the provided JSON file exists
        if not os.path.isfile(json_path):
            error("Invalid JSON file.\n")
            exit(1)
        self.sw_path = sw_path
        self.json_path = json_path
        self.verbose = verbose
        logfile = "/tmp/p4s.{}.log".format(self.name)
        self.output = open(logfile, 'w')
        self.thrift_port = thrift_port
        self.pcap_dump = pcap_dump
        self.enable_debugger = enable_debugger
        self.log_console = log_console
        if device_id is not None:
            self.device_id = device_id
            P4Switch.device_id = max(P4Switch.device_id, device_id)
        else:
            self.device_id = P4Switch.device_id
            P4Switch.device_id += 1
        self.nanomsg = "ipc:///tmp/bm-{}-log.ipc".format(self.device_id)

    @classmethod
    def setup(cls):
        pass

    def check_switch_started(self, pid):
        """While the process is running (pid exists), we check if the Thrift
        server has been started. If the Thrift server is ready, we assume that
        the switch was started successfully. This is only reliable if the Thrift
        server is started at the end of the init process"""
        while True:
            if not os.path.exists(os.path.join("/proc", str(pid))):
                return False
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(0.5)
            result = sock.connect_ex(("localhost", self.thrift_port))
            if result == 0:
                return True

    def start(self, controllers):
        "Start up a new P4 switch"
        info("Starting P4 switch {}.\n".format(self.name))
        args = [self.sw_path]
        for port, intf in self.intfs.items():
            if not intf.IP():
                args.extend(['-i', str(port) + "@" + intf.name])
        if self.pcap_dump:
            args.append("--pcap")
            # args.append("--useFiles")
        if self.thrift_port:
            args.extend(['--thrift-port', str(self.thrift_port)])
        if self.nanomsg:
            args.extend(['--nanolog', self.nanomsg])
        args.extend(['--device-id', str(self.device_id)])
        P4Switch.device_id += 1
        args.append(self.json_path)
        if self.enable_debugger:
            args.append("--debugger")
        if self.log_console:
            args.append("--log-console")
        logfile = "/tmp/p4s.{}.log".format(self.name)
        info(' '.join(args) + "\n")

        pid = None
        with tempfile.NamedTemporaryFile() as f:
            # self.cmd(' '.join(args) + ' > /dev/null 2>&1 &')
            self.cmd(' '.join(args) + ' >' + logfile + ' 2>&1 & echo $! >> ' + f.name)
            pid = int(f.read())
        debug("P4 switch {} PID is {}.\n".format(self.name, pid))
        if not self.check_switch_started(pid):
            error("P4 switch {} did not start correctly.\n".format(self.name))
            exit(1)
        info("P4 switch {} has been started.\n".format(self.name))

    def stop(self):
        "Terminate P4 switch."
        self.output.flush()
        self.cmd('kill %' + self.sw_path)
        self.cmd('wait')
        self.deleteIntfs()

    def attach(self, intf):
        "Connect a data port"
        assert(0)

    def detach(self, intf):
        "Disconnect a data port"
        assert(0)
#!/bin/bash
# Copyright 2013-present Barefoot Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
BMV2_PATH=../bmv2
P4C_BM_PATH=../p4c-bmv2
P4C_BM_SCRIPT=$P4C_BM_PATH/p4c_bm/__main__.py
SWITCH_PATH=$BMV2_PATH/targets/simple_switch/simple_switch
#CLI_PATH=$BMV2_PATH/tools/runtime_CLI.py
CLI_PATH=$BMV2_PATH/targets/simple_switch/sswitch_CLI
$P4C_BM_SCRIPT ../p4src/netcache.p4 --json netcache.json
# This gives libtool the opportunity to "warm-up"
sudo $SWITCH_PATH >/dev/null 2>&1
sudo PYTHONPATH=$PYTHONPATH:$BMV2_PATH/mininet/ python topo.py \
--behavioral-exe $SWITCH_PATH \
--json netcache.json \
--cli $CLI_PATH
#!/usr/bin/python
# Copyright 2013-present Barefoot Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mininet.net import Mininet
from mininet.topo import Topo
from mininet.log import setLogLevel, info
from mininet.cli import CLI
from mininet.link import TCLink
from p4_mininet import P4Switch, P4Host

import argparse
from time import sleep
import os
import subprocess

_THIS_DIR = os.path.dirname(os.path.realpath(__file__))
_THRIFT_BASE_PORT = 22222

parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--behavioral-exe', help='Path to behavioral executable',
                    type=str, action="store", required=True)
parser.add_argument('--json', help='Path to JSON config file',
                    type=str, action="store", required=True)
parser.add_argument('--cli', help='Path to BM CLI',
                    type=str, action="store", required=True)
args = parser.parse_args()

class MyTopo(Topo):
    def __init__(self, sw_path, json_path, nb_hosts, nb_switches, links, **opts):
        # Initialize topology and default options
        Topo.__init__(self, **opts)

        for i in xrange(nb_switches):
            switch = self.addSwitch('s%d' % (i + 1),
                                    sw_path = sw_path,
                                    json_path = json_path,
                                    thrift_port = _THRIFT_BASE_PORT + i,
                                    pcap_dump = True,
                                    device_id = i)
        for h in xrange(nb_hosts):
            host = self.addHost('h%d' % (h + 1))

        for a, b in links:
            self.addLink(a, b)

def read_topo():
    nb_hosts = 0
    nb_switches = 0
    links = []
    with open("topo.txt", "r") as f:
        line = f.readline()[:-1]
        w, nb_switches = line.split()
        assert(w == "switches")
        line = f.readline()[:-1]
        w, nb_hosts = line.split()
        assert(w == "hosts")
        for line in f:
            if not f: break
            a, b = line.split()
            links.append( (a, b) )
    return int(nb_hosts), int(nb_switches), links

def main():
    nb_hosts, nb_switches, links = read_topo()

    topo = MyTopo(args.behavioral_exe,
                  args.json,
                  nb_hosts, nb_switches, links)

    net = Mininet(topo = topo,
                  host = P4Host,
                  switch = P4Switch,
                  controller = None,
                  autoStaticArp=True )
    net.start()

    for n in range(nb_hosts):
        h = net.get('h%d' % (n + 1))
        for off in ["rx", "tx", "sg"]:
            cmd = "/sbin/ethtool --offload eth0 %s off" % off
            print cmd
            h.cmd(cmd)
        print "disable ipv6"
        h.cmd("sysctl -w net.ipv6.conf.all.disable_ipv6=1")
        h.cmd("sysctl -w net.ipv6.conf.default.disable_ipv6=1")
        h.cmd("sysctl -w net.ipv6.conf.lo.disable_ipv6=1")
        h.cmd("sysctl -w net.ipv4.tcp_congestion_control=reno")
        h.cmd("iptables -I OUTPUT -p icmp --icmp-type destination-unreachable -j DROP")
        h.setIP("10.0.0.%d" % (n + 1))
        h.setMAC("aa:bb:cc:dd:ee:0%d" % (n + 1))
        for i in range(nb_hosts):
            if (i != n):
                h.setARP("10.0.0.%d" % (i + 1), "aa:bb:cc:dd:ee:0%d" % (i + 1))
        net.get('s1').setMAC("aa:bb:cc:dd:ee:1%d" % (n + 1), "s1-eth%d" % (n + 1))

    sleep(1)

    for i in range(nb_switches):
        # cmd = [args.cli, "--json", args.json, "--thrift-port", str(_THRIFT_BASE_PORT + i)]
        cmd = [args.cli, args.json, str(_THRIFT_BASE_PORT + i)]
        with open("commands.txt", "r") as f:
            print " ".join(cmd)
            try:
                output = subprocess.check_output(cmd, stdin = f)
                print output
            except subprocess.CalledProcessError as e:
                print e
                print e.output
    sleep(1)

    print "Ready !"
    CLI( net )
    net.stop()

if __name__ == '__main__':
    setLogLevel( 'info' )
    main()
switches 1
hosts 3
h1 s1
h2 s1
h3 s1
NC_READ_REQUEST = 0
NC_READ_REPLY = 1
NC_HOT_READ_REQUEST = 2
NC_WRITE_REQUEST = 4
NC_WRITE_REPLY = 5
NC_UPDATE_REQUEST = 8
NC_UPDATE_REPLY = 9
header_type nc_cache_md_t {
fields {
cache_exist: 1;
cache_index: 14;
cache_valid: 1;
}
}
metadata nc_cache_md_t nc_cache_md;
action check_cache_exist_act(index) {
modify_field (nc_cache_md.cache_exist, 1);
modify_field (nc_cache_md.cache_index, index);
}
table check_cache_exist {
reads {
nc_hdr.key: exact;
}
actions {
check_cache_exist_act;
}
size: NUM_CACHE;
}
register cache_valid_reg {
width: 1;
instance_count: NUM_CACHE;
}
action check_cache_valid_act() {
register_read(nc_cache_md.cache_valid, cache_valid_reg, nc_cache_md.cache_index);
}
table check_cache_valid {
actions {
check_cache_valid_act;
}
//default_action: check_cache_valid_act;
}
action set_cache_valid_act() {
register_write(cache_valid_reg, nc_cache_md.cache_index, 1);
}
table set_cache_valid {
actions {
set_cache_valid_act;
}
//default_action: set_cache_valid_act;
}
control process_cache {
apply (check_cache_exist);
if (nc_cache_md.cache_exist == 1) {
if (nc_hdr.op == NC_READ_REQUEST) {
apply (check_cache_valid);
}
else if (nc_hdr.op == NC_UPDATE_REPLY) {
apply (set_cache_valid);
}
}
}
action ethernet_set_mac_act (smac, dmac) {
modify_field (ethernet.srcAddr, smac);
modify_field (ethernet.dstAddr, dmac);
}
table ethernet_set_mac {
reads {
standard_metadata.egress_port: exact;
}
actions {
ethernet_set_mac_act;
}
}
#define HH_LOAD_WIDTH 32
#define HH_LOAD_NUM 256
#define HH_LOAD_HASH_WIDTH 8
#define HH_THRESHOLD 128
#define HH_BF_NUM 512
#define HH_BF_HASH_WIDTH 9
header_type nc_load_md_t {
fields {
index_1: 16;
index_2: 16;
index_3: 16;
index_4: 16;
load_1: 32;
load_2: 32;
load_3: 32;
load_4: 32;
}
}
metadata nc_load_md_t nc_load_md;
field_list hh_hash_fields {
nc_hdr.key;
}
register hh_load_1_reg {
width: HH_LOAD_WIDTH;
instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_1_hash {
input {
hh_hash_fields;
}
algorithm : crc32;
output_width : HH_LOAD_HASH_WIDTH;
}
action hh_load_1_count_act() {
modify_field_with_hash_based_offset(nc_load_md.index_1, 0, hh_load_1_hash, HH_LOAD_NUM);
register_read(nc_load_md.load_1, hh_load_1_reg, nc_load_md.index_1);
register_write(hh_load_1_reg, nc_load_md.index_1, nc_load_md.load_1 + 1);
}
table hh_load_1_count {
actions {
hh_load_1_count_act;
}
}
register hh_load_2_reg {
width: HH_LOAD_WIDTH;
instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_2_hash {
input {
hh_hash_fields;
}
algorithm : csum16;
output_width : HH_LOAD_HASH_WIDTH;
}
action hh_load_2_count_act() {
modify_field_with_hash_based_offset(nc_load_md.index_2, 0, hh_load_2_hash, HH_LOAD_NUM);
register_read(nc_load_md.load_2, hh_load_2_reg, nc_load_md.index_2);
register_write(hh_load_2_reg, nc_load_md.index_2, nc_load_md.load_2 + 1);
}
table hh_load_2_count {
actions {
hh_load_2_count_act;
}
}
register hh_load_3_reg {
width: HH_LOAD_WIDTH;
instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_3_hash {
input {
hh_hash_fields;
}
algorithm : crc16;
output_width : HH_LOAD_HASH_WIDTH;
}
action hh_load_3_count_act() {
modify_field_with_hash_based_offset(nc_load_md.index_3, 0, hh_load_3_hash, HH_LOAD_NUM);
register_read(nc_load_md.load_3, hh_load_3_reg, nc_load_md.index_3);
register_write(hh_load_3_reg, nc_load_md.index_3, nc_load_md.load_3 + 1);
}
table hh_load_3_count {
actions {
hh_load_3_count_act;
}
}
register hh_load_4_reg {
width: HH_LOAD_WIDTH;
instance_count: HH_LOAD_NUM;
}
field_list_calculation hh_load_4_hash {
input {
hh_hash_fields;
}
algorithm : crc32;
output_width : HH_LOAD_HASH_WIDTH;
}
action hh_load_4_count_act() {
modify_field_with_hash_based_offset(nc_load_md.index_4, 0, hh_load_4_hash, HH_LOAD_NUM);
register_read(nc_load_md.load_4, hh_load_4_reg, nc_load_md.index_4);
register_write(hh_load_4_reg, nc_load_md.index_4, nc_load_md.load_4 + 1);
}
table hh_load_4_count {
actions {
hh_load_4_count_act;
}
}
control count_min {
apply (hh_load_1_count);
apply (hh_load_2_count);
apply (hh_load_3_count);
apply (hh_load_4_count);
}
header_type hh_bf_md_t {
fields {
index_1: 16;
index_2: 16;
index_3: 16;
bf_1: 1;
bf_2: 1;
bf_3: 1;
}
}
metadata hh_bf_md_t hh_bf_md;
register hh_bf_1_reg {
width: 1;
instance_count: HH_BF_NUM;
}
field_list_calculation hh_bf_1_hash {
input {
hh_hash_fields;
}
algorithm : crc32;
output_width : HH_BF_HASH_WIDTH;
}
action hh_bf_1_act() {
modify_field_with_hash_based_offset(hh_bf_md.index_1, 0, hh_bf_1_hash, HH_BF_NUM);
register_read(hh_bf_md.bf_1, hh_bf_1_reg, hh_bf_md.index_1);
register_write(hh_bf_1_reg, hh_bf_md.index_1, 1);
}
table hh_bf_1 {
actions {
hh_bf_1_act;
}
}
register hh_bf_2_reg {
width: 1;
instance_count: HH_BF_NUM;
}
field_list_calculation hh_bf_2_hash {
input {
hh_hash_fields;
}
algorithm : csum16;
output_width : HH_BF_HASH_WIDTH;
}
action hh_bf_2_act() {
modify_field_with_hash_based_offset(hh_bf_md.index_2, 0, hh_bf_2_hash, HH_BF_NUM);
register_read(hh_bf_md.bf_2, hh_bf_2_reg, hh_bf_md.index_2);
register_write(hh_bf_2_reg, hh_bf_md.index_2, 1);
}
table hh_bf_2 {
actions {
hh_bf_2_act;
}
}
register hh_bf_3_reg {
width: 1;
instance_count: HH_BF_NUM;
}
field_list_calculation hh_bf_3_hash {
input {
hh_hash_fields;
}
algorithm : crc16;
output_width : HH_BF_HASH_WIDTH;
}
action hh_bf_3_act() {
modify_field_with_hash_based_offset(hh_bf_md.index_3, 0, hh_bf_3_hash, HH_BF_NUM);
register_read(hh_bf_md.bf_3, hh_bf_3_reg, hh_bf_md.index_3);
register_write(hh_bf_3_reg, hh_bf_md.index_3, 1);
}
table hh_bf_3 {
actions {
hh_bf_3_act;
}
}
control bloom_filter {
apply (hh_bf_1);
apply (hh_bf_2);
apply (hh_bf_3);
}
field_list mirror_list {
nc_load_md.load_1;
nc_load_md.load_2;
nc_load_md.load_3;
nc_load_md.load_4;
}
#define CONTROLLER_MIRROR_DSET 3
action clone_to_controller_act() {
clone_egress_pkt_to_egress(CONTROLLER_MIRROR_DSET, mirror_list);
}
table clone_to_controller {
actions {
clone_to_controller_act;
}
}
control report_hot_step_1 {
apply (clone_to_controller);
}
#define CONTROLLER_IP 0x0a000003
action report_hot_act() {
modify_field (nc_hdr.op, NC_HOT_READ_REQUEST);
add_header (nc_load);
add_to_field(ipv4.totalLen, 16);
add_to_field(udp.len, 16);
modify_field (nc_load.load_1, nc_load_md.load_1);
modify_field (nc_load.load_2, nc_load_md.load_2);
modify_field (nc_load.load_3, nc_load_md.load_3);
modify_field (nc_load.load_4, nc_load_md.load_4);
modify_field (ipv4.dstAddr, CONTROLLER_IP);
}
table report_hot {
actions {
report_hot_act;
}
}
control report_hot_step_2 {
apply (report_hot);
}
control heavy_hitter {
if (standard_metadata.instance_type == 0) {
count_min();
if (nc_load_md.load_1 > HH_THRESHOLD) {
if (nc_load_md.load_2 > HH_THRESHOLD) {
if (nc_load_md.load_3 > HH_THRESHOLD) {
if (nc_load_md.load_4 > HH_THRESHOLD) {
bloom_filter();
if (hh_bf_md.bf_1 == 0 or hh_bf_md.bf_2 == 0 or hh_bf_md.bf_3 == 0){
report_hot_step_1();
}
}
}
}
}
}
else {
report_hot_step_2();
}
}
field_list ipv4_field_list {
ipv4.version;
ipv4.ihl;
ipv4.diffserv;
ipv4.totalLen;
ipv4.identification;
ipv4.flags;
ipv4.fragOffset;
ipv4.ttl;
ipv4.protocol;
ipv4.srcAddr;
ipv4.dstAddr;
}
field_list_calculation ipv4_chksum_calc {
input {
ipv4_field_list;
}
algorithm : csum16;
output_width: 16;
}
calculated_field ipv4.hdrChecksum {
update ipv4_chksum_calc;
}
field_list udp_checksum_list {
// IPv4 Pseudo Header Format. Must modify for IPv6 support.
ipv4.srcAddr;
ipv4.dstAddr;
8'0;
ipv4.protocol;
udp.len;
udp.srcPort;
udp.dstPort;
udp.len;
// udp.checksum;
payload;
}
field_list_calculation udp_checksum {
input {
udp_checksum_list;
}
algorithm : csum16;
output_width : 16;
}
calculated_field udp.checksum {
update udp_checksum;
}
#define NC_PORT 8888
#define NUM_CACHE 128
#define NC_READ_REQUEST 0
#define NC_READ_REPLY 1
#define NC_HOT_READ_REQUEST 2
#define NC_WRITE_REQUEST 4
#define NC_WRITE_REPLY 5
#define NC_UPDATE_REQUEST 8
#define NC_UPDATE_REPLY 9
header_type ethernet_t {
fields {
dstAddr : 48;
srcAddr : 48;
etherType : 16;
}
}
header ethernet_t ethernet;
header_type ipv4_t {
fields {
version : 4;
ihl : 4;
diffserv : 8;
totalLen : 16;
identification : 16;
flags : 3;
fragOffset : 13;
ttl : 8;
protocol : 8;
hdrChecksum : 16;
srcAddr : 32;
dstAddr: 32;
}
}
header ipv4_t ipv4;
header_type tcp_t {
fields {
srcPort : 16;
dstPort : 16;
seqNo : 32;
ackNo : 32;
dataOffset : 4;
res : 3;
ecn : 3;
ctrl : 6;
window : 16;
checksum : 16;
urgentPtr : 16;
}
}
header tcp_t tcp;
header_type udp_t {
fields {
srcPort : 16;
dstPort : 16;
len : 16;
checksum : 16;
}
}
header udp_t udp;
header_type nc_hdr_t {
fields {
op: 8;
key: 128;
}
}
header nc_hdr_t nc_hdr;
header_type nc_load_t {
fields {
load_1: 32;
load_2: 32;
load_3: 32;
load_4: 32;
}
}
header nc_load_t nc_load;
/*
The headers for value are defined in value.p4
k = 1, 2, ..., 8
header_type nc_value_{k}_t {
fields {
value_{k}_1: 32;
value_{k}_2: 32;
value_{k}_3: 32;
value_{k}_4: 32;
}
}
*/
parser start {
return parse_ethernet;
}
#define ETHER_TYPE_IPV4 0x0800
parser parse_ethernet {
extract (ethernet);
return select (latest.etherType) {
ETHER_TYPE_IPV4: parse_ipv4;
default: ingress;
}
}
#define IPV4_PROTOCOL_TCP 6
#define IPV4_PROTOCOL_UDP 17
parser parse_ipv4 {
extract(ipv4);
return select (latest.protocol) {
IPV4_PROTOCOL_TCP: parse_tcp;
IPV4_PROTOCOL_UDP: parse_udp;
default: ingress;
}
}
parser parse_tcp {
extract (tcp);
return ingress;
}
parser parse_udp {
extract (udp);
return select (latest.dstPort) {
NC_PORT: parse_nc_hdr;
default: ingress;
}
}
parser parse_nc_hdr {
extract (nc_hdr);
return select(latest.op) {
NC_READ_REQUEST: ingress;
NC_READ_REPLY: parse_value;
NC_HOT_READ_REQUEST: parse_nc_load;
NC_UPDATE_REQUEST: ingress;
NC_UPDATE_REPLY: parse_value;
default: ingress;
}
}
parser parse_nc_load {
extract (nc_load);
return ingress;
}
parser parse_value {
return parse_nc_value_1;
}
/*
The parsers for value headers are defined in value.p4
k = 1, 2, ..., 8
parser parse_value_{k} {
extract (nc_value_{k});
return select(k) {
8: ingress;
default: parse_value_{k + 1};
}
}
*/
action set_egress(egress_spec) {
modify_field(standard_metadata.egress_spec, egress_spec);
add_to_field(ipv4.ttl, -1);
}
@pragma stage 11
table ipv4_route {
reads {
ipv4.dstAddr : exact;
}
actions {
set_egress;
}
size : 8192;
}
#include "includes/defines.p4"
#include "includes/headers.p4"
#include "includes/parsers.p4"
#include "includes/checksum.p4"
#include "cache.p4"
#include "heavy_hitter.p4"
#include "value.p4"
#include "ipv4.p4"
#include "ethernet.p4"
control ingress {
process_cache();
process_value();
apply (ipv4_route);
}
control egress {
if (nc_hdr.op == NC_READ_REQUEST and nc_cache_md.cache_exist != 1) {
heavy_hitter();
}
apply (ethernet_set_mac);
}
#define HEADER_VALUE(i) \
header_type nc_value_##i##_t { \
fields { \
value_##i##_1: 32; \
value_##i##_2: 32; \
value_##i##_3: 32; \
value_##i##_4: 32; \
} \
} \
header nc_value_##i##_t nc_value_##i;
#define PARSER_VALUE(i, ip1) \
parser parse_nc_value_##i { \
extract (nc_value_##i); \
return parse_nc_value_##ip1; \
}
#define REGISTER_VALUE_SLICE(i, j) \
register value_##i##_##j##_reg { \
width: 32; \
instance_count: NUM_CACHE; \
}
#define REGISTER_VALUE(i) \
REGISTER_VALUE_SLICE(i, 1) \
REGISTER_VALUE_SLICE(i, 2) \
REGISTER_VALUE_SLICE(i, 3) \
REGISTER_VALUE_SLICE(i, 4)
#define ACTION_READ_VALUE_SLICE(i, j) \
action read_value_##i##_##j##_act() { \
register_read(nc_value_##i.value_##i##_##j, value_##i##_##j##_reg, nc_cache_md.cache_index); \
}
#define ACTION_READ_VALUE(i) \
ACTION_READ_VALUE_SLICE(i, 1) \
ACTION_READ_VALUE_SLICE(i, 2) \
ACTION_READ_VALUE_SLICE(i, 3) \
ACTION_READ_VALUE_SLICE(i, 4)
#define TABLE_READ_VALUE_SLICE(i, j) \
table read_value_##i##_##j { \
actions { \
read_value_##i##_##j##_act; \
} \
}
#define TABLE_READ_VALUE(i) \
TABLE_READ_VALUE_SLICE(i, 1) \
TABLE_READ_VALUE_SLICE(i, 2) \
TABLE_READ_VALUE_SLICE(i, 3) \
TABLE_READ_VALUE_SLICE(i, 4)
#define ACTION_ADD_VALUE_HEADER(i) \
action add_value_header_##i##_act() { \
add_to_field(ipv4.totalLen, 16);\
add_to_field(udp.len, 16);\
add_header(nc_value_##i); \
}
#define TABLE_ADD_VALUE_HEADER(i) \
table add_value_header_##i { \
actions { \
add_value_header_##i##_act; \
} \
}
#define ACTION_WRITE_VALUE_SLICE(i, j) \
action write_value_##i##_##j##_act() { \
register_write(value_##i##_##j##_reg, nc_cache_md.cache_index, nc_value_##i.value_##i##_##j); \
}
#define ACTION_WRITE_VALUE(i) \
ACTION_WRITE_VALUE_SLICE(i, 1) \
ACTION_WRITE_VALUE_SLICE(i, 2) \
ACTION_WRITE_VALUE_SLICE(i, 3) \
ACTION_WRITE_VALUE_SLICE(i, 4)
#define TABLE_WRITE_VALUE_SLICE(i, j) \
table write_value_##i##_##j { \
actions { \
write_value_##i##_##j##_act; \
} \
}
#define TABLE_WRITE_VALUE(i) \
TABLE_WRITE_VALUE_SLICE(i, 1) \
TABLE_WRITE_VALUE_SLICE(i, 2) \
TABLE_WRITE_VALUE_SLICE(i, 3) \
TABLE_WRITE_VALUE_SLICE(i, 4)
#define ACTION_REMOVE_VALUE_HEADER(i) \
action remove_value_header_##i##_act() { \
subtract_from_field(ipv4.totalLen, 16);\
subtract_from_field(udp.len, 16);\
remove_header(nc_value_##i); \
}
#define TABLE_REMOVE_VALUE_HEADER(i) \
table remove_value_header_##i { \
actions { \
remove_value_header_##i##_act; \
} \
}
#define CONTROL_PROCESS_VALUE(i) \
control process_value_##i { \
if (nc_hdr.op == NC_READ_REQUEST and nc_cache_md.cache_valid == 1) { \
apply (add_value_header_##i); \
apply (read_value_##i##_1); \
apply (read_value_##i##_2); \
apply (read_value_##i##_3); \
apply (read_value_##i##_4); \
} \
else if (nc_hdr.op == NC_UPDATE_REPLY and nc_cache_md.cache_exist == 1) { \
apply (write_value_##i##_1); \
apply (write_value_##i##_2); \
apply (write_value_##i##_3); \
apply (write_value_##i##_4); \
apply (remove_value_header_##i); \
} \
}
#define HANDLE_VALUE(i, ip1) \
HEADER_VALUE(i) \
PARSER_VALUE(i, ip1) \
REGISTER_VALUE(i) \
ACTION_READ_VALUE(i) \
TABLE_READ_VALUE(i) \
ACTION_ADD_VALUE_HEADER(i) \
TABLE_ADD_VALUE_HEADER(i) \
ACTION_WRITE_VALUE(i) \
TABLE_WRITE_VALUE(i) \
ACTION_REMOVE_VALUE_HEADER(i) \
TABLE_REMOVE_VALUE_HEADER(i) \
CONTROL_PROCESS_VALUE(i)
#define FINAL_PARSER(i) \
parser parse_nc_value_##i { \
return ingress; \
}
HANDLE_VALUE(1, 2)
HANDLE_VALUE(2, 3)
HANDLE_VALUE(3, 4)
HANDLE_VALUE(4, 5)
HANDLE_VALUE(5, 6)
HANDLE_VALUE(6, 7)
HANDLE_VALUE(7, 8)
HANDLE_VALUE(8, 9)
FINAL_PARSER(9)
header_type reply_read_hit_info_md_t {
fields {
ipv4_srcAddr: 32;
ipv4_dstAddr: 32;
}
}
metadata reply_read_hit_info_md_t reply_read_hit_info_md;
action reply_read_hit_before_act() {
modify_field (reply_read_hit_info_md.ipv4_srcAddr, ipv4.srcAddr);
modify_field (reply_read_hit_info_md.ipv4_dstAddr, ipv4.dstAddr);
}
table reply_read_hit_before {
actions {
reply_read_hit_before_act;
}
}
action reply_read_hit_after_act() {
modify_field (ipv4.srcAddr, reply_read_hit_info_md.ipv4_dstAddr);
modify_field (ipv4.dstAddr, reply_read_hit_info_md.ipv4_srcAddr);
modify_field (nc_hdr.op, NC_READ_REPLY);
}
table reply_read_hit_after {
actions {
reply_read_hit_after_act;
}
}
control process_value {
if (nc_hdr.op == NC_READ_REQUEST and nc_cache_md.cache_valid == 1) {
apply (reply_read_hit_before);
}
process_value_1();
process_value_2();
process_value_3();
process_value_4();
process_value_5();
process_value_6();
process_value_7();
process_value_8();
if (nc_hdr.op == NC_READ_REQUEST and nc_cache_md.cache_valid == 1) {
apply (reply_read_hit_after);
}
}
NC_READ_REQUEST = 0
NC_READ_REPLY = 1
NC_HOT_READ_REQUEST = 2
NC_WRITE_REQUEST = 4
NC_WRITE_REPLY = 5
NC_UPDATE_REQUEST = 8
NC_UPDATE_REPLY = 9
import socket
import struct
import time
import thread

from nc_config import *

NC_PORT = 8888
CLIENT_IP = "10.0.0.1"
SERVER_IP = "10.0.0.2"
CONTROLLER_IP = "10.0.0.3"

path_kv = "kv.txt"
path_log = "server_log.txt"
len_key = 16
len_val = 128

# Load the key-value items generated by gen_kv.py.
# kv.txt stores each item as a key line, a value line, and a blank line,
# so the key lines sit at indices 2, 5, 8, ... (1000 items in total).
f = open(path_kv, "r")
lines = f.readlines()
f.close()
kv = {}
for i in range(2, 3002, 3):
    line = lines[i].split()
    key_header = line[0]
    key_body = line[1:]
    val = lines[i + 1].split()
    key_header = int(key_header)
    for j in range(len(key_body)):
        key_body[j] = int(key_body[j], 16)
    for j in range(len(val)):
        val[j] = int(val[j], 16)
    key_field = ""
    key_field += struct.pack(">I", key_header)
    for j in range(len(key_body)):
        key_field += struct.pack("B", key_body[j])
    val_field = ""
    for j in range(len(val)):
        val_field += struct.pack("B", val[j])
    kv[key_header] = (key_field, val_field)

counter = 0

def counting():
    last_counter = 0
    while True:
        print (counter - last_counter), counter
        last_counter = counter
        time.sleep(1)

thread.start_new_thread(counting, ())

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((SERVER_IP, NC_PORT))

#f = open(path_log, "w")
while True:
    packet, addr = s.recvfrom(2048)
    op_field = packet[0]
    key_field = packet[1:]
    op = struct.unpack("B", op_field)[0]
    key_header = struct.unpack(">I", key_field[:4])[0]
    if (op == NC_READ_REQUEST or op == NC_HOT_READ_REQUEST):
        op = NC_READ_REPLY
        op_field = struct.pack("B", op)
        key_field, val_field = kv[key_header]
        packet = op_field + key_field + val_field
        s.sendto(packet, (CLIENT_IP, NC_PORT))
        counter = counter + 1
    elif (op == NC_UPDATE_REQUEST):
        op = NC_UPDATE_REPLY
        op_field = struct.pack("B", op)
        key_field, val_field = kv[key_header]
        packet = op_field + key_field + val_field
        s.sendto(packet, (CONTROLLER_IP, NC_PORT))
    #f.write(str(op) + ' ')
    #f.write(str(key_header) + '\n')
    #f.flush()
    #print counter
#f.close()