Commit 654c03a8 authored by kedar's avatar kedar

resolved multiple worker daemon issue

parent c1ccb55a
...@@ -9,6 +9,7 @@ resource_system/version.linux ...@@ -9,6 +9,7 @@ resource_system/version.linux
local_experiments/ local_experiments/
.vscode .vscode
p4src/Makefile-nfp4build p4src/Makefile-nfp4build
p4src/app_master.list/ p4src/app_master.list/
p4src/blm0.list/ p4src/blm0.list/
...@@ -33,3 +34,4 @@ client/Makefile-nfp4build ...@@ -33,3 +34,4 @@ client/Makefile-nfp4build
*.list *.list
p4src/out_dir p4src/out_dir
*.nffw *.nffw
*.csv
\ No newline at end of file
{ {
"registry_url": "10.129.2.201:5000/", "registry_url": "10.129.2.182:5000/",
"master_port": 8080, "master_port": 8082,
"master_address": "10.129.2.201", "master_address": "10.129.2.182",
"daemon_port": 9000, "daemon_port": 9000,
"daemon_mac": "00:22:22:22:22:22", "daemon_mac": "00:22:22:22:22:22",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt", "grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.2.201:5984", "couchdb_host": "10.129.2.182:5984",
"env": "env_udp2.js", "env": "env_udp.js",
"db": { "db": {
"function_meta": "serverless", "function_meta": "serverless",
"metrics": "metrics", "metrics": "metrics",
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
"network_bridge": "xanadu_kafka-serverless", "network_bridge": "xanadu_kafka-serverless",
"use_bridge": false, "use_bridge": false,
"internal": { "internal": {
"kafka_host": "10.129.2.201:9092" "kafka_host": "10.129.2.182:9092"
}, },
"external": { "external": {
"kafka_host": "10.129.2.201:9092" "kafka_host": "10.129.2.182:9092"
} }
}, },
"topics": { "topics": {
......
{"id":"192.168.2.3","master_node":"192.168.2.3"} {"id":"1000","master_node":"10.129.2.182"}
\ No newline at end of file \ No newline at end of file
...@@ -85,6 +85,7 @@ function runContainer(metadata) { ...@@ -85,6 +85,7 @@ function runContainer(metadata) {
const process_checkImage = spawn('docker', ["inspect", registry_url + imageName]) const process_checkImage = spawn('docker', ["inspect", registry_url + imageName])
process_checkImage.on('close', (code) => { process_checkImage.on('close', (code) => {
console.log("process_checkImage code : ", code)
if (code != 0) { if (code != 0) {
const process_pullImage = spawn('docker', ["pull", registry_url + imageName]); const process_pullImage = spawn('docker', ["pull", registry_url + imageName]);
...@@ -94,6 +95,7 @@ function runContainer(metadata) { ...@@ -94,6 +95,7 @@ function runContainer(metadata) {
}); });
process_pullImage.on('close', (code) => { process_pullImage.on('close', (code) => {
console.log("process_pullImage code : ", code)
if (code != 0) if (code != 0)
reject("error") reject("error")
else { else {
...@@ -112,11 +114,11 @@ function runContainer(metadata) { ...@@ -112,11 +114,11 @@ function runContainer(metadata) {
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`); logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference); logger.info("process container run time taken: ", timeDifference);
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id]) let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
// let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource_id]) // let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource_id])
let _ = spawn('docker', ['start', resource_id]) let _ = spawn('docker', ['start', resource_id, '-i'])
_.on('data', (data) => { _.on('data', (data) => {
console.log("container started", data); console.log("container started", data);
...@@ -164,41 +166,41 @@ function runContainer(metadata) { ...@@ -164,41 +166,41 @@ function runContainer(metadata) {
var container_id var container_id
process.stdout.on('data', (data) => { process.stdout.on('data', (data) => {
//container_id = data.toString //container_id = data.toString
logger.info(`stdout: ${data.toString()}`); logger.info(`process stdout: ${data.toString()}`);
console.log(data.toString()) console.log(data.toString())
let timeDifference = Math.ceil((Date.now() - timeStart)) let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference); logger.info("process container run time taken: ", timeDifference);
/** /**
* attach smartnic interface * attach smartnic interface
*/ */
// let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id]) // let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id]) let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id,'-i'])
let _ = spawn('docker', ['start', resource_id]) let _ = spawn('docker', ['start', resource_id])
_.stdout.on('data', (data) => { _.stdout.on('data', (data) => {
logger.info(data.toString()) logger.info("_ data : "+data.toString())
resolve(resource_id); resolve(resource_id);
}) })
_.stderr.on('data', (data) => { _.stderr.on('data', (data) => {
logger.info(data.toString()) logger.info("_ stderr : "+data.toString())
}) })
_.on('close', (data) => { _.on('close', (data) => {
logger.info("exit exit") logger.info(" _ : exit exit")
logger.info(data.toString()) logger.info(data.toString())
}) })
}); });
process.stderr.on('data', (data) => { process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`); logger.error(`process stderr: ${data}`);
reject(data); reject(data);
}); });
process.on('close', (code) => { process.on('close', (code) => {
logger.info("Exiting container"); logger.info("process close : Exiting container");
}) })
} }
......
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
"kafka-node": "^5.0.0", "kafka-node": "^5.0.0",
"morgan": "^1.9.1", "morgan": "^1.9.1",
"mqtt": "^4.2.8", "mqtt": "^4.2.8",
"node-fetch": "^2.6.0", "node-fetch": "^2.6.7",
"redis": "^3.1.2", "redis": "^3.1.2",
"request": "^2.88.2", "request": "^2.88.2",
"usage": "^0.7.1", "usage": "^0.7.1",
"winston": "^3.2.1" "winston": "^3.7.2"
} }
} }
...@@ -108,7 +108,7 @@ app.post('/serverless/deploy', (req, res) => { ...@@ -108,7 +108,7 @@ app.post('/serverless/deploy', (req, res) => {
microcfile.mv(file_path_nicfunctions +req.files.nicfunction.name, function (err) { microcfile.mv(file_path_nicfunctions +req.files.nicfunction.name, function (err) {
functionHash = libSupport.generateExecutor(file_path, functionHash) functionHash = libSupport.generateExecutor(file_path, functionHash)
libSupport.generateMicrocExecutor( file_path_nicfunctions, req.files.nicfunction.name, functionHash ) // libSupport.generateMicrocExecutor( file_path_nicfunctions, req.files.nicfunction.name, functionHash )
/** /**
* Adding meta caching via couchdb * Adding meta caching via couchdb
* This will create / update function related metadata like resource limits etc * This will create / update function related metadata like resource limits etc
......
...@@ -497,14 +497,18 @@ udpProxy.on('message', async (msg, rinfo) => { ...@@ -497,14 +497,18 @@ udpProxy.on('message', async (msg, rinfo) => {
let result = unpackPacket(msg) let result = unpackPacket(msg)
console.log("udp received request !!", result) console.log("udp received request !!", result)
let funchash = idToFunchashmap.get(result.function_id) console.log("idtofucntion hash map : ", idToFunchashmap)
let func_id = "function_" + result.function_id
// let funchash = idToFunchashmap.get(func_id)
let funchash = func_id
try{ try{
const res2 = await axios({ const res2 = await axios({
method: 'post', method: 'post',
url: 'http://localhost:8080/serverless/execute/' + funchash, url: 'http://10.129.2.182:8082/serverless/execute/' + funchash,
headers: {}, headers: {},
data: { data: {
runtime: 'process' // This is the body part runtime: 'container' // This is the body part
}, },
}); });
......
#!/bin/bash -x #!/bin/bash -x
sudo ifconfig vf0_0 down # sudo ifconfig vf0_0 down
sudo ifconfig vf0_0 hw ether 00:11:11:11:11:11 # sudo ifconfig vf0_0 hw ether 00:11:11:11:11:11
sudo ifconfig vf0_1 down # sudo ifconfig vf0_1 down
sudo ifconfig vf0_1 hw ether 00:22:22:22:22:22 # sudo ifconfig vf0_1 hw ether 00:22:22:22:22:22
# sudo ifconfig vf0_2 down # sudo ifconfig vf0_2 down
# sudo ifconfig vf0_2 hw ether 00:33:33:33:33:33 # sudo ifconfig vf0_2 hw ether 00:33:33:33:33:33
sudo ifconfig vf0_0 192.168.2.2/24 up sudo ifconfig vf0_0 192.168.200.10/24 up
sudo ifconfig vf0_1 192.168.2.3/24 up sudo ifconfig vf0_1 192.168.210.10/24 up
# sudo ifconfig vf0_2 192.168.2.4/24 up # sudo ifconfig vf0_2 192.168.2.4/24 up
# create a MAC VLAN for docker attached to vf0_1 # create a MAC VLAN for docker attached to vf0_1
echo "y" | docker system prune # echo "y" | docker system prune
docker network create -d macvlan --subnet=192.168.2.0/24 --aux-address="vf0_0=192.168.2.2" --aux-address="vf0_1=192.168.2.3" --aux-address="vf0_2=192.168.2.4" -o parent=vf0_1 pub_net # docker network create -d macvlan --subnet=192.168.210.0/24 --aux-address="vf0_0=192.168.200.10" --aux-address="vf0_1=192.168.210.10" --aux-address="vf0_2=192.168.200.11" -o parent=vf0_1 pub_net
# move vf0_0 into its own namespace # move vf0_0 into its own namespace
# sudo ip netns exec ns_server ip link set vf0_0 netns 1 # sudo ip netns exec ns_server ip link set vf0_0 netns 1
...@@ -20,9 +20,9 @@ sudo ip netns delete ns_server ...@@ -20,9 +20,9 @@ sudo ip netns delete ns_server
sudo ip netns add ns_server sudo ip netns add ns_server
sudo ip link set vf0_0 netns ns_server sudo ip link set vf0_0 netns ns_server
sudo ip netns exec ns_server ip addr add dev vf0_0 192.168.2.2/24 sudo ip netns exec ns_server ip addr add dev vf0_0 192.168.200.10/24
sudo ip netns exec ns_server ip link set dev vf0_0 up sudo ip netns exec ns_server ip link set dev vf0_0 up
sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0 sudo ip netns exec ns_server arp -s 192.168.210.10 00:15:4d:00:00:01 -i vf0_0
# sudo ip link set vf0_2 netns ns_server # sudo ip link set vf0_2 netns ns_server
# sudo ip netns exec ns_server ip addr add dev vf0_2 192.168.2.4/24 # sudo ip netns exec ns_server ip addr add dev vf0_2 192.168.2.4/24
...@@ -30,19 +30,21 @@ sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0 ...@@ -30,19 +30,21 @@ sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0
# sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_2 # sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_2
# sudo ip netns exec ns_server arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_2 # sudo ip netns exec ns_server arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_2
sudo arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_1 sudo arp -s 192.168.200.10 00:15:4d:00:00:00 -i vf0_1
# sudo arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_1 # sudo arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_1
# sudo arp -s 192.168.210.41 04:3f:72:da:6b:9c -i vf0_1
sudo ip netns exec ns_server ethtool --offload vf0_2 rx off tx off sudo ip netns exec ns_server ethtool --offload vf0_0 rx off tx off
# sudo ethtool --offload vf0_0 rx off tx off
sudo ethtool --offload vf0_1 rx off tx off sudo ethtool --offload vf0_1 rx off tx off
sudo ifconfig vf0_1 mtu 9000 # sudo ifconfig vf0_1 mtu 9000
sudo ip netns exec ns_server ifconfig vf0_2 mtu 9000 # sudo ip netns exec ns_server ifconfig vf0_2 mtu 9000
# sudo ifconfig vf0_0 mtu 9000
# ip link add vethd1 type veth peer name br-vethd1 # ip link add vethd1 type veth peer name br-vethd1
# ip link set vethd1 netns ns_server # ip link set vethd1 netns ns_server
# ip netns exec ns_server ip addr add 192.168.1.11/24 dev vethd1 # ip netns exec ns_server ip addr add 192.168.1.11/24 dev vethd1
# Create a bridge "bridgek0" # Create a bridge bridgek0
# sudo brctl addbr bridgek0 # sudo brctl addbr bridgek0
# sudo brctl addif bridgek0 br-vethd1 # sudo brctl addif bridgek0 br-vethd1
# sudo ip netns exec ns_server ip addr add 10.129.6.6/24 dev vethd1 # sudo ip netns exec ns_server ip addr add 10.129.6.6/24 dev vethd1
...@@ -53,10 +55,10 @@ sudo ip netns exec ns_server ifconfig vf0_2 mtu 9000 ...@@ -53,10 +55,10 @@ sudo ip netns exec ns_server ifconfig vf0_2 mtu 9000
# create veth cable for kafka # create veth cable for kafka
sudo ip link add veth_nnic0 type veth peer name veth_nnic1 # sudo ip link add veth_nnic0 type veth peer name veth_nnic1
sudo ip link set veth_nnic0 netns ns_server # sudo ip link set veth_nnic0 netns ns_server
sudo ip netns exec ns_server ip addr add 10.128.2.201/24 dev veth_nnic0 # sudo ip netns exec ns_server ip addr add 10.128.2.201/24 dev veth_nnic0
sudo ip netns exec ns_server ip link set dev veth_nnic0 up # sudo ip netns exec ns_server ip link set dev veth_nnic0 up
sudo ip addr add 10.128.2.200/24 dev veth_nnic1 # sudo ip addr add 10.128.2.200/24 dev veth_nnic1
sudo ip link set dev veth_nnic1 up # sudo ip link set dev veth_nnic1 up
...@@ -79,10 +79,10 @@ ...@@ -79,10 +79,10 @@
"action": { "action": {
"type" : "ingress::dispatch_act", "type" : "ingress::dispatch_act",
"data" : { "data" : {
"dstAddr" : { "value" : "192.168.2.3" }, "dstAddr" : { "value" : "192.168.210.10" },
"dstPort" : { "value" : "8080" }, "dstPort" : { "value" : "8082" },
"egress_port" : { "value" : "v0.1" }, "egress_port" : { "value" : "vf0.1" },
"ethernetAddr" : { "value" : "00:22:22:22:22:22" } "ethernetAddr" : { "value" : "00:15:4d:00:00:01" }
} }
}, },
"name": "default" "name": "default"
......
...@@ -402,7 +402,7 @@ ...@@ -402,7 +402,7 @@
"id" : 0, "id" : 0,
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 106, "line" : 107,
"column" : 8, "column" : 8,
"source_fragment" : "DeparserImpl" "source_fragment" : "DeparserImpl"
}, },
...@@ -479,7 +479,7 @@ ...@@ -479,7 +479,7 @@
"id" : 0, "id" : 0,
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 117, "line" : 118,
"column" : 8, "column" : 8,
"source_fragment" : "verify_checksum( ..." "source_fragment" : "verify_checksum( ..."
}, },
...@@ -537,7 +537,7 @@ ...@@ -537,7 +537,7 @@
"id" : 1, "id" : 1,
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 137, "line" : 138,
"column" : 8, "column" : 8,
"source_fragment" : "update_checksum( ..." "source_fragment" : "update_checksum( ..."
}, },
...@@ -732,6 +732,25 @@ ...@@ -732,6 +732,25 @@
"column" : 8, "column" : 8,
"source_fragment" : "hdr.ethernet.dstAddr = ethernetAddr" "source_fragment" : "hdr.ethernet.dstAddr = ethernetAddr"
} }
},
{
"op" : "assign",
"parameters" : [
{
"type" : "field",
"value" : ["standard_metadata", "egress_spec"]
},
{
"type" : "runtime_data",
"value" : 3
}
],
"source_info" : {
"filename" : "p4src/orchestrator.p4",
"line" : 35,
"column" : 8,
"source_fragment" : "standard_metadata.egress_spec = egress_port"
}
} }
], ],
"source_info" : { "source_info" : {
...@@ -764,7 +783,7 @@ ...@@ -764,7 +783,7 @@
], ],
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 72, "line" : 73,
"column" : 8, "column" : 8,
"source_fragment" : "fwd_checks.read(pc2,0)" "source_fragment" : "fwd_checks.read(pc2,0)"
} }
...@@ -806,7 +825,7 @@ ...@@ -806,7 +825,7 @@
], ],
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 73, "line" : 74,
"column" : 8, "column" : 8,
"source_fragment" : "pc2 = pc2 + 1" "source_fragment" : "pc2 = pc2 + 1"
} }
...@@ -829,7 +848,7 @@ ...@@ -829,7 +848,7 @@
], ],
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 74, "line" : 75,
"column" : 8, "column" : 8,
"source_fragment" : "fwd_checks.write(0,pc2)" "source_fragment" : "fwd_checks.write(0,pc2)"
} }
...@@ -855,7 +874,7 @@ ...@@ -855,7 +874,7 @@
], ],
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 94, "line" : 95,
"column" : 8, "column" : 8,
"source_fragment" : "hdr.udp.checksum = 16w0" "source_fragment" : "hdr.udp.checksum = 16w0"
} }
...@@ -863,7 +882,7 @@ ...@@ -863,7 +882,7 @@
], ],
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 93, "line" : 94,
"column" : 33, "column" : 33,
"source_fragment" : "fix_checksum" "source_fragment" : "fix_checksum"
} }
...@@ -886,7 +905,7 @@ ...@@ -886,7 +905,7 @@
"id" : 0, "id" : 0,
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 51, "line" : 52,
"column" : 29, "column" : 29,
"source_fragment" : "dispatch" "source_fragment" : "dispatch"
}, },
...@@ -905,49 +924,13 @@ ...@@ -905,49 +924,13 @@
"direct_meters" : null, "direct_meters" : null,
"action_ids" : [3, 1], "action_ids" : [3, 1],
"actions" : ["dispatch_act", "NoAction"], "actions" : ["dispatch_act", "NoAction"],
"base_default_next" : "fwd",
"next_tables" : {
"dispatch_act" : "fwd",
"NoAction" : "fwd"
},
"default_entry" : {
"action_id" : 1,
"action_const" : false,
"action_data" : [],
"action_entry_const" : false
}
},
{
"name" : "fwd",
"id" : 1,
"source_info" : {
"filename" : "p4src/orchestrator.p4",
"line" : 22,
"column" : 24,
"source_fragment" : "fwd"
},
"key" : [
{
"match_type" : "exact",
"target" : ["standard_metadata", "ingress_port"],
"mask" : null
}
],
"match_type" : "exact",
"type" : "simple",
"max_size" : 1024,
"with_counters" : false,
"support_timeout" : false,
"direct_meters" : null,
"action_ids" : [2, 0],
"actions" : ["fwd_act", "NoAction"],
"base_default_next" : "tbl_act", "base_default_next" : "tbl_act",
"next_tables" : { "next_tables" : {
"fwd_act" : "tbl_act", "dispatch_act" : "tbl_act",
"NoAction" : "tbl_act" "NoAction" : "tbl_act"
}, },
"default_entry" : { "default_entry" : {
"action_id" : 0, "action_id" : 1,
"action_const" : false, "action_const" : false,
"action_data" : [], "action_data" : [],
"action_entry_const" : false "action_entry_const" : false
...@@ -955,7 +938,7 @@ ...@@ -955,7 +938,7 @@
}, },
{ {
"name" : "fwd", "name" : "fwd",
"id" : 2, "id" : 1,
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 22, "line" : 22,
...@@ -991,7 +974,7 @@ ...@@ -991,7 +974,7 @@
}, },
{ {
"name" : "tbl_act", "name" : "tbl_act",
"id" : 3, "id" : 2,
"key" : [], "key" : [],
"match_type" : "exact", "match_type" : "exact",
"type" : "simple", "type" : "simple",
...@@ -1020,7 +1003,7 @@ ...@@ -1020,7 +1003,7 @@
"id" : 0, "id" : 0,
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 62, "line" : 63,
"column" : 12, "column" : 12,
"source_fragment" : "hdr.ipv4.isValid() && hdr.udp.dstPort == 8000" "source_fragment" : "hdr.ipv4.isValid() && hdr.udp.dstPort == 8000"
}, },
...@@ -1065,7 +1048,7 @@ ...@@ -1065,7 +1048,7 @@
"id" : 1, "id" : 1,
"source_info" : { "source_info" : {
"filename" : "p4src/orchestrator.p4", "filename" : "p4src/orchestrator.p4",
"line" : 78, "line" : 79,
"column" : 8, "column" : 8,
"source_fragment" : "egress" "source_fragment" : "egress"
}, },
...@@ -1073,7 +1056,7 @@ ...@@ -1073,7 +1056,7 @@
"tables" : [ "tables" : [
{ {
"name" : "tbl_fix_checksum", "name" : "tbl_fix_checksum",
"id" : 4, "id" : 3,
"key" : [], "key" : [],
"match_type" : "exact", "match_type" : "exact",
"type" : "simple", "type" : "simple",
......
...@@ -32,6 +32,7 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_ ...@@ -32,6 +32,7 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_
hdr.ipv4.dstAddr = dstAddr; hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort; hdr.udp.dstPort = dstPort;
hdr.ethernet.dstAddr = ethernetAddr; hdr.ethernet.dstAddr = ethernetAddr;
standard_metadata.egress_spec = egress_port;
//prime(); //prime();
} }
@name(".prime1_act") action prime1_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) { @name(".prime1_act") action prime1_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) {
...@@ -65,7 +66,7 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_ ...@@ -65,7 +66,7 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_
//pc = hdr.map_hdr.function_id; //pc = hdr.map_hdr.function_id;
//function_id_check.write(0,pc); //function_id_check.write(0,pc);
dispatch.apply(); dispatch.apply();
fwd.apply(); //fwd.apply();
} else { } else {
fwd.apply(); fwd.apply();
} }
......
...@@ -269,7 +269,7 @@ _expression_act_0: ...@@ -269,7 +269,7 @@ _expression_act_0:
egress::fix_checksum: egress::fix_checksum:
implementation: modify_field(udp.checksum, 0x0000); implementation: modify_field(udp.checksum, 0x0000);
src_filename: p4src/orchestrator.p4 src_filename: p4src/orchestrator.p4
src_lineno: 93 src_lineno: 94
type: action type: action
ingress::act: ingress::act:
...@@ -286,6 +286,7 @@ ingress::dispatch_act: ...@@ -286,6 +286,7 @@ ingress::dispatch_act:
modify_field(ipv4.dstAddr, dstAddr); modify_field(ipv4.dstAddr, dstAddr);
modify_field(udp.dstPort, dstPort); modify_field(udp.dstPort, dstPort);
modify_field(ethernet.dstAddr, ethernetAddr); modify_field(ethernet.dstAddr, ethernetAddr);
modify_field(standard_metadata.egress_spec, egress_port);
parameter_list: parameter_list:
- dstAddr: 32 - dstAddr: 32
- dstPort: 16 - dstPort: 16
...@@ -326,7 +327,7 @@ ingress::dispatch: ...@@ -326,7 +327,7 @@ ingress::dispatch:
map_hdr.function_id: exact map_hdr.function_id: exact
max_entries: 1025 max_entries: 1025
src_filename: p4src/orchestrator.p4 src_filename: p4src/orchestrator.p4
src_lineno: 51 src_lineno: 52
type: table type: table
ingress::fwd: ingress::fwd:
...@@ -359,7 +360,7 @@ _condition_0: ...@@ -359,7 +360,7 @@ _condition_0:
condition: (((valid(ipv4))) and (((udp.dstPort) == (8000)))) condition: (((valid(ipv4))) and (((udp.dstPort) == (8000))))
format: bracketed_expr format: bracketed_expr
src_filename: p4src/orchestrator.p4 src_filename: p4src/orchestrator.p4
src_lineno: 62 src_lineno: 63
type: conditional type: conditional
...@@ -376,7 +377,7 @@ ingress_flow: ...@@ -376,7 +377,7 @@ ingress_flow:
"_condition_0" -> "ingress::dispatch" [condition = true] "_condition_0" -> "ingress::dispatch" [condition = true]
"ingress::fwd" -> "ingress::tbl_act" [action = always] "ingress::fwd" -> "ingress::tbl_act" [action = always]
"ingress::tbl_act" -> "exit_control_flow" [action = always] "ingress::tbl_act" -> "exit_control_flow" [action = always]
"ingress::dispatch" -> "ingress::fwd" [action = always] "ingress::dispatch" -> "ingress::tbl_act" [action = always]
} }
start_state: _condition_0 start_state: _condition_0
type: control_flow type: control_flow
...@@ -428,7 +429,7 @@ layout: ...@@ -428,7 +429,7 @@ layout:
########################################## ##########################################
source_info: source_info:
date: 2022/02/04 11:27:23 date: 2022/04/26 14:20:01
output_file: p4src/orchestrator.yml output_file: p4src/orchestrator.yml
p4_version: '16' p4_version: '16'
source_files: source_files:
......
...@@ -2,43 +2,33 @@ import sys ...@@ -2,43 +2,33 @@ import sys
import json, ast import json, ast
from RTEInterface import RTEInterface from RTEInterface import RTEInterface
from kafka import KafkaConsumer from kafka import KafkaConsumer
import json
constants_file = open("../../constants_local.json")
constants = json.load(constants_file)
print(constants)
#consumer = KafkaConsumer('deployed', 'removeWorker', #consumer = KafkaConsumer('deployed', 'removeWorker',
# "request", bootstrap_servers='10.129.6.5:9092') # "request", bootstrap_servers='10.129.6.5:9092')
consumer = KafkaConsumer('deployed', 'removeWorker', 'COLDSTART_WORKER', 'AUTOSCALE', consumer = KafkaConsumer(constants["topics"]["deployed"], constants["topics"]["remove_worker"],
"request", bootstrap_servers='10.129.2.201:9092') constants["topics"]["request_dm_2_rm"], bootstrap_servers=constants["network"]["external"]["kafka_host"])
RTEInterface.Connect('thrift', "10.129.2.201", 20206) RTEInterface.Connect('thrift', "10.129.2.182", 20206)
tableId = "ingress::dispatch" tableId = "ingress::dispatch"
tableId2 = "ingress::fwd" tableId2 = "ingress::fwd"
ruleDictionary = {} ruleDictionary = {}
def makeRule(ip, port, mac, functionHash, tableId, rule_name, default_rule): def makeRule(ip, port, mac, functionHash, tableId, rule_name, default_rule):
rule={} actions = '{ "type" : "ingress::dispatch_act", "data" : { "dstAddr" : { "value" : "%s" }, \
if default_rule: "dstPort" : { "value" : "%d" } , "egress_port": { "value": "p1" }, "ethernetAddr": { "value": "%s" } } }' \
actions = '{ "type" : "ingress::dispatch_act", "data" : { "dstAddr" : { "value" : "%s" }, \ % (ip, int(port), mac)
"dstPort" : { "value" : "%d" } , "egress_port": { "value": "v0.1" }, "ethernetAddr": { "value": "%s" } } }' \ match = '{ "map_hdr.function_id" : { "value" : %d} }' % (functionHash)
% (ip, int(port), mac) rule = {
match = '{ }' "tableId": tableId,
rule = { "rule_name": rule_name,
"tableId": tableId, "default_rule": default_rule,
"rule_name": rule_name, "match": match,
"default_rule": default_rule, "actions": actions
"match": match, }
"actions": actions
}
else:
actions = '{ "type" : "ingress::dispatch_act", "data" : { "dstAddr" : { "value" : "%s" }, \
"dstPort" : { "value" : "%d" } , "egress_port": { "value": "v0.1" }, "ethernetAddr": { "value": "%s" } } }' \
% (ip, int(port), mac)
match = '{ "map_hdr.function_id" : { "value" : %d} }' % (functionHash)
rule = {
"tableId": tableId,
"rule_name": rule_name,
"default_rule": default_rule,
"match": match,
"actions": actions
}
return rule return rule
def addRule(worker): def addRule(worker):
...@@ -123,13 +113,14 @@ def updateDefaultRule2(): ...@@ -123,13 +113,14 @@ def updateDefaultRule2():
RTEInterface.Tables.EditRule(rule["tableId"], rule["rule_name"], rule["default_rule"], rule["match"], rule["actions"]) RTEInterface.Tables.EditRule(rule["tableId"], rule["rule_name"], rule["default_rule"], rule["match"], rule["actions"])
for msg in consumer: for msg in consumer:
if msg.topic == "deployed": # print("message received : ", msg)
if msg.topic == constants["topics"]["deployed"]:
msg = msg.value.decode('utf-8') msg = msg.value.decode('utf-8')
worker = json.loads(msg) worker = json.loads(msg)
print("received message on deployed : ", worker) print("received message on deployed : ", worker)
addRule(worker) addRule(worker)
elif msg.topic == "removeWorker": elif msg.topic == constants["topics"]["remove_worker"]:
msg = msg.value.decode('utf-8') msg = msg.value.decode('utf-8')
worker = json.loads(msg) worker = json.loads(msg)
print("received message on removeWorker : ", worker) print("received message on removeWorker : ", worker)
......
'use strict'; 'use strict';
const express = require('express') const express = require('express')
let request = require('request') // let request = require('request')
const process = require('process') const process = require('process')
const dgram = require('dgram'); const dgram = require('dgram');
...@@ -9,23 +9,23 @@ const { spawnSync, execSync } = require('child_process'); ...@@ -9,23 +9,23 @@ const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib'); // const { logger } = require('../../lib');
const server = dgram.createSocket('udp4'); const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4'); const udpProxy = dgram.createSocket('udp4');
var usage = require('usage'); // var usage = require('usage');
const os = require('os') const os = require('os')
let struct = require('jspack') let struct = require('jspack')
struct = struct.jspack struct = struct.jspack
const app = express() const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 30000, flagFirstRequest = true let port = 5000, resource_id, functionHash, runtime, idleTime = 1000, flagFirstRequest = true
let waitTime let waitTime
resource_id = process.argv[2] resource_id = process.argv[2]
functionHash = process.argv[3] functionHash = process.argv[3]
port = process.argv[4] port = process.argv[4]
runtime = process.argv[5] runtime = process.argv[5]
request = request.defaults({ // request = request.defaults({
headers: { 'x-resource-id': resource_id } // headers: { 'x-resource-id': resource_id }
}); // });
let producer let producer
try { try {
...@@ -114,7 +114,7 @@ app.listen(port, () => { ...@@ -114,7 +114,7 @@ app.listen(port, () => {
let interfaces = os.networkInterfaces() let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) { for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface] networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) { if (networkInterface[0].address.startsWith("192.168.")) {
node_id = networkInterface[0].address node_id = networkInterface[0].address
mac_address = networkInterface[0].mac mac_address = networkInterface[0].mac
} }
......
let workerNodes = {}, timeline = {} let workerNodes = {}, timeline = {}
const constants = require('../constants_local.json') const constants = require('../constants_local.json')
const Heap = require('heap'); const Heap = require('heap');
const libSupport = require('./lib'); // const libSupport = require('./lib');
const dgram = require('dgram');
const udpProxy = dgram.createSocket('udp4'); const udpProxy = dgram.createSocket('udp4');
const loadThreashold = 1 const loadThreashold = 1
...@@ -10,8 +11,8 @@ var workerHeap = new Heap(function(worker1, worker2) { ...@@ -10,8 +11,8 @@ var workerHeap = new Heap(function(worker1, worker2) {
}); });
let coldstart_worker = {}; let coldstart_worker = {};
let resourceMap = Map(), let resourceMap = new Map(),
functionToResourceMap = Map(); functionToResourceMap = new Map();
...@@ -25,8 +26,8 @@ let kafka = require('kafka-node'), ...@@ -25,8 +26,8 @@ let kafka = require('kafka-node'),
Consumer = kafka.Consumer, Consumer = kafka.Consumer,
consumer = new Consumer(client, consumer = new Consumer(client,
[ [
{ topic: 'heartbeat' }, // receives heartbeat messages from workers, also acts as worker join message { topic: constants.topics.heartbeat }, // receives heartbeat messages from workers, also acts as worker join message
{ topic: "request" }, // receives deployment details from RM { topic: constants.topics.request_dm_2_rm }, // receives deployment details from RM
{ topic: constants.topics.check_autoscale }, { topic: constants.topics.check_autoscale },
{ topic: constants.topics.autoscale }, { topic: constants.topics.autoscale },
{ topic: constants.topics.coldstart_worker } // give the information about worker having low load { topic: constants.topics.coldstart_worker } // give the information about worker having low load
...@@ -83,9 +84,10 @@ consumer.on('message', function (message) { ...@@ -83,9 +84,10 @@ consumer.on('message', function (message) {
message = message.value message = message.value
// console.log("message ",message) // console.log("message ",message)
if (topic !== "heartbeat") if (topic !== constants.topics.heartbeat)
console.log(message); console.log(message);
if (topic === "heartbeat") {
if (topic === constants.topics.heartbeat) {
message = JSON.parse(message) message = JSON.parse(message)
if (Date.now() - message.timestamp < 1000) if (Date.now() - message.timestamp < 1000)
{ {
...@@ -94,10 +96,10 @@ consumer.on('message', function (message) { ...@@ -94,10 +96,10 @@ consumer.on('message', function (message) {
console.log("New worker discovered. Worker List: ") console.log("New worker discovered. Worker List: ")
console.log(workerNodes); console.log(workerNodes);
workerHeap.push(workerNodes[message.address]) workerHeap.push(workerNodes[message.address])
if(Object.keys(workerNodes).length === 1) // if(Object.keys(workerNodes).length === 1)
{ // {
updateColdstartOnNIC(); // updateColdstartOnNIC();
} // }
} }
else{ else{
// console.log("Got heartbeat updating load of wroker ", message.address) // console.log("Got heartbeat updating load of wroker ", message.address)
...@@ -107,7 +109,7 @@ consumer.on('message', function (message) { ...@@ -107,7 +109,7 @@ consumer.on('message', function (message) {
workerNodes[message.address].timestamp = message.timestamp workerNodes[message.address].timestamp = message.timestamp
console.log("updated wroker load : ", workerNodes[message.address]) console.log("updated wroker load : ", workerNodes[message.address])
workerHeap.updateItem(workerNodes[message.address]) workerHeap.updateItem(workerNodes[message.address])
updateColdstartOnNIC(); // updateColdstartOnNIC();
} }
else{ else{
console.log("Change in worker load is less than threshold") console.log("Change in worker load is less than threshold")
...@@ -117,12 +119,12 @@ consumer.on('message', function (message) { ...@@ -117,12 +119,12 @@ consumer.on('message', function (message) {
} }
// console.log("\nheap : ",workerHeap) // console.log("\nheap : ",workerHeap)
} }
} else if (topic === "request") { } else if (topic === constants.topics.request_dm_2_rm) {
message = JSON.parse(message) message = JSON.parse(message)
console.log(message); console.log(message);
let payload = [{ let payload = [{
topic: "RESPONSE_RM_2_DM_DUMMY", topic: constants.topics.response_rm_2_dm,
messages: JSON.stringify({ messages: JSON.stringify({
"resource_id": message.resource_id, "resource_id": message.resource_id,
"timestamp": Date.now(), "timestamp": Date.now(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment