Commit 23b21377 authored by Shah Rinku's avatar Shah Rinku

added partial support for routing at nic

parent bc8d9766
...@@ -13,6 +13,8 @@ const { createLogger, format, transports } = winston; ...@@ -13,6 +13,8 @@ const { createLogger, format, transports } = winston;
const heap = require('heap') const heap = require('heap')
const dgram = require('dgram'); const dgram = require('dgram');
const udpProxy = dgram.createSocket('udp4'); const udpProxy = dgram.createSocket('udp4');
let struct = require('jspack')
struct = struct.jspack
let db = sharedMeta.db, // queue holding request to be dispatched let db = sharedMeta.db, // queue holding request to be dispatched
...@@ -144,14 +146,21 @@ async function reverseProxy(req, res) { ...@@ -144,14 +146,21 @@ async function reverseProxy(req, res) {
logger.error("error" + err) logger.error("error" + err)
} }
} else if (req.body.type === "udp") { } else if (req.body.type === "udp") {
let request_id = makeid(4) let request_id = Math.floor(Math.random() * 1000)
req.body.request_id = request_id req.body.request_id = request_id
// res.request_id = request_id // res.request_id = request_id
requestFlightQueue.set(request_id, res) requestFlightQueue.set(request_id, res)
let payload = req.body let payload = req.body
payload.request_id = request_id payload.request_id = request_id
payload = JSON.stringify(payload) let data = payload.data
udpProxy.send(payload, 0, payload.length, resource.port, resource.node_id, function (err, bytes) { let packet = packPacket({
chain_id: 0,
exec_id: request_id,
function_id: 0,
data,
function_count: 1
})
udpProxy.send(packet, 0, packet.length, resource.port, resource.node_id, function (err, bytes) {
// logger.info(`forwarded request via UDP, IP 192.168.2.5 Port ${resource.port}`) // logger.info(`forwarded request via UDP, IP 192.168.2.5 Port ${resource.port}`)
}) })
} }
...@@ -403,8 +412,8 @@ udpProxy.on('error', (err) => { ...@@ -403,8 +412,8 @@ udpProxy.on('error', (err) => {
}); });
udpProxy.on('message', (msg, rinfo) => { udpProxy.on('message', (msg, rinfo) => {
let result = JSON.parse(msg) let result = unpackPacket(msg)
let res = requestFlightQueue.get(result.request_id) let res = requestFlightQueue.get(result.exec_id)
res.json(result) res.json(result)
}); });
...@@ -422,6 +431,48 @@ async function fetchData(url, data = null) { ...@@ -422,6 +431,48 @@ async function fetchData(url, data = null) {
return await res.json() return await res.json()
} }
function unpackPacket(packet) {
// let buffer = new Array(1024)
let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack(">I", packet, base)
base += 1
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0]
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
base += 1
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
udpProxy.bind(constants.master_port); // starting UDP server for offloaded endpoints udpProxy.bind(constants.master_port); // starting UDP server for offloaded endpoints
module.exports = { module.exports = {
......
#! /bin/bash -x #! /bin/bash -ex
compile_flag=0 compile_flag=0
offload_flag=0
assign_ip_flag=0
location=$(pwd) location=$(pwd)
while getopts 'c' flag; do while getopts 'coi' flag; do
case "${flag}" in case "${flag}" in
c) compile_flag=1 ;; c) compile_flag=1 ;;
o) offload_flag=1 ;;
i) assign_ip_flag=1 ;;
esac esac
done done
...@@ -13,11 +17,24 @@ then ...@@ -13,11 +17,24 @@ then
# compile the nfp code # compile the nfp code
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/echo.nffw -p ./p4src/out -4 ./p4src/echo.p4 -l lithium --nfp4c_p4_version 16 --nfp4c_p4_compiler p4c-nfp -c ./p4src/prime.c sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/echo.nffw -p ./p4src/out -4 ./p4src/echo.p4 -l lithium --nfp4c_p4_version 16 --nfp4c_p4_compiler p4c-nfp -c ./p4src/prime.c
fi fi
# move to p4 bin
cd /opt/netronome/p4/bin/
# #offload if [[ $offload_flag -eq 1 ]]
sudo ./rtecli design-load -f $location/p4src/echo.nffw -c $location/p4src/echo.p4cfg -p $location/p4src/out/pif_design.json then
# move to p4 bin
cd /opt/netronome/p4/bin/
# offload
sudo ./rtecli design-load -f $location/p4src/echo.nffw -c $location/p4src/echo.p4cfg -p $location/p4src/out/pif_design.json
# returning back to base
cd $location
fi
if [[ $assign_ip_flag -eq 1 ]]
then
#killing all running containers
docker stop $(docker ps -a -q) || true
#assigning IPs to network interfaces
sudo ./assign_ip.sh
fi
# returning back to base
cd $location
\ No newline at end of file
...@@ -8,21 +8,6 @@ ...@@ -8,21 +8,6 @@
extern void prime(); extern void prime();
control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_t standard_metadata) { control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_t standard_metadata) {
@name(".set_egress") action set_egress() {
prime();
bit<32> temp = hdr.ipv4.dstAddr;
hdr.ipv4.dstAddr = hdr.ipv4.srcAddr;
hdr.ipv4.srcAddr = temp;
hdr.udp.dstPort = 9000;
bit<48> tempEth = hdr.ethernet.dstAddr;
hdr.ethernet.dstAddr = hdr.ethernet.srcAddr;
hdr.ethernet.srcAddr = tempEth;
standard_metadata.egress_spec = standard_metadata.ingress_port;
hdr.ipv4.ttl = hdr.ipv4.ttl - 8w1;
}
@name(".fwd_act") action fwd_act(bit<16> port) { @name(".fwd_act") action fwd_act(bit<16> port) {
standard_metadata.egress_spec = port; standard_metadata.egress_spec = port;
...@@ -37,9 +22,20 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_ ...@@ -37,9 +22,20 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_
} }
} }
@name(".dispatch_act") action dispatch_act(bit<32> dstAddr, bit<16> dstPort) { @name(".dispatch_act") action dispatch_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) {
hdr.ipv4.dstAddr = dstAddr; hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort; hdr.udp.dstPort = dstPort;
hdr.map_hdr.data = 99;
hdr.ethernet.dstAddr = ethernetAddr;
// standard_metadata.egress_port = egress_port;
// bit<48> tempEth = hdr.ethernet.dstAddr;
// hdr.ethernet.dstAddr = hdr.ethernet.srcAddr;
// hdr.ethernet.srcAddr = tempEth;
standard_metadata.egress_spec = standard_metadata.ingress_port;
// hdr.ipv4.ttl = hdr.ipv4.ttl - 8w1;
} }
@name(".dispatch") table dispatch { @name(".dispatch") table dispatch {
...@@ -47,22 +43,18 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_ ...@@ -47,22 +43,18 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_
dispatch_act; dispatch_act;
} }
key = { key = {
hdr.map_hdr.chain_id : exact; hdr.map_hdr.function_id : exact;
} }
} }
apply { apply {
if (hdr.ipv4.isValid() && hdr.udp.dstPort == MDS_PORT) { if (hdr.ipv4.isValid() && hdr.udp.dstPort == DISPATCHER_PORT) {
// hdr.map_hdr.data = 32w100; hdr.map_hdr.data = 32w100;
dispatch.apply();
set_egress(); // fwd.apply();
} else { } else {
if (hdr.ipv4.isValid() && hdr.udp.dstPort == 8080) {
dispatch.apply();
}
fwd.apply(); fwd.apply();
} }
//fwd.apply();
} }
} }
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
"matchFields": [ "matchFields": [
{ {
"id": 1, "id": 1,
"name": "map_hdr.chain_id", "name": "map_hdr.function_id",
"bitwidth": 32, "bitwidth": 32,
"matchType": "EXACT" "matchType": "EXACT"
} }
...@@ -63,13 +63,6 @@ ...@@ -63,13 +63,6 @@
"alias": "NoAction" "alias": "NoAction"
} }
}, },
{
"preamble": {
"id": 16816129,
"name": "set_egress",
"alias": "set_egress"
}
},
{ {
"preamble": { "preamble": {
"id": 16805069, "id": 16805069,
...@@ -100,6 +93,16 @@ ...@@ -100,6 +93,16 @@
"id": 2, "id": 2,
"name": "dstPort", "name": "dstPort",
"bitwidth": 16 "bitwidth": 16
},
{
"id": 3,
"name": "ethernetAddr",
"bitwidth": 48
},
{
"id": 4,
"name": "egress_port",
"bitwidth": 16
} }
] ]
}, },
......
...@@ -33,12 +33,6 @@ ing_metadata: ...@@ -33,12 +33,6 @@ ing_metadata:
- _padding: 2 - _padding: 2
type: metadata type: metadata
ingress::set_egress::scalars:
fields:
- temp: 32
- tempEth: 48
type: metadata
ipv4: ipv4:
calculated_fields: calculated_fields:
- condition: valid(ipv4) - condition: valid(ipv4)
...@@ -68,7 +62,7 @@ map_hdr: ...@@ -68,7 +62,7 @@ map_hdr:
fields: fields:
- chain_id: 32 - chain_id: 32
- exec_id: 32 - exec_id: 32
- function_id: 8 - function_id: 32
- data: 32 - data: 32
- function_count: 8 - function_count: 8
- f0: 8 - f0: 8
...@@ -214,7 +208,7 @@ parser: ...@@ -214,7 +208,7 @@ parser:
start -> exit [value="default", mask="none", order="1"] start -> exit [value="default", mask="none", order="1"]
parse_ipv4 -> parse_udp [value="0x00000511", mask="0x00000fff", order="0"] parse_ipv4 -> parse_udp [value="0x00000511", mask="0x00000fff", order="0"]
parse_ipv4 -> exit [value="default", mask="none", order="1"] parse_ipv4 -> exit [value="default", mask="none", order="1"]
parse_udp -> parse_map_hdr [value="0x22b9", mask="none", order="0"] parse_udp -> parse_map_hdr [value="0x1f40", mask="none", order="0"]
parse_udp -> parse_map_hdr [value="0x2328", mask="none", order="1"] parse_udp -> parse_map_hdr [value="0x2328", mask="none", order="1"]
parse_udp -> exit [value="default", mask="none", order="2"] parse_udp -> exit [value="default", mask="none", order="2"]
parse_map_hdr -> exit [value="default", mask="none", order="0"] parse_map_hdr -> exit [value="default", mask="none", order="0"]
...@@ -223,25 +217,6 @@ parser: ...@@ -223,25 +217,6 @@ parser:
type: parser type: parser
##########################################
# External functions #
##########################################
prime__0:
name: prime
type: external_action
##########################################
# Action Expressions #
##########################################
_expression_set_egress_0:
expression: ((((ipv4.ttl) + (0xff))) & (0xff))
format: bracketed_expr
type: expression
########################################## ##########################################
# Action sets # # Action sets #
########################################## ##########################################
...@@ -249,18 +224,29 @@ _expression_set_egress_0: ...@@ -249,18 +224,29 @@ _expression_set_egress_0:
egress::fix_checksum: egress::fix_checksum:
implementation: modify_field(udp.checksum, 0x0000); implementation: modify_field(udp.checksum, 0x0000);
src_filename: p4src/echo.p4 src_filename: p4src/echo.p4
src_lineno: 84 src_lineno: 76
type: action
ingress::act:
implementation: modify_field(map_hdr.data, 0x00000064);
src_filename: ''
src_lineno: 1
type: action type: action
ingress::dispatch_act: ingress::dispatch_act:
implementation: |- implementation: |-
modify_field(ipv4.dstAddr, dstAddr); modify_field(ipv4.dstAddr, dstAddr);
modify_field(udp.dstPort, dstPort); modify_field(udp.dstPort, dstPort);
modify_field(map_hdr.data, 0x00000063);
modify_field(ethernet.dstAddr, ethernetAddr);
modify_field(standard_metadata.egress_spec, standard_metadata.ingress_port);
parameter_list: parameter_list:
- dstAddr: 32 - dstAddr: 32
- dstPort: 16 - dstPort: 16
- ethernetAddr: 48
- egress_port: 16
src_filename: p4src/echo.p4 src_filename: p4src/echo.p4
src_lineno: 40 src_lineno: 25
type: action type: action
ingress::fwd_act: ingress::fwd_act:
...@@ -268,22 +254,6 @@ ingress::fwd_act: ...@@ -268,22 +254,6 @@ ingress::fwd_act:
parameter_list: parameter_list:
- port: 16 - port: 16
src_filename: p4src/echo.p4 src_filename: p4src/echo.p4
src_lineno: 27
type: action
ingress::set_egress:
implementation: |-
prime();
modify_field(ingress::set_egress::scalars.temp, ipv4.dstAddr);
modify_field(ipv4.dstAddr, ipv4.srcAddr);
modify_field(ipv4.srcAddr, ingress::set_egress::scalars.temp);
modify_field(udp.dstPort, 0x2328);
modify_field(ingress::set_egress::scalars.tempEth, ethernet.dstAddr);
modify_field(ethernet.dstAddr, ethernet.srcAddr);
modify_field(ethernet.srcAddr, ingress::set_egress::scalars.tempEth);
modify_field(standard_metadata.egress_spec, standard_metadata.ingress_port);
modify_field(ipv4.ttl, _expression_set_egress_0);
src_filename: p4src/echo.p4
src_lineno: 12 src_lineno: 12
type: action type: action
...@@ -307,10 +277,10 @@ ingress::dispatch: ...@@ -307,10 +277,10 @@ ingress::dispatch:
allowed_actions: allowed_actions:
- ingress::dispatch_act - ingress::dispatch_act
match_on: match_on:
map_hdr.chain_id: exact map_hdr.function_id: exact
max_entries: 1025 max_entries: 1025
src_filename: p4src/echo.p4 src_filename: p4src/echo.p4
src_lineno: 45 src_lineno: 41
type: table type: table
ingress::fwd: ingress::fwd:
...@@ -320,14 +290,14 @@ ingress::fwd: ...@@ -320,14 +290,14 @@ ingress::fwd:
standard_metadata.ingress_port: exact standard_metadata.ingress_port: exact
max_entries: 1025 max_entries: 1025
src_filename: p4src/echo.p4 src_filename: p4src/echo.p4
src_lineno: 31 src_lineno: 16
type: table type: table
ingress::tbl_set_egress: ingress::tbl_act:
allowed_actions: allowed_actions:
- ingress::set_egress - ingress::act
default_entry: default_entry:
action: ingress::set_egress action: ingress::act
const: true const: true
max_entries: 1025 max_entries: 1025
src_filename: '' src_filename: ''
...@@ -340,17 +310,10 @@ ingress::tbl_set_egress: ...@@ -340,17 +310,10 @@ ingress::tbl_set_egress:
########################################## ##########################################
_condition_0: _condition_0:
condition: (((valid(ipv4))) and (((udp.dstPort) == (8889)))) condition: (((valid(ipv4))) and (((udp.dstPort) == (8000))))
format: bracketed_expr
src_filename: p4src/echo.p4
src_lineno: 55
type: conditional
_condition_1:
condition: (((valid(ipv4))) and (((udp.dstPort) == (8080))))
format: bracketed_expr format: bracketed_expr
src_filename: p4src/echo.p4 src_filename: p4src/echo.p4
src_lineno: 60 src_lineno: 51
type: conditional type: conditional
...@@ -363,13 +326,11 @@ ingress_flow: ...@@ -363,13 +326,11 @@ ingress_flow:
format: dot format: dot
implementation: |- implementation: |-
digraph { digraph {
"_condition_0" -> "_condition_1" [condition = false] "_condition_0" -> "ingress::fwd" [condition = false]
"_condition_0" -> "ingress::tbl_set_egress" [condition = true] "_condition_0" -> "ingress::tbl_act" [condition = true]
"_condition_1" -> "ingress::fwd" [condition = false]
"_condition_1" -> "ingress::dispatch" [condition = true]
"ingress::fwd" -> "exit_control_flow" [action = always] "ingress::fwd" -> "exit_control_flow" [action = always]
"ingress::dispatch" -> "ingress::fwd" [action = always] "ingress::tbl_act" -> "ingress::dispatch" [action = always]
"ingress::tbl_set_egress" -> "exit_control_flow" [action = always] "ingress::dispatch" -> "exit_control_flow" [action = always]
} }
start_state: _condition_0 start_state: _condition_0
type: control_flow type: control_flow
...@@ -421,7 +382,7 @@ layout: ...@@ -421,7 +382,7 @@ layout:
########################################## ##########################################
source_info: source_info:
date: 2021/02/22 11:12:59 date: 2021/03/01 20:59:20
output_file: p4src/echo.yml output_file: p4src/echo.yml
p4_version: '16' p4_version: '16'
source_files: source_files:
......
#define REPLY_PORT 9000 #define REPLY_PORT 9000
#define MDS_PORT 8889 #define DISPATCHER_PORT 8000
#define NUM_CACHE 128 #define NUM_CACHE 128
#define CLUSTER_COUNT 2 #define CLUSTER_COUNT 2
#define DEAD_EGRESS_PORT 9 #define DEAD_EGRESS_PORT 9
......
...@@ -43,7 +43,7 @@ header udp_t { ...@@ -43,7 +43,7 @@ header udp_t {
header map_hdr_t { header map_hdr_t {
bit<32> chain_id; bit<32> chain_id;
bit<32> exec_id; bit<32> exec_id;
bit<8> function_id; bit<32> function_id;
bit<32> data; bit<32> data;
bit<8> function_count; bit<8> function_count;
bit<8> f0; bit<8> f0;
......
...@@ -11,7 +11,7 @@ parser ParserImpl(packet_in packet, out headers hdr, inout metadata meta, inout ...@@ -11,7 +11,7 @@ parser ParserImpl(packet_in packet, out headers hdr, inout metadata meta, inout
@name(".parse_udp") state parse_udp { @name(".parse_udp") state parse_udp {
packet.extract<udp_t>(hdr.udp); packet.extract<udp_t>(hdr.udp);
transition select(hdr.udp.dstPort) { transition select(hdr.udp.dstPort) {
MDS_PORT: parse_map_hdr; DISPATCHER_PORT: parse_map_hdr;
REPLY_PORT: parse_map_hdr; REPLY_PORT: parse_map_hdr;
default: accept; default: accept;
} }
......
import socket
import struct
import time
import thread
import argparse
NC_PORT = 8000
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--client-ip', help='IP of client',
type=str, action="store", required=True)
args = parser.parse_args()
CLIENT_IP = args.client_ip
# CLIENT_IP = "192.168.0.105"
len_key = 16
counter = 0
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, NC_PORT))
while True:
packet, addr = s.recvfrom(1024)
# print packet
counter = counter + 1
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 1
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
print chain_id, exec_id, data, "function_id", function_id, function_count
data = []
for i in range(1, 6):
data.append(int(struct.unpack("B", packet[i+base])[0]))
print data
\ No newline at end of file
import socket
import struct
import time
import threading
import random
import time
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--client-port', help='Port of client',
type=int, action="store", required=True)
parser.add_argument('--send-data', help='Data to send',
type=int, action="store", required=False)
parser.add_argument('--fid', help='Funtion id',
type=int, action="store", required=False)
parser.add_argument('--closed', help='Closed loop',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
# group.add_argument('--bandwidth', help='Bandwidth',
# type=int, action="store")
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--req-count', help='Number of requests to send',
type=int, action="store")
parser.add_argument('--offload', help='offload a portion of workloads',
type=float, action="store")
args = parser.parse_args()
print args.send_data
PORT = args.client_port
dataInt = args.send_data
fid = args.fid
SERVER_IP = "192.168.2.2"
egress_time = []
ingress_time = []
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 7070))
print "listening to {} at port {}".format(CLIENT_IP, 9000)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
# print packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 1
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
print "rec", chain_id, exec_id, data, function_id, function_count
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 16384)
chain_id = 1
# data = 100
function_count = 5
function_id = fid if (fid) else 0
f0 = 0
f1 = 0
f2 = 2
f3 = 6
f4 = 2
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
offload_status = False
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
if args.offload is not None:
max_workload = 100
dataInt = random.randint(1, max_workload)
cutoff = max_workload * args.offload
if dataInt <= cutoff:
data = struct.pack(">I", dataInt * 256) # data
offload_status = True
else:
data = struct.pack(">I", dataInt) # data
else:
data = struct.pack(">I", dataInt) # data
# print "{0:b}".format(data)
function_count = struct.pack("B", function_count) # function count
function_id = struct.pack(">I", function_id) # function count
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id + data + function_count + \
f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, offload_status
def sendThread(start_time, runtime, sleep_time, s):
global ingress_time
while True:
packet, offload_status = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
def send():
global egress_time, ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
if args.req_count is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
# print "%.20f" % time.time()
# time.sleep(2)
# break
elif args.offload is None and args.rps is not None:
runtime = 10
start_time = time.time()
sleep_time = 1 / float(args.rps)
print "calculated inter-arrival time", sleep_time
while True:
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
elif args.offload is not None:
runtime = 10
thread_count = 8
start_time = time.time()
sleep_time = 1 / float(args.rps) * thread_count
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time, s])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
# r.join()
if args.closed == 1:
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(1)
send()
python2 send.py --client-port 8000 --closed 1 --offload 0 --rps 1 --send-data 0 --closed 1 --fid 1
\ No newline at end of file
#! /bin/bash -ex
start_docker=0
copy_send=0
location=$(pwd)
while getopts 'sc' flag; do
case "${flag}" in
s) start_docker=1 ;;
c) copy_send=1 ;;
i) assign_ip_flag=1 ;;
esac
done
if [[ $start_docker -eq 1 ]]
then
# start python docker container
docker run --rm -dit --net pub_net --name send python
fi
if [[ $copy_send -eq 1 ]]
then
# start python docker container
docker cp send.py send:/
docker cp send.sh send:/
fi
docker exec -it send /bin/bash
\ No newline at end of file
...@@ -15,6 +15,24 @@ actions = '''{ "type" : "ingress::dispatch_act", ...@@ -15,6 +15,24 @@ actions = '''{ "type" : "ingress::dispatch_act",
"dstPort" : { "value" : "%d" } } }''' % ("192.168.2.2", 8080) "dstPort" : { "value" : "%d" } } }''' % ("192.168.2.2", 8080)
print actions print actions
# RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, actions) # RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, actions)
default_rule = False
ip = "192.168.2.4"
port = 7070
actions = '''{
"type" : "ingress::dispatch_act",
"data" : {
"dstAddr" : { "value" : "%s" },
"dstPort" : { "value" : "%d" } ,
"egress_port": { "value": "v0.1" },
"ethernetAddr": { "value": "02:42:c0:a8:02:04" }
}
}''' % (ip, \
int(port))
print actions
# % (data.param3)
match = '{ "map_hdr.function_id" : { "value" : %d} } ' % (1)
print match
RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, match, actions)
ruleList = RTEInterface.Tables.ListRules(tableId) ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList print ruleList
...@@ -28,7 +46,9 @@ for msg in consumer: ...@@ -28,7 +46,9 @@ for msg in consumer:
print worker, worker[u'node_id'] print worker, worker[u'node_id']
print str(worker[u'node_id']).strip() print str(worker[u'node_id']).strip()
print int(worker[u'portExternal']) print int(worker[u'portExternal'])
functionHash = worker[u'functionHash']
functionHash = int(functionHash[0:5], 16)
print functionHash
default_rule = False default_rule = False
actions = '''{ "type" : "ingress::dispatch_act", actions = '''{ "type" : "ingress::dispatch_act",
"data" : { "dstAddr" : { "value" : "%s" }, "data" : { "dstAddr" : { "value" : "%s" },
...@@ -36,10 +56,29 @@ for msg in consumer: ...@@ -36,10 +56,29 @@ for msg in consumer:
int(worker[u'portExternal'])) int(worker[u'portExternal']))
print actions print actions
# % (data.param3) # % (data.param3)
match = '{ "map_hdr.chain_id" : { "value" : "0"} } ' match = '{ "map_hdr.function_id" : { "value" : %d} } ' % (0)
print match
RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, match, actions) RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, match, actions)
ruleList = RTEInterface.Tables.ListRules(tableId) ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList print ruleList
elif msg.topic == "removeWorker": elif msg.topic == "removeWorker":
# msg = msg.value.decode('utf-8')
# # worker = str(msg.value)
# # worker = ast.literal_eval(json.dumps(worker))
# worker = json.loads(msg)
# print worker, worker[u'node_id']
# print str(worker[u'node_id']).strip()
# print int(worker[u'portExternal'])
# default_rule = False
# actions = '''{ "type" : "ingress::dispatch_act",
# "data" : { "dstAddr" : { "value" : "%s" },
# "dstPort" : { "value" : "%d" } } }''' % (str(worker[u'node_id']).strip(), \
# int(worker[u'portExternal']))
# print actions
# # % (data.param3)
# match = '{ "map_hdr.chain_id" : { "value" : "0"} } '
# RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, match, actions)
# ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList
...@@ -2,9 +2,6 @@ ...@@ -2,9 +2,6 @@
"folders": [ "folders": [
{ {
"path": "../.." "path": "../.."
},
{
"path": "../../../../test/offload"
} }
], ],
"settings": {} "settings": {}
......
const struct = require('./jspack/jspack') const struct = require('./jspack/jspack')
f0 = 0; f1 = 0; f2 = 2; f3 = 6; f4 = 2 function unpackPacket(packet) {
a = [] // let buffer = new Array(1024)
chain_id = struct.PackTo(">I", chain_id) let chain_id = null; exec_id = null, function_count = null, function_id = null
exec_id = struct.PackTo(">I", exec_id) let base = 0
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack("B", packet, base)
base += 1
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
dataInt = random.randint(1, 1000) console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
data = struct.PackTo(">I", dataInt) }
\ No newline at end of file
function_count = struct.PackTo("B", function_count)
function_id = struct.PackTo("B", function_id)
f0 = struct.PackTo("B", f0)
f1 = struct.PackTo("B", f1)
f2 = struct.PackTo("B", f2)
f3 = struct.PackTo("B", f3)
f4 = struct.PackTo("B", f4)
base = 0
// chain_id = struct.unpack(">I", )
// base += 4
// exec_id = struct.unpack(">I", packet[base: base + 4])[0]
// base += 4
// function_id = struct.unpack("B", packet[base])[0]
// base += 1
// data = struct.unpack(">I", packet[base: base + 4])[0]
base += 4
\ No newline at end of file
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
"express-fileupload": "^1.1.6", "express-fileupload": "^1.1.6",
"heap": "^0.2.6", "heap": "^0.2.6",
"isolated-vm": "^3.0.0", "isolated-vm": "^3.0.0",
"jspack": "^0.0.4",
"kafka-logger": "^7.1.0", "kafka-logger": "^7.1.0",
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
......
...@@ -9,6 +9,9 @@ const { spawnSync, execSync } = require('child_process'); ...@@ -9,6 +9,9 @@ const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib'); // const { logger } = require('../../lib');
const server = dgram.createSocket('udp4'); const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4'); const udpProxy = dgram.createSocket('udp4');
let struct = require('jspack')
struct = struct.jspack
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
...@@ -125,14 +128,14 @@ server.on('error', (err) => { ...@@ -125,14 +128,14 @@ server.on('error', (err) => {
}); });
server.on('message', (msg, rinfo) => { server.on('message', (msg, rinfo) => {
console.log("message", msg) // console.log("message", msg)
let payload = {} let payload = unpackPacket(msg)
// console.log(payload, typeof payload); console.log(payload, typeof payload);
lastRequest = Date.now() lastRequest = Date.now()
totalRequest++ totalRequest++
executor(payload).then(result => { executor(payload).then(result => {
result = "" result = packPacket(payload)
console.log(result)
try { try {
udpProxy.send(result, 0, result.length, "8080", "192.168.2.2", function (err, bytes) { udpProxy.send(result, 0, result.length, "8080", "192.168.2.2", function (err, bytes) {
if (err) if (err)
...@@ -146,6 +149,50 @@ server.on('message', (msg, rinfo) => { ...@@ -146,6 +149,50 @@ server.on('message', (msg, rinfo) => {
}) })
}); });
function unpackPacket(packet) {
// let buffer = new Array(1024)
let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack(">I", packet, base)
base += 1
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0]
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
base += 1
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
server.on('listening', () => { server.on('listening', () => {
const address = server.address(); const address = server.address();
console.log(`server listening ${address.address}:${address.port}`); console.log(`server listening ${address.address}:${address.port}`);
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
"dependencies": { "dependencies": {
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"express": "^4.17.1", "express": "^4.17.1",
"jspack": "^0.0.4",
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"request": "^2.88.2" "request": "^2.88.2"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment