Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
X
xanadu
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
SYNERG
xanadu
Commits
c1ccb55a
Commit
c1ccb55a
authored
Apr 22, 2022
by
kedar
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
resolved bug with multiple worker nodes
parent
0212678f
Changes
48
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
48 changed files
with
3426 additions
and
1140305 deletions
+3426
-1140305
dispatch_system/benchmark.sh
dispatch_system/benchmark.sh
+23
-2
dispatch_system/bm_static_1.csv
dispatch_system/bm_static_1.csv
+7
-1140189
dispatch_system/constants_local.json
dispatch_system/constants_local.json
+9
-3
dispatch_system/dispatch_daemon/config.json
dispatch_system/dispatch_daemon/config.json
+1
-1
dispatch_system/dispatch_daemon/index.js
dispatch_system/dispatch_daemon/index.js
+142
-2
dispatch_system/dispatch_daemon/lib.js
dispatch_system/dispatch_daemon/lib.js
+15
-0
dispatch_system/dispatch_daemon/shared_meta.js
dispatch_system/dispatch_daemon/shared_meta.js
+21
-0
dispatch_system/dispatch_manager/checkheap.js
dispatch_system/dispatch_manager/checkheap.js
+29
-0
dispatch_system/dispatch_manager/index.js
dispatch_system/dispatch_manager/index.js
+13
-10
dispatch_system/dispatch_manager/lib.js
dispatch_system/dispatch_manager/lib.js
+39
-35
dispatch_system/dispatch_manager/nic/Makefile-nfp4build
dispatch_system/dispatch_manager/nic/Makefile-nfp4build
+9
-13
dispatch_system/dispatch_manager/nic/assign_ip.sh
dispatch_system/dispatch_manager/nic/assign_ip.sh
+9
-0
dispatch_system/dispatch_manager/nic/build_offload.sh
dispatch_system/dispatch_manager/nic/build_offload.sh
+3
-3
dispatch_system/dispatch_manager/nic/p4src/includes/headers.p4
...tch_system/dispatch_manager/nic/p4src/includes/headers.p4
+1
-0
dispatch_system/dispatch_manager/nic/p4src/orchestrator.bmv2.json
..._system/dispatch_manager/nic/p4src/orchestrator.bmv2.json
+2
-1
dispatch_system/dispatch_manager/nic/p4src/orchestrator.yml
dispatch_system/dispatch_manager/nic/p4src/orchestrator.yml
+2
-1
dispatch_system/dispatch_manager/nic/p4src/orchestrator_coldstart.p4
...stem/dispatch_manager/nic/p4src/orchestrator_coldstart.p4
+255
-0
dispatch_system/dispatch_manager/nic/smartnic_dispatch_monitor.py
..._system/dispatch_manager/nic/smartnic_dispatch_monitor.py
+102
-15
dispatch_system/dispatch_manager/node_modules_2/.bin/detect-libc
...h_system/dispatch_manager/node_modules_2/.bin/detect-libc
+18
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/detect-libc
...h_system/dispatch_manager/node_modules_2/.bin/detect-libc
+18
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mime
dispatch_system/dispatch_manager/node_modules_2/.bin/mime
+8
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mime
dispatch_system/dispatch_manager/node_modules_2/.bin/mime
+8
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mkdirp
dispatch_system/dispatch_manager/node_modules_2/.bin/mkdirp
+33
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mkdirp
dispatch_system/dispatch_manager/node_modules_2/.bin/mkdirp
+33
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt
+41
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt
+41
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt_pub
...atch_system/dispatch_manager/node_modules_2/.bin/mqtt_pub
+146
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt_pub
...atch_system/dispatch_manager/node_modules_2/.bin/mqtt_pub
+146
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt_sub
...atch_system/dispatch_manager/node_modules_2/.bin/mqtt_sub
+123
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/mqtt_sub
...atch_system/dispatch_manager/node_modules_2/.bin/mqtt_sub
+123
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/prebuild-install
...tem/dispatch_manager/node_modules_2/.bin/prebuild-install
+85
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/prebuild-install
...tem/dispatch_manager/node_modules_2/.bin/prebuild-install
+85
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/rc
dispatch_system/dispatch_manager/node_modules_2/.bin/rc
+4
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/rc
dispatch_system/dispatch_manager/node_modules_2/.bin/rc
+4
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/semver
dispatch_system/dispatch_manager/node_modules_2/.bin/semver
+160
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/semver
dispatch_system/dispatch_manager/node_modules_2/.bin/semver
+160
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/sshpk-conv
...ch_system/dispatch_manager/node_modules_2/.bin/sshpk-conv
+243
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/sshpk-conv
...ch_system/dispatch_manager/node_modules_2/.bin/sshpk-conv
+243
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/sshpk-sign
...ch_system/dispatch_manager/node_modules_2/.bin/sshpk-sign
+191
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/sshpk-sign
...ch_system/dispatch_manager/node_modules_2/.bin/sshpk-sign
+191
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/sshpk-verify
..._system/dispatch_manager/node_modules_2/.bin/sshpk-verify
+167
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/sshpk-verify
..._system/dispatch_manager/node_modules_2/.bin/sshpk-verify
+167
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/uuid
dispatch_system/dispatch_manager/node_modules_2/.bin/uuid
+65
-1
dispatch_system/dispatch_manager/node_modules_2/.bin/uuid
dispatch_system/dispatch_manager/node_modules_2/.bin/uuid
+65
-1
dispatch_system/dispatch_manager/repository/worker_env/node_modules/.bin/mime
...atch_manager/repository/worker_env/node_modules/.bin/mime
+8
-1
dispatch_system/dispatch_manager/repository/worker_env/node_modules/.bin/mime
...atch_manager/repository/worker_env/node_modules/.bin/mime
+8
-1
dispatch_system/dispatch_manager/rm.js
dispatch_system/dispatch_manager/rm.js
+146
-2
dispatch_system/speedo_data_static2_1f_host.csv
dispatch_system/speedo_data_static2_1f_host.csv
+14
-0
No files found.
dispatch_system/benchmark.sh
View file @
c1ccb55a
echo
$1
# python2 send.py --client-port 8000 --closed 1 --offload 0 --req-count 50 --send-data 10 --fid $1
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid 369020 --c 1 --t 1 --n 2
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid $1 --c 1 --rps 2 --req_count 10
sudo
ip netns
exec
ns_server python benchmark_dispatcher.py
--fid
$1
--c
20
--t
300
--rps
$2
\ No newline at end of file
#! /bin/bash -ex
rps_flag
=
0
n_flag
=
0
while
getopts
'rn'
flag
;
do
case
"
${
flag
}
"
in
r
)
rps_flag
=
1
;;
n
)
n_flag
=
1
;;
esac
done
echo
$1
,
$2
,
$3
if
[[
$rps_flag
-eq
1
]]
then
sudo
ip netns
exec
ns_server python benchmark_dispatcher.py
--fid
$2
--c
50
--t
30
--rps
$3
fi
if
[[
$n_flag
-eq
1
]]
then
sudo
ip netns
exec
ns_server python benchmark_dispatcher.py
--fid
$2
--c
50
--t
100
--n
$3
fi
dispatch_system/bm_static_1.csv
View file @
c1ccb55a
This diff is collapsed.
Click to expand it.
dispatch_system/constants_local.json
View file @
c1ccb55a
...
...
@@ -2,6 +2,8 @@
"registry_url"
:
"10.129.2.201:5000/"
,
"master_port"
:
8080
,
"master_address"
:
"10.129.2.201"
,
"daemon_port"
:
9000
,
"daemon_mac"
:
"00:22:22:22:22:22"
,
"grunt_host"
:
"https://www.namandixit.net/lovecraftian_nightmares/grunt"
,
"couchdb_host"
:
"10.129.2.201:5984"
,
"env"
:
"env_udp2.js"
,
...
...
@@ -15,7 +17,7 @@
"network_bridge"
:
"xanadu_kafka-serverless"
,
"use_bridge"
:
false
,
"internal"
:
{
"kafka_host"
:
"
kafka
:9092"
"kafka_host"
:
"
10.129.2.201
:9092"
},
"external"
:
{
"kafka_host"
:
"10.129.2.201:9092"
...
...
@@ -29,10 +31,14 @@
"response_rm_2_dm"
:
"RESPONSE_RM_2_DM_DUMMY"
,
"hscale"
:
"hscale"
,
"metrics_worker"
:
"metrics_worker"
,
"log_channel"
:
"LOG_COMMON"
"log_channel"
:
"LOG_COMMON"
,
"coldstart_worker"
:
"COLDSTART_WORKER"
,
"check_autoscale"
:
"CHECK_AUTOSCALE"
,
"autoscale"
:
"AUTOSCALE"
},
"autoscalar_metrics"
:
{
"open_request_threshold"
:
100
"open_request_threshold"
:
100
,
"function_load_threshold"
:
5
},
"metrics"
:
{
"alpha"
:
0.7
...
...
dispatch_system/dispatch_daemon/config.json
View file @
c1ccb55a
{
"id"
:
"10.129.2.201"
,
"master_node"
:
"192.168.0.105"
}
\ No newline at end of file
{
"id"
:
"192.168.2.3"
,
"master_node"
:
"192.168.2.3"
}
\ No newline at end of file
dispatch_system/dispatch_daemon/index.js
View file @
c1ccb55a
...
...
@@ -10,6 +10,11 @@ const execute = require('./execute')
const
fs
=
require
(
'
fs
'
)
const
fetch
=
require
(
'
node-fetch
'
);
const
os
=
require
(
'
os
'
);
const
dgram
=
require
(
'
dgram
'
);
const
server
=
dgram
.
createSocket
(
'
udp4
'
);
let
struct
=
require
(
'
jspack
'
);
struct
=
struct
.
jspack
let
metadataDB
=
`http://
${
secrets
.
couchdb_username
}
:
${
secrets
.
couchdb_password
}
@
${
constants
.
couchdb_host
}
`
metadataDB
=
metadataDB
+
"
/
"
+
constants
.
db
.
function_meta
+
"
/
"
...
...
@@ -69,7 +74,6 @@ libSupport.makeTopic(node_id).then(() => {
logger
.
error
(
"
something went wrong
"
+
err
.
toString
())
});
}
})
...
...
@@ -170,13 +174,149 @@ function heartbeat() {
topic
:
"
heartbeat
"
,
messages
:
JSON
.
stringify
({
"
address
"
:
node_id
,
"
port
"
:
constants
.
daemon_port
,
"
mac
"
:
constants
.
daemon_mac
,
"
system_info
"
:
info
,
"
timestamp
"
:
Date
.
now
()
})
}]
console
.
log
(
"
daemon system info :
"
,
info
)
//
console.log("daemon system info : ", info)
producer
.
send
(
payload
,
function
(
cb
)
{})
}
// TODO 2: implement packer deparser for the udp packet
// TODO 3: create UPD server to get the coldstart request
server
.
on
(
'
error
'
,
(
err
)
=>
{
console
.
log
(
`server error:\n
${
err
.
stack
}
`
);
server
.
close
();
});
server
.
on
(
'
message
'
,
(
msg
,
rinfo
)
=>
{
console
.
log
(
"
message
"
,
msg
)
let
payload
=
unpackPacket
(
msg
)
console
.
log
(
payload
,
typeof
payload
);
// get the coldstart request and start the function
// logger.info("Received Deployment UDP request for resource_id: " + resource_id);
let
functionHash
=
"
function_
"
+
payload
.
function_id
let
resource_id
=
'
aaa
'
let
runtime
=
'
process
'
let
port
=
9920
let
mac
=
constants
.
daemon_mac
logger
.
info
(
"
Received Deployment UPD request
"
)
fetch
(
metadataDB
+
functionHash
).
then
(
res
=>
res
.
json
())
.
then
(
json
=>
{
console
.
log
(
"
metadata
"
,
json
);
libSupport
.
download
(
host_url
+
"
/repository/
"
+
functionHash
+
"
.js
"
,
local_repository
+
functionHash
+
"
.js
"
).
then
(()
=>
{
let
metadata
=
{
resource_id
,
functionHash
,
runtime
,
port
,
mac
,
resources
:
{
memory
:
json
.
memory
}
}
startWorker
(
local_repository
,
producer
,
metadata
)
})
}).
catch
(
err
=>
{
logger
.
error
(
"
something went wrong
"
+
err
.
toString
())
});
// lastRequest = Date.now()
// console.log("network stack time", lastRequest - payload.t1)
// totalRequest++
// executor(msg).then(result => {
// result = packPacket(msg)
// let port = 10000 + getRandomInt(0, 10)
// try {
// udpProxy.send(msg, 0, msg.length, port, rinfo.address, function (err, bytes) {
// if (err)
// console.log(err)
// console.log("response via UDP")
// })
// } catch (e) {
// console.log(e)
// }
// })
});
function
unpackPacket
(
packet
)
{
// let buffer = new Array(1024)
let
chain_id
=
null
,
exec_id
=
null
,
function_count
=
null
,
function_id
=
null
,
data
=
null
let
base
=
0
,
f0
,
f1
,
f2
,
f3
,
f4
,
t1
,
t2
,
t3
,
t4
chain_id
=
struct
.
Unpack
(
"
>I
"
,
packet
,
base
)
base
+=
4
exec_id
=
struct
.
Unpack
(
"
>I
"
,
packet
,
base
)
base
+=
4
function_id
=
struct
.
Unpack
(
"
>I
"
,
packet
,
base
)
base
+=
4
data
=
struct
.
Unpack
(
"
>I
"
,
packet
,
base
)
base
+=
4
function_count
=
struct
.
Unpack
(
"
I
"
,
packet
,
base
)
base
+=
4
f0
=
struct
.
Unpack
(
"
B
"
,
packet
,
base
)
base
+=
1
f1
=
struct
.
Unpack
(
"
B
"
,
packet
,
base
)
base
+=
1
f2
=
struct
.
Unpack
(
"
B
"
,
packet
,
base
)
base
+=
1
f3
=
struct
.
Unpack
(
"
B
"
,
packet
,
base
)
base
+=
1
f4
=
struct
.
Unpack
(
"
B
"
,
packet
,
base
)
base
+=
1
t1
=
struct
.
Unpack
(
"
I
"
,
packet
,
base
)
base
+=
8
t2
=
struct
.
Unpack
(
"
I
"
,
packet
,
base
)
base
+=
8
t3
=
struct
.
Unpack
(
"
I
"
,
packet
,
base
)
base
+=
8
t4
=
struct
.
Unpack
(
"
I
"
,
packet
,
base
)
console
.
log
(
"
chain_id
"
,
chain_id
,
"
exec_id
"
,
exec_id
,
"
data
"
,
data
,
"
function_count
"
,
function_count
,
"
function_id
"
,
function_id
)
return
{
chain_id
:
chain_id
[
0
],
exec_id
:
exec_id
[
0
],
data
:
data
[
0
],
function_count
:
function_count
[
0
],
function_id
:
function_id
[
0
],
f0
,
f1
,
f2
,
f3
,
f4
,
t1
,
t2
,
t3
,
t4
}
}
function
packPacket
(
dataPacket
)
{
let
message
=
new
Array
(
1024
)
let
base
=
0
,
chain_id
,
exec_id
,
function_id
,
data
,
function_count
chain_id
=
struct
.
PackTo
(
"
>I
"
,
message
,
base
,
[
dataPacket
.
chain_id
])
base
+=
4
exec_id
=
struct
.
PackTo
(
"
>I
"
,
message
,
base
,
[
dataPacket
.
exec_id
])
base
+=
4
function_id
=
struct
.
PackTo
(
"
>I
"
,
message
,
base
,
[
dataPacket
.
function_id
])
base
+=
4
data
=
struct
.
PackTo
(
"
>I
"
,
message
,
base
,
[
dataPacket
.
data
])
base
+=
4
function_count
=
struct
.
PackTo
(
"
B
"
,
message
,
base
,
[
dataPacket
.
function_count
])
message
=
Buffer
.
from
(
message
)
return
message
}
server
.
on
(
'
listening
'
,
()
=>
{
const
address
=
server
.
address
();
console
.
log
(
`server listening
${
address
.
address
}
:
${
address
.
port
}
`
);
});
// server.bind(port, "192.168.2.3");
server
.
bind
(
constants
.
daemon_port
);
setInterval
(
heartbeat
,
1000
);
dispatch_system/dispatch_daemon/lib.js
View file @
c1ccb55a
...
...
@@ -134,6 +134,21 @@ const logger = winston.createLogger({
});
function
getPort
(
usedPort
)
{
let
port
=
-
1
,
ctr
=
0
do
{
let
min
=
Math
.
ceil
(
30000
);
let
max
=
Math
.
floor
(
60000
);
port
=
Math
.
floor
(
Math
.
random
()
*
(
max
-
min
+
1
))
+
min
;
ctr
+=
1
;
if
(
ctr
>
30000
)
{
port
=
-
1
break
}
}
while
(
usedPort
.
has
(
port
))
return
port
}
module
.
exports
=
{
download
,
makeid
,
updateConfig
,
makeTopic
,
returnPort
,
logger
}
dispatch_system/dispatch_daemon/shared_meta.js
0 → 100644
View file @
c1ccb55a
const
secrets
=
require
(
'
./secrets.json
'
)
const
constants
=
require
(
'
.././constants_local.json
'
)
let
db
=
new
Map
(),
// queue holding request to be dispatched
resourceMap
=
new
Map
(),
// map between resource_id and resource details like node_id, port, associated function etc
functionToResource
=
new
Map
(),
// a function to resource map. Each map contains a minheap of
// resources associated with the function
workerNodes
=
new
Map
(),
// list of worker nodes currently known to the DM
functionBranchTree
=
new
Map
(),
// a tree to store function branch predictions
conditionProbabilityExplicit
=
new
Map
(),
// tree holding conditional probabilities for explicit chains
requestFlightQueue
=
new
Map
()
// map to store in flight requests
/**
* URL to the couchdb database server used to store data
*/
module
.
exports
=
{
db
,
functionBranchTree
,
functionToResource
,
workerNodes
,
resourceMap
,
conditionProbabilityExplicit
,
requestFlightQueue
}
dispatch_system/dispatch_manager/checkheap.js
0 → 100644
View file @
c1ccb55a
const
Heap
=
require
(
'
heap
'
);
var
heap
=
new
Heap
(
function
(
a
,
b
)
{
return
a
.
foo
-
b
.
foo
;
});
let
map
=
new
Map
();
// a = {foo : 3};
// b = {foo : 4};
// c = {foo : 2};
arr
=
[{
foo
:
4
},{
foo
:
5
},{
foo
:
2
}]
// map.set("foo1", a);
// map.set("foo2", b);
// map.set("foo3", c);
// heap.push({foo: 3});
// heap.push({foo: 1});
// heap.push({foo: 2});
heap
.
push
(
arr
[
0
]);
console
.
log
(
heap
)
heap
.
push
(
arr
[
1
]);
console
.
log
(
heap
)
heap
.
push
(
arr
[
2
]);
console
.
log
(
heap
)
arr
[
0
].
foo
=
1
;
// heap.pop(b);
console
.
log
(
heap
)
heap
.
updateItem
(
arr
[
0
])
console
.
log
(
heap
)
heap
.
pop
();
console
.
log
(
heap
)
dispatch_system/dispatch_manager/index.js
View file @
c1ccb55a
...
...
@@ -144,7 +144,8 @@ app.post('/serverless/deploy', (req, res) => {
res
.
send
(
"
error
"
).
status
(
400
)
}
else
{
let
func_id
=
parseInt
(
functionHash
.
slice
(
0
,
5
),
16
)
let
func_id
=
functionHash
// let func_id = parseInt(functionHash.slice(0,5),16)
//console.log(func_id)
console
.
log
(
"
Function id to be used is:
"
,
func_id
)
idToFunchashMap
.
set
(
func_id
,
functionHash
)
...
...
@@ -422,11 +423,11 @@ consumer.on('message', function (message) {
// console.log(topic, message)
if
(
topic
===
"
response
"
)
{
logger
.
info
(
"
response
"
+
message
);
}
else
if
(
topic
===
constants
.
topics
.
heartbeat
)
{
message
=
JSON
.
parse
(
message
)
// console.log(message)
console
.
log
(
"
node_to_resource_mapping :
"
,
node_to_resource_mapping
)
if
(
Date
.
now
()
-
message
.
timestamp
<
1000
)
if
(
!
workerNodes
.
has
(
message
.
address
))
{
workerNodes
.
set
(
message
.
address
,
message
.
timestamp
)
...
...
@@ -436,10 +437,10 @@ consumer.on('message', function (message) {
else
{
if
(
node_to_resource_mapping
.
has
(
message
.
address
))
{
console
.
log
(
""
)
let
resource_id
=
node_to_resource_mapping
.
get
(
message
.
address
)
resource_to_cpu_util
.
set
(
resource_id
,
message
.
system_info
.
loadavg
)
}
}
}
else
if
(
topic
==
constants
.
topics
.
deployed
)
{
try
{
...
...
@@ -487,12 +488,11 @@ consumer.on('message', function (message) {
}
}
else
if
(
topic
==
constants
.
topics
.
hscale
)
{
message
=
JSON
.
parse
(
message
)
let
resource_id
=
libSupport
.
makeid
(
constants
.
id_size
),
// each function resource request is associated with an unique ID
runtime
=
message
.
runtime
,
functionHash
=
message
.
functionHash
runtime
=
message
.
runtime
,
functionHash
=
message
.
functionHash
logger
.
info
(
`Generated new resource ID:
${
resource_id
}
for runtime:
${
runtime
}
`
);
console
.
log
(
"
Resource Status:
"
,
functionToResource
);
if
(
!
functionToResource
.
has
(
functionHash
+
runtime
)
&&
!
db
.
has
(
functionHash
+
runtime
))
{
...
...
@@ -587,13 +587,16 @@ function autoscalar() {
}
function
heapUpdate
()
{
console
.
log
(
"
functionToResource :
"
,
functionToResource
)
console
.
log
(
"
resource_to_cpu_util :
"
,
resource_to_cpu_util
)
functionToResource
.
forEach
((
resourceArray
,
functionKey
)
=>
{
//resourceArray = resourceList.toArray()
console
.
log
(
"
Function being updated:
"
,
functionKey
)
//
console.log("Function being updated: ",functionKey)
for
(
let
i
=
0
;
i
<
resourceArray
.
length
;
i
++
)
{
let
res_i
=
resourceArray
[
i
].
resource_id
;
resourceArray
[
i
].
cpu_utilization
=
resource_to_cpu_util
.
get
(
res_i
);
console
.
log
(
"
Avg load on resource-worker
"
,
i
,
"
:
"
,
resourceArray
[
i
].
cpu_utilization
)
console
.
log
(
"
Avg load on resource-worker
"
,
i
,
"
:
"
,
resourceArray
[
i
])
}
heap
.
heapify
(
resourceArray
,
libSupport
.
compare_uti
)
...
...
@@ -713,8 +716,8 @@ async function speculative_deployment(req, runtime) {
}
}
}
setInterval
(
libSupport
.
metrics
.
broadcastMetrics
,
5000
)
//
setInterval(libSupport.metrics.broadcastMetrics, 5000)
// setInterval(autoscalar, 1000);
setInterval
(
dispatch
,
1000
);
// setInterval(heapUpdate,
1
000);
// setInterval(heapUpdate,
5
000);
app
.
listen
(
port
,
()
=>
logger
.
info
(
`Server listening on port
${
port
}
!`
))
dispatch_system/dispatch_manager/lib.js
View file @
c1ccb55a
...
...
@@ -75,10 +75,13 @@ function generateExecutor(functionPath, functionHash) {
let
output
=
input
.
slice
(
0
,
insertIndex
)
+
functionFile
+
input
.
slice
(
insertIndex
)
let
hash
=
crypto
.
createHash
(
'
md5
'
).
update
(
output
).
digest
(
"
hex
"
);
console
.
log
(
hash
);
let
func_id
=
parseInt
(
hash
.
slice
(
0
,
5
),
16
)
console
.
log
(
func_id
);
fs
.
writeFileSync
(
functionPath
+
hash
+
"
.js
"
,
output
)
return
hash
// fs.writeFileSync(functionPath + hash + ".js", output)
fs
.
writeFileSync
(
functionPath
+
"
function_
"
+
func_id
+
"
.js
"
,
output
)
return
"
function_
"
+
func_id
// return hash
}
/**
...
...
@@ -89,41 +92,42 @@ function generateExecutor(functionPath, functionHash) {
*/
function
generateMicrocExecutor
(
functionPath
,
functionName
,
jsfunctionhash
)
{
//creating function.c
let
function_temp
=
fs
.
readFileSync
(
`./repository/worker_env/function_temp.c`
)
let
function_def
=
fs
.
readFileSync
(
functionPath
+
functionName
)
let
searchSize
=
"
//ADD_FUNCTION
"
.
length
let
fid
=
parseInt
(
jsfunctionhash
.
slice
(
0
,
5
),
16
)
let
insertIndex
=
function_temp
.
indexOf
(
"
//ADD_FUNCTION
"
)
+
searchSize
let
function_name
=
"
void function_
"
+
fid
+
"
(PIF_PLUGIN_map_hdr_T *mapHdr)
"
//
let function_temp = fs.readFileSync(`./repository/worker_env/function_temp.c`)
//
let function_def = fs.readFileSync(functionPath + functionName)
//
let searchSize = "//ADD_FUNCTION".length
//
let fid = parseInt(jsfunctionhash.slice(0,5), 16)
//
let insertIndex = function_temp.indexOf("//ADD_FUNCTION") + searchSize
//
let function_name = "void function_"+ fid +"(PIF_PLUGIN_map_hdr_T *mapHdr)"
let
full_function
=
function_temp
.
slice
(
0
,
insertIndex
)
+
"
\n
"
+
function_name
+
"
{
\n
"
+
function_def
+
"
\n
}
"
+
function_temp
.
slice
(
insertIndex
)
//
let full_function = function_temp.slice(0, insertIndex) +"\n"+ function_name + "{\n" +function_def +"\n}"+ function_temp.slice(insertIndex)
// let hash = crypto.createHash('md5').update(full_function).digest("hex");
// console.log(hash);
console
.
log
(
full_function
);
fs
.
writeFileSync
(
functionPath
+
"
offload/
"
+
jsfunctionhash
+
"
.c
"
,
full_function
)
//
// let hash = crypto.createHash('md5').update(full_function).digest("hex");
//
// console.log(hash);
//
console.log(full_function);
//
fs.writeFileSync(functionPath +"offload/"+ jsfunctionhash + ".c", full_function)
//adding call to function when match with fid
return
new
Promise
((
resolve
)
=>
{
let
main_function_temp
=
fs
.
readFileSync
(
functionPath
+
"
offload/
"
+
"
static_dispatch_function.c
"
)
// let client_function = fs.readFileSync(functionPath + "offload/"+jsfunctionhash+".c")
searchSize
=
"
//ADD_FUNCTION_EXTERNS
"
.
length
insertIndex
=
main_function_temp
.
indexOf
(
"
//ADD_FUNCTION_EXTERNS
"
)
+
searchSize
let
extern_name
=
"
extern void function_
"
+
fid
+
"
(PIF_PLUGIN_map_hdr_T *mapHdr)
"
let
main_function
=
main_function_temp
.
slice
(
0
,
insertIndex
)
+
"
\n
"
+
extern_name
+
"
;
\n
"
+
main_function_temp
.
slice
(
insertIndex
)
console
.
log
(
"
MAIN FUNCTION :
\n
"
,
main_function
)
let
hash
=
crypto
.
createHash
(
'
md5
'
).
update
(
full_function
).
digest
(
"
hex
"
);
// console.log(hash);
searchSize
=
"
//ADD_FUNCTION_CONDITION
"
.
length
insertIndex
=
main_function
.
indexOf
(
"
//ADD_FUNCTION_CONDITION
"
)
+
searchSize
let
inc_pkt_count
=
"
function_packet_count[
"
+
fid
+
"
-10000]++;
"
let
if_else_cond
=
"
else if( fid ==
"
+
fid
+
"
) {
\n
"
+
inc_pkt_count
+
"
\n
function_
"
+
fid
+
"
(mapHdr);
\n
}
"
let
main_function_full
=
main_function
.
slice
(
0
,
insertIndex
)
+
"
\n
"
+
if_else_cond
+
"
\n
"
+
main_function
.
slice
(
insertIndex
)
console
.
log
(
main_function_full
);
fs
.
writeFileSync
(
functionPath
+
"
offload/
"
+
"
static_dispatch_function.c
"
,
main_function_full
)
return
hash
});
// //adding call to function when match with fid
// return new Promise((resolve) => {
// let main_function_temp = fs.readFileSync(functionPath +"offload/"+ "static_dispatch_function.c")
// // let client_function = fs.readFileSync(functionPath + "offload/"+jsfunctionhash+".c")
// searchSize = "//ADD_FUNCTION_EXTERNS".length
// insertIndex = main_function_temp.indexOf("//ADD_FUNCTION_EXTERNS") + searchSize
// let extern_name = "extern void function_"+fid +"(PIF_PLUGIN_map_hdr_T *mapHdr)"
// let main_function = main_function_temp.slice(0, insertIndex) +"\n"+ extern_name+";\n"+ main_function_temp.slice(insertIndex)
// console.log("MAIN FUNCTION : \n",main_function)
// let hash = crypto.createHash('md5').update(full_function).digest("hex");
// // console.log(hash);
// searchSize = "//ADD_FUNCTION_CONDITION".length
// insertIndex = main_function.indexOf("//ADD_FUNCTION_CONDITION") + searchSize
// let inc_pkt_count = "function_packet_count["+fid+"-10000]++;"
// let if_else_cond = "else if( fid == "+fid + " ) {\n "+inc_pkt_count +"\nfunction_"+fid+"(mapHdr);\n}"
// let main_function_full = main_function.slice(0, insertIndex) +"\n"+ if_else_cond +"\n"+ main_function.slice(insertIndex)
// console.log(main_function_full);
// fs.writeFileSync(functionPath +"offload/"+ "static_dispatch_function.c", main_function_full)
// return 'xyz';
// return hash
// });
}
/**
...
...
dispatch_system/dispatch_manager/nic/Makefile-nfp4build
View file @
c1ccb55a
#
# Generated Makefile for orchestrator
_speedo
# Generated Makefile for orchestrator
#
ifndef SDKDIR
...
...
@@ -122,7 +122,7 @@ ifneq ($(NFAS_FOUND),found)
$(warning warning: nfas not found or not executable, on windows please run nfp4term.bat)
endif
$(OUTDIR)/orchestrator
_speedo
.nffw: $(OUTDIR)/nfd_pcie0_pd0.list/nfd_pcie0_pd0.list \
$(OUTDIR)/orchestrator.nffw: $(OUTDIR)/nfd_pcie0_pd0.list/nfd_pcie0_pd0.list \
$(OUTDIR)/nfd_pcie0_pci_in_issue1.list/nfd_pcie0_pci_in_issue1.list \
$(OUTDIR)/nfd_pcie0_pci_out_me0.list/nfd_pcie0_pci_out_me0.list \
$(OUTDIR)/nbi_init_csr.list/nbi_init_csr.list \
...
...
@@ -186,17 +186,17 @@ $(PIFOUTDIR)/build_info.json: $(MAKEFILE_LIST)
# Generate IR from P4
#
$(OUTDIR)/orchestrator
_speedo.yml: p4src/orchestrator_speedo
.p4 \
$(OUTDIR)/orchestrator
.yml: p4src/orchestrator
.p4 \
$(MAKEFILE_LIST)
@echo ---------
@echo compiling p4 $@
@echo ---------
@mkdir -p $(PIFOUTDIR)
$(SDKP4DIR)/bin/nfp4c -o $(OUTDIR)/orchestrator
_speedo
.yml \
$(SDKP4DIR)/bin/nfp4c -o $(OUTDIR)/orchestrator.yml \
--p4-version 16 \
--p4-compiler p4c-nfp \
--source_info \
p4src/orchestrator
_speedo
.p4
p4src/orchestrator.p4
#
...
...
@@ -229,16 +229,16 @@ $(PIFOUTDIR)/pif_pkt_clone%h \
$(PIFOUTDIR)/pif_flcalc%c \
$(PIFOUTDIR)/pif_flcalc%h \
$(PIFOUTDIR)/pif_field_lists%h \
$(PIFOUTDIR)/pif_parrep_pvs_sync%c : $(OUTDIR)/orchestrator
_speedo
%yml $(MAKEFILE_LIST)
$(PIFOUTDIR)/pif_parrep_pvs_sync%c : $(OUTDIR)/orchestrator%yml $(MAKEFILE_LIST)
@echo ---------
@echo generating pif $@
@echo ---------
@mkdir -p $(PIFOUTDIR)
$(SDKP4DIR)/bin/nfirc -o $(PIFOUTDIR)/ \
--p4info $(OUTDIR)/orchestrator
_speedo
.p4info.json \
--p4info $(OUTDIR)/orchestrator.p4info.json \
--debugpoints \
--mac_ingress_timestamp \
$(OUTDIR)/orchestrator
_speedo
.yml
$(OUTDIR)/orchestrator.yml
#
...
...
@@ -707,8 +707,6 @@ $(OUTDIR)/pif_app_nfd.list/pif_app_nfd.list: $(SDKP4DIR)/components/nfp_pif/me/a
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_flcalc_algorithms.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_memops.c \
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c \
p4src/static_dispatch_function.c \
p4src/nic_function_test.c \
$(PIFOUTDIR)/pif_design.h \
$(MAKEFILE_LIST)
@echo ---------
...
...
@@ -817,9 +815,7 @@ $(OUTDIR)/pif_app_nfd.list/pif_app_nfd.list: $(SDKP4DIR)/components/nfp_pif/me/a
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_init.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_flcalc_algorithms.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_memops.c \
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c \
p4src/static_dispatch_function.c \
p4src/nic_function_test.c
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c
#
# APP_MASTER
...
...
dispatch_system/dispatch_manager/nic/assign_ip.sh
View file @
c1ccb55a
...
...
@@ -51,3 +51,12 @@ sudo ip netns exec ns_server ifconfig vf0_2 mtu 9000
# sudo ip addr add 10.129.6.5/24 dev bridgek0
# sudo ip link set bridgek0 up
# create veth cable for kafka
sudo
ip
link
add veth_nnic0
type
veth peer name veth_nnic1
sudo
ip
link set
veth_nnic0 netns ns_server
sudo
ip netns
exec
ns_server ip addr add 10.128.2.201/24 dev veth_nnic0
sudo
ip netns
exec
ns_server ip
link set
dev veth_nnic0 up
sudo
ip addr add 10.128.2.200/24 dev veth_nnic1
sudo
ip
link set
dev veth_nnic1 up
dispatch_system/dispatch_manager/nic/build_offload.sh
View file @
c1ccb55a
...
...
@@ -30,10 +30,10 @@ fi
if
[[
$offload_flag
-eq
1
]]
then
# move to p4 bin
cd
/opt/netronome
/p4/bin/
# cd /home/ub-01/mahendra/nfp-sdk-6.1.0-preview
/p4/bin/
# offload
sudo
.
/rtecli design-load
-f
$location
/p4src/orchestrator.nffw
-c
$location
/p4src/echo.p4cfg
-p
$location
/p4src/out/pif_design.json
sudo
/opt/netronome/p4/bin
/rtecli design-load
-f
$location
/p4src/orchestrator.nffw
-c
$location
/p4src/echo.p4cfg
-p
$location
/p4src/out/pif_design.json
# returning back to base
cd
$location
...
...
@@ -45,4 +45,4 @@ then
docker stop
$(
docker ps
-a
-q
)
||
true
#assigning IPs to network interfaces
sudo
./assign_ip.sh
fi
\ No newline at end of file
fi
dispatch_system/dispatch_manager/nic/p4src/includes/headers.p4
View file @
c1ccb55a
...
...
@@ -67,6 +67,7 @@ header map_hdr_t {
bit<8> f2;
bit<8> f3;
bit<8> f4;
bit<8> autoscaling;
// bit<8> batch_count;
}
...
...
dispatch_system/dispatch_manager/nic/p4src/orchestrator.bmv2.json
View file @
c1ccb55a
...
...
@@ -62,7 +62,8 @@
[
"f1"
,
8
,
false
],
[
"f2"
,
8
,
false
],
[
"f3"
,
8
,
false
],
[
"f4"
,
8
,
false
]
[
"f4"
,
8
,
false
],
[
"autoscaling"
,
8
,
false
]
]
},
{
...
...
dispatch_system/dispatch_manager/nic/p4src/orchestrator.yml
View file @
c1ccb55a
...
...
@@ -75,6 +75,7 @@ map_hdr:
-
f2
:
8
-
f3
:
8
-
f4
:
8
-
autoscaling
:
8
type
:
header
resubmit_meta
:
...
...
@@ -427,7 +428,7 @@ layout:
##########################################
source_info
:
date
:
202
1/12/15 05:37:07
date
:
202
2/02/04 11:27:23
output_file
:
p4src/orchestrator.yml
p4_version
:
'
16'
source_files
:
...
...
dispatch_system/dispatch_manager/nic/p4src/orchestrator_coldstart.p4
0 → 100644
View file @
c1ccb55a
#include <core.p4>
#define V1MODEL_VERSION 20200408
#include <v1model.p4>
#include "includes/defines.p4"
#include "includes/headers.p4"
#include "includes/parsers.p4"
//extern void prime();
//extern void prime2();
extern void static_dispatch_function();
//extern void countpacket();
//struct digest_t {
// bit<32> index;
// bit<48> dstAddr;
// bit<48> srcAddr;
//bit<16> etherType;
//}
//struct digest_time_t {
// bit<64> igt;
// bit<64> cgt;
// bit<64> time_taken;
//}
//struct digest_t2 {
// bit<32> req_fid;
//}
struct digest_check_udp_port{
bit<16> udp_port;
bit<32> fid;
bit<4> packet_count;
bit<32> src_ip;
bit<32> dst_ip;
}
control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_t standard_metadata) {
//register<bit<8>>(10000) function_id_check;
register<bit<4>>(1) fwd_checks;
//bit<8> pc;
bit<4> pc2=0;
bit<1> static=1w1;
@name(".fwd_act") action fwd_act(bit<16> port) {
standard_metadata.egress_spec = port;
}
@name(".fwd") table fwd {
actions = {
fwd_act;
}
key = {
standard_metadata.ingress_port : exact;
}
}
@name(".dispatch_act") action dispatch_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) {
hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort;
hdr.ethernet.dstAddr = ethernetAddr;
//digest_t d0;