Commit 2867ff9b authored by Shah Rinku's avatar Shah Rinku

Minimal NIC dispatcher tested.

TODO: handle coldstart
parent 23b21377
......@@ -440,7 +440,7 @@ function unpackPacket(packet) {
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack(">I", packet, base)
base += 1
base += 4
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
......@@ -464,7 +464,7 @@ function packPacket(dataPacket) {
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
base += 1
base += 4
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
......
......@@ -359,7 +359,7 @@
"id" : 0,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 89,
"line" : 88,
"column" : 8,
"source_fragment" : "DeparserImpl"
},
......@@ -400,7 +400,7 @@
"id" : 0,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 100,
"line" : 99,
"column" : 8,
"source_fragment" : "verify_checksum( ..."
},
......@@ -458,7 +458,7 @@
"id" : 1,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 120,
"line" : 119,
"column" : 8,
"source_fragment" : "update_checksum( ..."
},
......@@ -634,63 +634,6 @@
"column" : 8,
"source_fragment" : "hdr.udp.dstPort = dstPort"
}
},
{
"op" : "assign",
"parameters" : [
{
"type" : "field",
"value" : ["map_hdr", "data"]
},
{
"type" : "hexstr",
"value" : "0x00000063"
}
],
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 28,
"column" : 8,
"source_fragment" : "hdr.map_hdr.data = 99"
}
},
{
"op" : "assign",
"parameters" : [
{
"type" : "field",
"value" : ["ethernet", "dstAddr"]
},
{
"type" : "runtime_data",
"value" : 2
}
],
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 29,
"column" : 8,
"source_fragment" : "hdr.ethernet.dstAddr = ethernetAddr"
}
},
{
"op" : "assign",
"parameters" : [
{
"type" : "field",
"value" : ["standard_metadata", "egress_spec"]
},
{
"type" : "field",
"value" : ["standard_metadata", "ingress_port"]
}
],
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 35,
"column" : 8,
"source_fragment" : "standard_metadata.egress_spec = standard_metadata.ingress_port"
}
}
],
"source_info" : {
......@@ -719,7 +662,7 @@
],
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 52,
"line" : 51,
"column" : 9,
"source_fragment" : "hdr.map_hdr.data = 32w100"
}
......@@ -745,7 +688,7 @@
],
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 77,
"line" : 76,
"column" : 8,
"source_fragment" : "hdr.udp.checksum = 16w0"
}
......@@ -753,7 +696,7 @@
],
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 76,
"line" : 75,
"column" : 33,
"source_fragment" : "fix_checksum"
}
......@@ -799,7 +742,7 @@
"id" : 1,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 41,
"line" : 40,
"column" : 29,
"source_fragment" : "dispatch"
},
......@@ -818,10 +761,10 @@
"direct_meters" : null,
"action_ids" : [3, 1],
"actions" : ["dispatch_act", "NoAction"],
"base_default_next" : null,
"base_default_next" : "fwd",
"next_tables" : {
"dispatch_act" : null,
"NoAction" : null
"dispatch_act" : "fwd",
"NoAction" : "fwd"
},
"default_entry" : {
"action_id" : 1,
......@@ -865,6 +808,42 @@
"action_data" : [],
"action_entry_const" : false
}
},
{
"name" : "fwd",
"id" : 3,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 16,
"column" : 24,
"source_fragment" : "fwd"
},
"key" : [
{
"match_type" : "exact",
"target" : ["standard_metadata", "ingress_port"],
"mask" : null
}
],
"match_type" : "exact",
"type" : "simple",
"max_size" : 1024,
"with_counters" : false,
"support_timeout" : false,
"direct_meters" : null,
"action_ids" : [2, 0],
"actions" : ["fwd_act", "NoAction"],
"base_default_next" : null,
"next_tables" : {
"fwd_act" : null,
"NoAction" : null
},
"default_entry" : {
"action_id" : 0,
"action_const" : false,
"action_data" : [],
"action_entry_const" : false
}
}
],
"action_profiles" : [],
......@@ -874,7 +853,7 @@
"id" : 0,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 51,
"line" : 50,
"column" : 12,
"source_fragment" : "hdr.ipv4.isValid() && hdr.udp.dstPort == 8000"
},
......@@ -919,7 +898,7 @@
"id" : 1,
"source_info" : {
"filename" : "p4src/echo.p4",
"line" : 61,
"line" : 60,
"column" : 8,
"source_fragment" : "egress"
},
......@@ -927,7 +906,7 @@
"tables" : [
{
"name" : "tbl_fix_checksum",
"id" : 3,
"id" : 4,
"key" : [],
"match_type" : "exact",
"type" : "simple",
......
......@@ -25,14 +25,13 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_
@name(".dispatch_act") action dispatch_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) {
hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort;
hdr.map_hdr.data = 99;
hdr.ethernet.dstAddr = ethernetAddr;
// standard_metadata.egress_port = egress_port;
// hdr.ethernet.dstAddr = ethernetAddr;
// standard_metadata.egress_spec = egress_port;
// bit<48> tempEth = hdr.ethernet.dstAddr;
// hdr.ethernet.dstAddr = hdr.ethernet.srcAddr;
// hdr.ethernet.srcAddr = tempEth;
standard_metadata.egress_spec = standard_metadata.ingress_port;
// standard_metadata.egress_spec = standard_metadata.ingress_port;
// hdr.ipv4.ttl = hdr.ipv4.ttl - 8w1;
......@@ -51,7 +50,7 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_
if (hdr.ipv4.isValid() && hdr.udp.dstPort == DISPATCHER_PORT) {
hdr.map_hdr.data = 32w100;
dispatch.apply();
// fwd.apply();
fwd.apply();
} else {
fwd.apply();
}
......
......@@ -75,7 +75,19 @@
"ingress::dispatch": {
"rules": [
]
],
"default_rule": {
"action": {
"type" : "ingress::dispatch_act",
"data" : {
"dstAddr" : { "value" : "192.168.2.2" },
"dstPort" : { "value" : "8081" },
"egress_port" : { "value" : "v0.1" },
"ethernetAddr" : { "value" : "02:42:c0:a8:02:06" }
}
},
"name": "default"
}
}
},
"multicast": {},
......
......@@ -224,7 +224,7 @@ parser:
egress::fix_checksum:
implementation: modify_field(udp.checksum, 0x0000);
src_filename: p4src/echo.p4
src_lineno: 76
src_lineno: 75
type: action
ingress::act:
......@@ -237,9 +237,6 @@ ingress::dispatch_act:
implementation: |-
modify_field(ipv4.dstAddr, dstAddr);
modify_field(udp.dstPort, dstPort);
modify_field(map_hdr.data, 0x00000063);
modify_field(ethernet.dstAddr, ethernetAddr);
modify_field(standard_metadata.egress_spec, standard_metadata.ingress_port);
parameter_list:
- dstAddr: 32
- dstPort: 16
......@@ -280,7 +277,7 @@ ingress::dispatch:
map_hdr.function_id: exact
max_entries: 1025
src_filename: p4src/echo.p4
src_lineno: 41
src_lineno: 40
type: table
ingress::fwd:
......@@ -313,7 +310,7 @@ _condition_0:
condition: (((valid(ipv4))) and (((udp.dstPort) == (8000))))
format: bracketed_expr
src_filename: p4src/echo.p4
src_lineno: 51
src_lineno: 50
type: conditional
......@@ -330,7 +327,7 @@ ingress_flow:
"_condition_0" -> "ingress::tbl_act" [condition = true]
"ingress::fwd" -> "exit_control_flow" [action = always]
"ingress::tbl_act" -> "ingress::dispatch" [action = always]
"ingress::dispatch" -> "exit_control_flow" [action = always]
"ingress::dispatch" -> "ingress::fwd" [action = always]
}
start_state: _condition_0
type: control_flow
......@@ -382,7 +379,7 @@ layout:
##########################################
source_info:
date: 2021/03/01 20:59:20
date: 2021/03/04 18:32:12
output_file: p4src/echo.yml
p4_version: '16'
source_files:
......
......@@ -47,8 +47,8 @@ def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 7070))
print "listening to {} at port {}".format(CLIENT_IP, 9000)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
run_status = {}
while True:
......@@ -62,7 +62,7 @@ def receive():
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 1
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
......
python2 send.py --client-port 8000 --closed 1 --offload 0 --rps 1 --send-data 0 --closed 1 --fid 1
\ No newline at end of file
echo $1
python2 send.py --client-port 8000 --closed 1 --offload 0 --rps 1 --send-data 0 --closed 1 --fid $1
\ No newline at end of file
......@@ -8,77 +8,58 @@ consumer = KafkaConsumer('deployed', 'removeWorker',
RTEInterface.Connect('thrift', "10.129.2.201", 20206)
tableId = "ingress::dispatch"
rule_name = "dispatch_to_worker"
# default_rule = True
actions = '''{ "type" : "ingress::dispatch_act",
"data" : { "dstAddr" : { "value" : "%s" },
"dstPort" : { "value" : "%d" } } }''' % ("192.168.2.2", 8080)
print actions
# RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, actions)
default_rule = False
ip = "192.168.2.4"
port = 7070
actions = '''{
"type" : "ingress::dispatch_act",
"data" : {
"dstAddr" : { "value" : "%s" },
"dstPort" : { "value" : "%d" } ,
"egress_port": { "value": "v0.1" },
"ethernetAddr": { "value": "02:42:c0:a8:02:04" }
}
}''' % (ip, \
int(port))
print actions
# % (data.param3)
match = '{ "map_hdr.function_id" : { "value" : %d} } ' % (1)
print match
RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, match, actions)
ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList
ruleDictionary = {}
def makeRule(ip, port, mac, functionHash, tableId, rule_name, default_rule):
actions = '{ "type" : "ingress::dispatch_act", "data" : { "dstAddr" : { "value" : "%s" }, \
"dstPort" : { "value" : "%d" } , "egress_port": { "value": "v0.1" }, "ethernetAddr": { "value": "%s" } } }' \
% (ip, int(port), mac)
match = '{ "map_hdr.function_id" : { "value" : %d} } ' % (functionHash)
rule = {
"tableId": tableId,
"rule_name": rule_name,
"default_rule": default_rule,
"match": match,
"actions": actions
}
return rule
def addRule(worker):
functionHash = worker[u'functionHash']
rule_name = "dispatch_to_worker" + functionHash
functionHash = int(functionHash[0:5], 16)
ip = str(worker[u'node_id']).strip()
port = int(worker[u'portExternal'])
mac = str(worker[u'mac']).strip()
default_rule = False
rule = makeRule(ip, port, mac, functionHash, tableId, rule_name, default_rule)
ruleDictionary[functionHash] = rule
print ruleDictionary
RTEInterface.Tables.AddRule(
rule["tableId"], rule["rule_name"], rule["default_rule"], rule["match"], rule["actions"])
ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList
return 0
def deleteRule(worker):
functionHash = worker[u'functionHash']
functionHash = int(functionHash[0:5], 16)
rule = ruleDictionary[functionHash]
RTEInterface.Tables.DeleteRule(
rule["tableId"], rule["rule_name"], rule["default_rule"], rule["match"], rule["actions"])
del ruleDictionary[functionHash]
ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList
return 0
for msg in consumer:
print msg, msg.topic
if msg.topic == "deployed":
msg = msg.value.decode('utf-8')
# worker = str(msg.value)
# worker = ast.literal_eval(json.dumps(worker))
worker = json.loads(msg)
print worker, worker[u'node_id']
print str(worker[u'node_id']).strip()
print int(worker[u'portExternal'])
functionHash = worker[u'functionHash']
functionHash = int(functionHash[0:5], 16)
print functionHash
default_rule = False
actions = '''{ "type" : "ingress::dispatch_act",
"data" : { "dstAddr" : { "value" : "%s" },
"dstPort" : { "value" : "%d" } } }''' % (str(worker[u'node_id']).strip(), \
int(worker[u'portExternal']))
print actions
# % (data.param3)
match = '{ "map_hdr.function_id" : { "value" : %d} } ' % (0)
print match
RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, match, actions)
ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList
addRule(worker)
elif msg.topic == "removeWorker":
# msg = msg.value.decode('utf-8')
# # worker = str(msg.value)
# # worker = ast.literal_eval(json.dumps(worker))
# worker = json.loads(msg)
# print worker, worker[u'node_id']
# print str(worker[u'node_id']).strip()
# print int(worker[u'portExternal'])
# default_rule = False
# actions = '''{ "type" : "ingress::dispatch_act",
# "data" : { "dstAddr" : { "value" : "%s" },
# "dstPort" : { "value" : "%d" } } }''' % (str(worker[u'node_id']).strip(), \
# int(worker[u'portExternal']))
# print actions
# # % (data.param3)
# match = '{ "map_hdr.chain_id" : { "value" : "0"} } '
# RTEInterface.Tables.AddRule(tableId, rule_name, default_rule, match, actions)
# ruleList = RTEInterface.Tables.ListRules(tableId)
print ruleList
msg = msg.value.decode('utf-8')
worker = json.loads(msg)
deleteRule(worker)
......@@ -9,12 +9,13 @@ const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const os = require('os')
let struct = require('jspack')
struct = struct.jspack
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
let port = 5000, resource_id, functionHash, runtime, idleTime = 30, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
......@@ -76,12 +77,20 @@ async function executor(payload) {
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id = execSync('ip a | grep -oE "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep 192.168 | head -n 1').toString()
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
......@@ -90,7 +99,7 @@ app.listen(port, () => {
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid}),
runtime, resource_id, entity_id: process.pid, mac: mac_address}),
"status": true
}], () => { })
waitTime = Date.now()
......@@ -137,7 +146,7 @@ server.on('message', (msg, rinfo) => {
result = packPacket(payload)
console.log(result)
try {
udpProxy.send(result, 0, result.length, "8080", "192.168.2.2", function (err, bytes) {
udpProxy.send(result, 0, result.length, "8080", rinfo.address, function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
......@@ -158,7 +167,7 @@ function unpackPacket(packet) {
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack(">I", packet, base)
base += 1
base += 4
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
......@@ -184,7 +193,7 @@ function packPacket(dataPacket) {
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
base += 1
base += 4
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
......@@ -198,6 +207,7 @@ server.on('listening', () => {
console.log(`server listening ${address.address}:${address.port}`);
});
// server.bind(port, "192.168.2.3");
server.bind(port);
setInterval(shouldDie, 1000);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment