Commit 3a6cef8c authored by Shah Rinku's avatar Shah Rinku

Working solution for dispatcher NIC offload

parent 2867ff9b
......@@ -7,3 +7,29 @@ secrets.json
resource_system/bin/**
resource_system/version.linux
local_experiments/
.vscode
p4src/Makefile-nfp4build
p4src/app_master.list/
p4src/blm0.list/
p4src/echo.nffw
p4src/echo.yml
p4src/flowcache_timeout_emu0.list/
p4src/gro0.list/
p4src/gro1.list/
p4src/nbi_init_csr.list/
p4src/nfd_pcie0_notify.list/
p4src/nfd_pcie0_pci_in_gather.list/
p4src/nfd_pcie0_pci_in_issue0.list/
p4src/nfd_pcie0_pci_in_issue1.list/
p4src/nfd_pcie0_pci_out_me0.list/
p4src/nfd_pcie0_pd0.list/
p4src/nfd_pcie0_pd1.list/
p4src/nfd_pcie0_sb.list/
p4src/nfd_svc.list/
p4src/out/
p4src/pif_app_nfd.list/
client/Makefile-nfp4build
*.list
p4src/out_dir
*.nffw
import socket
import struct
import time
import threading
import random
import time
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Funtion id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
SERVER_IP = "192.168.2.3"
egress_time = []
ingress_time = []
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
# print packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
data = int(data) - t
print "rec", chain_id, exec_id, data, function_id, function_count,
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
print exec_id
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
dataInt = int(time.time() * 1000) % 1000000000
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id_packed = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id_packed + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, function_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
packet, function_id = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
def send():
global egress_time, ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
if args.n is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
elif args.rps is not None:
runtime = 10
thread_count = args.c
start_time = time.time()
sleep_time = thread_count / float(args.rps)
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(1)
send()
r.join()
......@@ -13,6 +13,7 @@
"body-parser": "^1.19.0",
"express": "^4.17.1",
"express-fileupload": "^1.1.6",
"jspack": "^0.0.4",
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
......
This diff is collapsed.
......@@ -66,7 +66,7 @@ app.use(fileUpload())
app.use('/serverless/chain', chainHandler.router); // chain router (explicit_chain_handler.js) for handling explicit chains
let requestQueue = []
const WINDOW_SIZE = 10
const WINDOW_SIZE = 1
const port = constants.master_port
const registry_url = constants.registry_url
......@@ -220,6 +220,7 @@ app.post('/serverless/execute/:id', (req, res) => {
res.timestamp = Date.now()
if (functionToResource.has(id)) {
res.start = 'warmstart'
res.dispatch_time = Date.now()
libSupport.reverseProxy(req, res)
} else {
res.start = 'coldstart'
......
......@@ -77,6 +77,7 @@ function generateExecutor(functionPath, functionHash) {
* @param {JSON} res Object to use to return the response to the user
*/
async function reverseProxy(req, res) {
res.reverse_ingress = Date.now()
if (req.headers['x-chain-type'] !== 'explicit' && req.body.type === "tcp")
branchChainPredictor(req)
let runtime = req.body.runtime
......@@ -93,8 +94,10 @@ async function reverseProxy(req, res) {
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
forwardTo.open_request_count += 1
heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
// forwardTo.open_request_count += 1
// TODO: stopping loadbalancer
// heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
res.lookup_time = Date.now()
var options = {
method: 'POST',
uri: url,
......@@ -153,6 +156,7 @@ async function reverseProxy(req, res) {
let payload = req.body
payload.request_id = request_id
let data = payload.data
res.data_set_time = Date.now()
let packet = packPacket({
chain_id: 0,
exec_id: request_id,
......@@ -160,8 +164,10 @@ async function reverseProxy(req, res) {
data,
function_count: 1
})
res.pack_time = Date.now()
udpProxy.send(packet, 0, packet.length, resource.port, resource.node_id, function (err, bytes) {
// logger.info(`forwarded request via UDP, IP 192.168.2.5 Port ${resource.port}`)
res.send_time = Date.now()
})
}
}
......@@ -415,6 +421,23 @@ udpProxy.on('message', (msg, rinfo) => {
let result = unpackPacket(msg)
let res = requestFlightQueue.get(result.exec_id)
res.json(result)
console.log("resource_lookup",
res.dispatch_time - res.timestamp,
"reverse_proxy_call",
res.reverse_ingress - res.dispatch_time,
"metadata_lookup",
res.lookup_time - res.reverse_ingress,
"data_set_time",
res.data_set_time - res.lookup_time,
"pack_time",
res.pack_time - res.data_set_time,
"network_send",
res.send_time - res.pack_time,
"total_dispatch_delay",
res.send_time - res.timestamp,
"E2E time:",
Date.now() - res.timestamp
)
});
udpProxy.on('listening', () => {
......@@ -459,6 +482,7 @@ function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
let f0, f1, f2, f3, f4, t1, t2, t3, t4
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
......@@ -468,6 +492,27 @@ function packPacket(dataPacket) {
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
base += 1
f0 = struct.PackTo("B", message, base, [0])
base += 1
f1 = struct.PackTo("B", message, base, [12])
base += 1
f2 = struct.PackTo("B", message, base, [0])
base += 1
f3 = struct.PackTo("B", message, base, [34])
base += 1
f4 = struct.PackTo("B", message, base, [0])
base += 1
t1 = struct.PackTo(">I", message, base, [Date.now()])
base += 4
t2 = struct.PackTo("I", message, base, [1234])
base += 4
t3 = struct.PackTo("I", message, base, [0])
base += 8
t4 = struct.PackTo("I", message, base, [0])
message = Buffer.from(message)
return message
}
......
#
# Generated Makefile for echo
# Generated Makefile for orchestrator
#
ifndef SDKDIR
......@@ -122,7 +122,7 @@ ifneq ($(NFAS_FOUND),found)
$(warning warning: nfas not found or not executable, on windows please run nfp4term.bat)
endif
$(OUTDIR)/echo.nffw: $(OUTDIR)/nfd_pcie0_pd0.list/nfd_pcie0_pd0.list \
$(OUTDIR)/orchestrator.nffw: $(OUTDIR)/nfd_pcie0_pd0.list/nfd_pcie0_pd0.list \
$(OUTDIR)/nfd_pcie0_pci_in_issue1.list/nfd_pcie0_pci_in_issue1.list \
$(OUTDIR)/nfd_pcie0_pci_out_me0.list/nfd_pcie0_pci_out_me0.list \
$(OUTDIR)/nbi_init_csr.list/nbi_init_csr.list \
......@@ -186,17 +186,17 @@ $(PIFOUTDIR)/build_info.json: $(MAKEFILE_LIST)
# Generate IR from P4
#
$(OUTDIR)/echo.yml: p4src/echo.p4 \
$(OUTDIR)/orchestrator.yml: p4src/orchestrator.p4 \
$(MAKEFILE_LIST)
@echo ---------
@echo compiling p4 $@
@echo ---------
@mkdir -p $(PIFOUTDIR)
$(SDKP4DIR)/bin/nfp4c -o $(OUTDIR)/echo.yml \
$(SDKP4DIR)/bin/nfp4c -o $(OUTDIR)/orchestrator.yml \
--p4-version 16 \
--p4-compiler p4c-nfp \
--source_info \
p4src/echo.p4
p4src/orchestrator.p4
#
......@@ -229,15 +229,16 @@ $(PIFOUTDIR)/pif_pkt_clone%h \
$(PIFOUTDIR)/pif_flcalc%c \
$(PIFOUTDIR)/pif_flcalc%h \
$(PIFOUTDIR)/pif_field_lists%h \
$(PIFOUTDIR)/pif_parrep_pvs_sync%c : $(OUTDIR)/echo%yml $(MAKEFILE_LIST)
$(PIFOUTDIR)/pif_parrep_pvs_sync%c : $(OUTDIR)/orchestrator%yml $(MAKEFILE_LIST)
@echo ---------
@echo generating pif $@
@echo ---------
@mkdir -p $(PIFOUTDIR)
$(SDKP4DIR)/bin/nfirc -o $(PIFOUTDIR)/ \
--p4info $(OUTDIR)/echo.p4info.json \
--p4info $(OUTDIR)/orchestrator.p4info.json \
--debugpoints \
$(OUTDIR)/echo.yml
--mac_ingress_timestamp \
$(OUTDIR)/orchestrator.yml
#
......
#!/bin/bash -x
sudo ifconfig vf0_0 down
sudo ifconfig vf0_0 hw ether 00:11:11:11:11:11
sudo ifconfig vf0_1 down
sudo ifconfig vf0_1 hw ether 00:22:22:22:22:22
sudo ifconfig vf0_0 192.168.2.2/24 up
sudo ifconfig vf0_1 192.168.2.3/24 up
......@@ -7,17 +11,15 @@ sudo ifconfig vf0_1 192.168.2.3/24 up
echo "y" | docker system prune
docker network create -d macvlan --subnet=192.168.2.0/24 --aux-address="vf0_0=192.168.2.2" --aux-address="vf0_1=192.168.2.3" -o parent=vf0_1 pub_net
# move vf0_0 into its own namespace
# sudo ip netns exec ns_server ip link set vf0_0 netns 1
# sudo ip netns delete ns_server
# sudo ip netns add ns_server
move vf0_0 into its own namespace
sudo ip netns exec ns_server ip link set vf0_0 netns 1
sudo ip netns delete ns_server
sudo ip netns add ns_server
# sudo ip link set vf0_0 netns ns_server
# sudo ip netns exec ns_server ip addr add dev vf0_0 192.168.2.2/24
# sudo ip netns exec ns_server ip link set dev vf0_0 up
sudo ip link set vf0_0 netns ns_server
sudo ip netns exec ns_server ip addr add dev vf0_0 192.168.2.2/24
sudo ip netns exec ns_server ip link set dev vf0_0 up
# sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0
# sudo ip netns exec ns_server arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_0
sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0
# sudo arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_1
# sudo arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_1
\ No newline at end of file
sudo arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_1
sudo ip netns delete ns_server
sudo ip netns delete ns_client
sudo ip netns add ns_server
sudo ip netns add ns_client
echo "namespace created"
sudo ifconfig vf0_0 down
sudo ifconfig vf0_0 hw ether 00:11:11:11:11:11
sudo ip link set vf0_0 up
sudo ip address add 192.168.2.2/24 dev vf0_0
sudo ethtool --offload vf0_0 rx off tx off sg off
sudo ethtool -K vf0_0 gso off
sudo ifconfig vf0_1 down
sudo ifconfig vf0_1 hw ether 00:22:22:22:22:22
sudo ip link set vf0_1 up
sudo ip address add 192.168.2.3/24 dev vf0_1
sudo ethtool --offload vf0_1 rx off tx off sg off
sudo ethtool -K vf0_1 gso off
echo "IPs assigned"
#sudo arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0
#sudo arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_0
#sudo arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_1
#sudo arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_1
sudo ip link set vf0_0 netns ns_server
sudo ip netns exec ns_server ip addr add dev vf0_0 192.168.2.2/24
sudo ip netns exec ns_server ip link set dev vf0_0 up
sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0
sudo ip netns exec ns_server arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_0
sudo ip link set vf0_1 netns ns_client
sudo ip netns exec ns_client ip addr add dev vf0_1 192.168.2.3/24
sudo ip netns exec ns_client ip link set dev vf0_1 up
sudo ip netns exec ns_client arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_1
sudo ip netns exec ns_client arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_1
sudo ip netns exec ns_client arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_1
\ No newline at end of file
......@@ -15,7 +15,7 @@ done
if [[ $compile_flag -eq 1 ]]
then
# compile the nfp code
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/echo.nffw -p ./p4src/out -4 ./p4src/echo.p4 -l lithium --nfp4c_p4_version 16 --nfp4c_p4_compiler p4c-nfp -c ./p4src/prime.c
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp -c ./p4src/prime.c
fi
if [[ $offload_flag -eq 1 ]]
......@@ -24,7 +24,7 @@ then
cd /opt/netronome/p4/bin/
# offload
sudo ./rtecli design-load -f $location/p4src/echo.nffw -c $location/p4src/echo.p4cfg -p $location/p4src/out/pif_design.json
sudo ./rtecli design-load -f $location/p4src/orchestrator.nffw -c $location/p4src/echo.p4cfg -p $location/p4src/out/pif_design.json
# returning back to base
cd $location
......
#! /bin/bash -ex
compile_flag=0
offload_flag=0
assign_ip_flag=0
location=$(pwd)
while getopts 'coi' flag; do
case "${flag}" in
c) compile_flag=1 ;;
o) offload_flag=1 ;;
i) assign_ip_flag=1 ;;
esac
done
if [[ $compile_flag -eq 1 ]]
then
# compile the nfp code
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/test.nffw -p ./p4src/out -4 ./p4src/test.p4 -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp -e
fi
if [[ $offload_flag -eq 1 ]]
then
# move to p4 bin
cd /opt/netronome/p4/bin/
# offload
sudo ./rtecli design-load -f $location/p4src/test.nffw -c $location/p4src/test.p4cfg -p $location/p4src/out/pif_design.json
# returning back to base
cd $location
fi
if [[ $assign_ip_flag -eq 1 ]]
then
#assigning IPs to network interfaces
sudo ./assign_ip_test.sh
fi
......@@ -643,35 +643,9 @@
"source_fragment" : "dispatch_act"
}
},
{
"name" : "act",
"id" : 4,
"runtime_data" : [],
"primitives" : [
{
"op" : "assign",
"parameters" : [
{
"type" : "field",
"value" : ["map_hdr", "data"]
},
{
"type" : "hexstr",
"value" : "0x00000064"
}
],
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 51,
"column" : 9,
"source_fragment" : "hdr.map_hdr.data = 32w100"
}
}
]
},
{
"name" : "fix_checksum",
"id" : 5,
"id" : 4,
"runtime_data" : [],
"primitives" : [
{
......@@ -714,32 +688,9 @@
},
"init_table" : "node_2",
"tables" : [
{
"name" : "tbl_act",
"id" : 0,
"key" : [],
"match_type" : "exact",
"type" : "simple",
"max_size" : 1024,
"with_counters" : false,
"support_timeout" : false,
"direct_meters" : null,
"action_ids" : [4],
"actions" : ["act"],
"base_default_next" : "dispatch",
"next_tables" : {
"act" : "dispatch"
},
"default_entry" : {
"action_id" : 4,
"action_const" : true,
"action_data" : [],
"action_entry_const" : true
}
},
{
"name" : "dispatch",
"id" : 1,
"id" : 0,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 40,
......@@ -775,7 +726,7 @@
},
{
"name" : "fwd",
"id" : 2,
"id" : 1,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 16,
......@@ -811,7 +762,7 @@
},
{
"name" : "fwd",
"id" : 3,
"id" : 2,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 16,
......@@ -888,7 +839,7 @@
}
}
},
"true_next" : "tbl_act",
"true_next" : "dispatch",
"false_next" : "fwd"
}
]
......@@ -906,7 +857,7 @@
"tables" : [
{
"name" : "tbl_fix_checksum",
"id" : 4,
"id" : 3,
"key" : [],
"match_type" : "exact",
"type" : "simple",
......@@ -914,14 +865,14 @@
"with_counters" : false,
"support_timeout" : false,
"direct_meters" : null,
"action_ids" : [5],
"action_ids" : [4],
"actions" : ["fix_checksum"],
"base_default_next" : null,
"next_tables" : {
"fix_checksum" : null
},
"default_entry" : {
"action_id" : 5,
"action_id" : 4,
"action_const" : true,
"action_data" : [],
"action_entry_const" : true
......
......@@ -227,12 +227,6 @@ egress::fix_checksum:
src_lineno: 75
type: action
ingress::act:
implementation: modify_field(map_hdr.data, 0x00000064);
src_filename: ''
src_lineno: 1
type: action
ingress::dispatch_act:
implementation: |-
modify_field(ipv4.dstAddr, dstAddr);
......@@ -290,17 +284,6 @@ ingress::fwd:
src_lineno: 16
type: table
ingress::tbl_act:
allowed_actions:
- ingress::act
default_entry:
action: ingress::act
const: true
max_entries: 1025
src_filename: ''
src_lineno: 1
type: table
##########################################
# Ingress conditionals sets #
......@@ -324,9 +307,8 @@ ingress_flow:
implementation: |-
digraph {
"_condition_0" -> "ingress::fwd" [condition = false]
"_condition_0" -> "ingress::tbl_act" [condition = true]
"_condition_0" -> "ingress::dispatch" [condition = true]
"ingress::fwd" -> "exit_control_flow" [action = always]
"ingress::tbl_act" -> "ingress::dispatch" [action = always]
"ingress::dispatch" -> "ingress::fwd" [action = always]
}
start_state: _condition_0
......@@ -379,7 +361,7 @@ layout:
##########################################
source_info:
date: 2021/03/04 18:32:12
date: 2021/03/19 14:57:28
output_file: p4src/echo.yml
p4_version: '16'
source_files:
......
#define REPLY_PORT 9000
#define DISPATCHER_PORT 8000
#define MDS_PORT 8000
#define NUM_CACHE 128
#define CLUSTER_COUNT 2
#define DEAD_EGRESS_PORT 9
#define CONTROLLER_IP 0x0a000002
#define SERVER_IP 0x0a000001
#define CONTROLLER_IP 0xc0a80203
#define SERVER_IP 0xc0a80202
#define NC_READ_REQUEST 0
#define NC_READ_REPLY 1
......@@ -16,8 +17,8 @@
#define PKT_INSTANCE_TYPE_NORMAL 0
#define PKT_INSTANCE_TYPE_INGRESS_CLONE 1
#define PKT_INSTANCE_TYPE_EGRESS_CLONE 2
#define PKT_INSTANCE_TYPE_COALESCED 3
#define PKT_INSTANCE_TYPE_INGRESS_RECIRC 4
#define PKT_INSTANCE_TYPE_EGRESS_CLONE 9
#define PKT_INSTANCE_TYPE_COALESCED 10
#define PKT_INSTANCE_TYPE_INGRESS_RECIRC 3
#define PKT_INSTANCE_TYPE_REPLICATION 5
#define PKT_INSTANCE_TYPE_RESUBMIT 6
\ No newline at end of file
#define PKT_INSTANCE_TYPE_RESUBMIT 4
\ No newline at end of file
struct resubmit_meta_t {
bit<8> current_state;
bit<32> data;
}
struct ingress_metadata_t {
bit<1> drop;
bit<9> egress_port;
bit<4> packet_type;
}
register< bit<8>>(16384) current_state;
register< bit<8>>(16384) dispatch_state;