Commit 0212678f authored by Mahendra Patel's avatar Mahendra Patel

resolved bugs and added microC functionality

parent 0357774b

Too many changes to show.

To preserve performance only 1000 of 1000+ files are displayed.

This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
echo $1
# python2 send.py --client-port 8000 --closed 1 --offload 0 --req-count 50 --send-data 10 --fid $1
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid 369020 --c 1 --t 1 --n 2
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid $1 --c 1 --rps 2 --req_count 10
sudo ip netns exec ns_server python benchmark_dispatcher.py --fid $1 --c 20 --t 300 --rps $2
\ No newline at end of file
......@@ -6,6 +6,7 @@ import random
import time
import numpy as np
import argparse
import csv
parser = argparse.ArgumentParser(description='Mininet demo')
......@@ -31,7 +32,8 @@ runtime = args.t
concurrency = args.c
SERVER_IP = "192.168.2.3"
packet_holder = [None] * 11
# packet_holder = [None] * 11
packet_holder = [[] for i in range(12)]
ingress_time = {}
stop_thread = False
......@@ -40,16 +42,20 @@ def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 10000 + i
print i
#print i
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# s.setblocking(0)
s.bind((CLIENT_IP, port))
# s.setblocking(0)
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
print "stop thread r"
break
packet, addr = s.recvfrom(1024)
#print "packet received : ", packet
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
......@@ -68,7 +74,7 @@ def genPacket():
chain_id = struct.pack(">I", chain_id) # chain id
exec_id_packed = struct.pack(">I", exec_id) # execution id
dataInt = 0
dataInt =1
# print " dataInt", dataInt
data = struct.pack(">I", dataInt) # data
......@@ -112,7 +118,7 @@ def send():
packet, exec_id = genPacket()
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time() * 1000
print("s", "{0:f}".format(ingress_time[exec_id]))
print("send", "{0:f}".format(ingress_time[exec_id]))
elif args.rps is not None:
......@@ -127,7 +133,9 @@ def send():
t.start()
time.sleep(runtime)
print "stoppping thread"
stop_thread = True
print "thread stopped"
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
......@@ -143,17 +151,25 @@ def printStatistics():
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
print e2e_time
#print e2e_time
data = np.array(e2e_time, dtype=float)
np.savetxt("bm_static_1.csv", data, delimiter=' ', header='')
p50 = np.percentile(data, 50)
p95 = np.percentile(data, 95)
p99 = np.percentile(data, 99)
mean = np.mean(data)
print("mean \t p50 \t p95 \t p99")
print(mean, p50, p95, p99)
fields=[args.rps, mean, len(e2e_time) / runtime, len(ingress_time), p50, p95, p99]
with open('speedo_data_static2_1f_host.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
r=None
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
......@@ -161,7 +177,9 @@ for i in range(0, 11):
time.sleep(1)
send()
time.sleep(2)
time.sleep(170)
# r.join()
printStatistics()
#print "packet holder : ",packet_holder
#print "ingress_time : ",ingress_time
......@@ -38,15 +38,15 @@ def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
s.bind((CLIENT_IP, 10001))
print "listening to {} at port {}".format(CLIENT_IP, 10001)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
# print packet
print "received packet : ",packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
......@@ -59,8 +59,9 @@ def receive():
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
data = int(data) - t
print "rec", chain_id, exec_id, data, function_id, function_count,
# data = int(data) - t
print "recvied data : , chain_id, exec_id, data, function_id, function_count"
print "recvied data : ", chain_id, exec_id, data, function_id, function_count
def genPacket():
......@@ -72,13 +73,16 @@ def genPacket():
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
dataInt = int(time.time() * 1000) % 1000000000
# dataInt = int(time.time() * 1000) % 1000000000
dataInt = 21
print "data : ", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
......@@ -144,6 +148,7 @@ r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(1)
time.sleep(2)
send()
r.join()
time.sleep(5)
# r.join()
import socket
import struct
import time
import threading
import random
import time
import numpy as np
import argparse
import csv
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Funtion id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--t', help='Runtime',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
runtime = args.t
concurrency = args.c
SERVER_IP = "192.168.2.3"
# packet_holder = [None] * 11
packet_holder = [[] for i in range(12)]
ingress_time = {}
stop_thread = False
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 20000 + i
#print i
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# s.setblocking(0)
s.bind((CLIENT_IP, port))
# s.setblocking(0)
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
print "stop thread r"
break
packet, addr = s.recvfrom(1024)
#print "packet received : ", packet
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
# print chain_id, exec_id, "function_id", function_id, function_count, \
# f0, f1, f2, f3, f4,
chain_id = struct.pack(">I", chain_id) # chain id
exec_id_packed = struct.pack(">I", exec_id) # execution id
dataInt =1
# print " dataInt", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id_packed + function_id + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, exec_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
if time.time() - start_time > runtime:
break
packet, exec_id = genPacket()
if exec_id in ingress_time:
continue
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time()
time.sleep(sleep_time)
def send():
global egress_time, ingress_time, concurrency, runtime, stop_thread
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("Sending packet to %s at port %s" % (SERVER_IP, PORT))
print("Runtime: %d Concurrency %d" % (runtime, concurrency))
print("chain id, exec id, data, function count, functions dependencies...")
# op = struct.unpack("B", packet[0])
if args.n is not None:
for i in range(args.n):
packet, exec_id = genPacket()
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time() * 1000
print("send", "{0:f}".format(ingress_time[exec_id]))
elif args.rps is not None:
start_time = time.time()
sleep_time = concurrency / float(args.rps)
print("calculated inter-arrival time, offload mode", sleep_time)
for i in range(concurrency):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
print "stoppping thread"
stop_thread = True
print "thread stopped"
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
#print e2e_time
data = np.array(e2e_time, dtype=float)
np.savetxt("bm_static_1.csv", data, delimiter=' ', header='')
p50 = np.percentile(data, 50)
p95 = np.percentile(data, 95)
p99 = np.percentile(data, 99)
mean = np.mean(data)
print("mean \t p50 \t p95 \t p99")
print(mean, p50, p95, p99)
fields=[args.rps, mean, len(e2e_time) / runtime, len(ingress_time), p50, p95, p99]
with open('speedo_data_static2_func2_nic2.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
r=None
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
time.sleep(1)
send()
time.sleep(170)
# r.join()
printStatistics()
#print "packet holder : ",packet_holder
#print "ingress_time : ",ingress_time
......@@ -31,6 +31,7 @@ def receive(i):
break
packet, addr = s.recvfrom(1024)
packet_holder[i].append((packet, time.time() ))
print("packet received : ", packet, addr)
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
......@@ -45,6 +46,9 @@ def printStatistics():
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
print "print stat : ",packetTuple[1]
# ,ingress_time[exec_id]
# print "e2e time : ",(packetTuple[1] - ingress_time[exec_id])* 1000
# e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
# data = np.array(e2e_time, dtype=float)
......@@ -56,6 +60,7 @@ def printStatistics():
# print(mean, p50, p95, p99)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
ri = []
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
......
echo $1
# python2 send.py --client-port 8000 --closed 1 --offload 0 --req-count 50 --send-data 10 --fid $1
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid 369020 --c 1 --t 1 --n 2
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid $1 --c 1 --rps 2 --req_count 10
sudo ip netns exec ns_server python benchmark_dispatcher_func2.py --fid $1 --c 20 --t 300 --rps $2
\ No newline at end of file
import socket
import struct
import time
import threading
import random
import time
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Funtion id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--req_count', help='request count',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
SERVER_IP = "192.168.2.3"
# egress_time = []
# ingress_time = []
egress_time={}
ingress_time={}
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 10001))
print "listening to {} at port {}".format(CLIENT_IP, 10001)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
print "received packet : ",packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
# data = int(data) - t
egress_time[exec_id] = time.time()
print "recvied data : , chain_id, exec_id, data, function_id, function_count"
print "recvied data : ", chain_id, exec_id, data, function_id, function_count
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
print exec_id
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
# dataInt = int(time.time() * 1000) % 1000000000
dataInt = 21
print "data : ", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id_packed = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id_packed + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, function_id, exec_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
packet, function_id,exec_id = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id]=time.time()
time.sleep(sleep_time)
def send():
global egress_time, ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ , exec_id = genPacket()
if args.n is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time()
elif args.rps is not None:
runtime = 10
thread_count = args.c
start_time = time.time()
sleep_time = thread_count / float(args.rps)
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(2)
send()
time.sleep(5)
# r.join()
This diff is collapsed.
{
"registry_url": "localhost:5000/",
"registry_url": "10.129.2.201:5000/",
"master_port": 8080,
"master_address": "localhost",
"master_address": "10.129.2.201",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984",
"env": "env_udp.js",
"couchdb_host": "10.129.2.201:5984",
"env": "env_udp2.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
......@@ -13,7 +13,7 @@
},
"network": {
"network_bridge": "xanadu_kafka-serverless",
"use_bridge": true,
"use_bridge": false,
"internal": {
"kafka_host": "kafka:9092"
},
......@@ -28,6 +28,7 @@
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"metrics_worker": "metrics_worker",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
......
{
"registry_url": "localhost:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984",
"env": "env_udp2.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "xanadu_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "10.129.2.201:9092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"metrics_worker": "metrics_worker",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
......@@ -49,6 +49,7 @@ function runProcess(local_repository, metadata) {
const process = spawn('node', [filename, resource_id, functionHash, port, "process",
constants.network.external.kafka_host, `--max-old-space-size=${memory}` ]);
console.log("pid of the process is ", process.pid);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
......@@ -112,10 +113,10 @@ function runContainer(metadata) {
logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
// let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
// let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource_id])
let _ = spawn('docker', ['start', resource_id])
let _ = spawn('docker', ['start', resource_id])
_.on('data', (data) => {
console.log("container started", data);
......@@ -159,8 +160,12 @@ let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
console.log("resource id is: ",resource_id)
var container_id
process.stdout.on('data', (data) => {
//container_id = data.toString
logger.info(`stdout: ${data.toString()}`);
console.log(data.toString())
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
/**
......
......@@ -49,7 +49,7 @@ libSupport.makeTopic(node_id).then(() => {
/**
* Download necessary files (function file) and Start resource deployment
*/
if (message.type === "execute") {
if (message.type === "execute" && topic === node_id) {
logger.info("Received Deployment request for resource_id: " + resource_id);
fetch(metadataDB + functionHash).then(res => res.json())
.then(json => {
......@@ -143,7 +143,10 @@ function startWorker(local_repository, producer, metadata) {
}) }], () => { })
})
else if (runtime === "container")
{
console.log("rutime is container : ",metadata)
execute.runContainer(metadata)
}
else {
producer.send(
[{
......@@ -171,6 +174,7 @@ function heartbeat() {
"timestamp": Date.now()
})
}]
console.log("daemon system info : ", info)
producer.send(payload, function(cb) {})
}
......
......@@ -16,10 +16,11 @@
"jspack": "^0.0.4",
"kafka-node": "^5.0.0",
"morgan": "^1.9.1",
"mqtt": "^3.0.0",
"mqtt": "^4.2.8",
"node-fetch": "^2.6.0",
"redis": "^2.8.0",
"redis": "^3.1.2",
"request": "^2.88.2",
"usage": "^0.7.1",
"winston": "^3.2.1"
}
}
hash="0be552416301a14a3fe4023a0c85f209"
let fid = parseInt(hash.slice(0,5), 16)
console.log("fid : ",fid)
\ No newline at end of file
This diff is collapsed.
......@@ -13,7 +13,13 @@ const { createLogger, format, transports } = winston;
const heap = require('heap')
const dgram = require('dgram');
const udpProxy = dgram.createSocket('udp4');
let struct = require('jspack')
//const req_make = require('request')
//const https = require('https')
const axios = require('axios')
// const indexfile = require('./index')
let struct = require('jspack');
const { resourceLimits } = require('worker_threads');
struct = struct.jspack
......@@ -23,7 +29,10 @@ let db = sharedMeta.db, // queue holding request to be dispatched
// resources associated with the function
functionBranchTree = sharedMeta.functionBranchTree, // Holds the function path's and related probability distribution
timelineQueue = new Map(), // a temporary map holding request timestamps to be used for calulcating implicit chain invocation delays
requestFlightQueue = sharedMeta.requestFlightQueue
requestFlightQueue = sharedMeta.requestFlightQueue,
idToFunchashmap = sharedMeta.idToFunchashmap,
resource_to_cpu_util = sharedMeta.resource_to_cpu_util,
node_to_resource_mapping = sharedMeta.node_to_resource_mapping
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -56,6 +65,7 @@ function makeid(length) {
* @param {string Function Hash value} functionHash
*/
function generateExecutor(functionPath, functionHash) {
let input = fs.readFileSync(`./repository/worker_env/${constants.env}`)
let functionFile = fs.readFileSync(functionPath + functionHash)
let searchSize = "(resolve, reject) => {".length
......@@ -69,7 +79,52 @@ function generateExecutor(functionPath, functionHash) {
fs.writeFileSync(functionPath + hash + ".js", output)
return hash
}
}
/**
* generates the runtime executor after inserting the received function
* TODO: make this asynchronous
* @param {string Path from where to extract the function} functionPath
* @param {string Function Hash value} functionHash
*/
function generateMicrocExecutor(functionPath, functionName, jsfunctionhash) {
//creating function.c
let function_temp = fs.readFileSync(`./repository/worker_env/function_temp.c`)
let function_def = fs.readFileSync(functionPath + functionName)
let searchSize = "//ADD_FUNCTION".length
let fid = parseInt(jsfunctionhash.slice(0,5), 16)
let insertIndex = function_temp.indexOf("//ADD_FUNCTION") + searchSize
let function_name = "void function_"+ fid +"(PIF_PLUGIN_map_hdr_T *mapHdr)"
let full_function = function_temp.slice(0, insertIndex) +"\n"+ function_name + "{\n" +function_def +"\n}"+ function_temp.slice(insertIndex)
// let hash = crypto.createHash('md5').update(full_function).digest("hex");
// console.log(hash);
console.log(full_function);
fs.writeFileSync(functionPath +"offload/"+ jsfunctionhash + ".c", full_function)
//adding call to function when match with fid
return new Promise((resolve) => {
let main_function_temp = fs.readFileSync(functionPath +"offload/"+ "static_dispatch_function.c")
// let client_function = fs.readFileSync(functionPath + "offload/"+jsfunctionhash+".c")
searchSize = "//ADD_FUNCTION_EXTERNS".length
insertIndex = main_function_temp.indexOf("//ADD_FUNCTION_EXTERNS") + searchSize
let extern_name = "extern void function_"+fid +"(PIF_PLUGIN_map_hdr_T *mapHdr)"
let main_function = main_function_temp.slice(0, insertIndex) +"\n"+ extern_name+";\n"+ main_function_temp.slice(insertIndex)
console.log("MAIN FUNCTION : \n",main_function)
let hash = crypto.createHash('md5').update(full_function).digest("hex");
// console.log(hash);
searchSize = "//ADD_FUNCTION_CONDITION".length
insertIndex = main_function.indexOf("//ADD_FUNCTION_CONDITION") + searchSize
let inc_pkt_count = "function_packet_count["+fid+"-10000]++;"
let if_else_cond = "else if( fid == "+fid + " ) {\n "+inc_pkt_count +"\nfunction_"+fid+"(mapHdr);\n}"
let main_function_full = main_function.slice(0, insertIndex) +"\n"+ if_else_cond +"\n"+ main_function.slice(insertIndex)
console.log(main_function_full);
fs.writeFileSync(functionPath +"offload/"+ "static_dispatch_function.c", main_function_full)
return hash
});
}
/**
* Reverse proxy to take user requests and forward them to appropriate workers using a loadbalacer
......@@ -77,16 +132,21 @@ function generateExecutor(functionPath, functionHash) {
* @param {JSON} res Object to use to return the response to the user
*/
async function reverseProxy(req, res) {
//console.log("request is: ",req.body)
//console.log("response is: ",res)
console.log("reverseProxy called !!!")
res.reverse_ingress = Date.now()
if (req.headers['x-chain-type'] !== 'explicit' && req.body.type === "tcp")
branchChainPredictor(req)
let runtime = req.body.runtime
//console.log(runtime==="process")
let id = req.params.id + runtime
/**
* Bypass deployment pipeline if resource available
*/
console.log("functionToResource : ", functionToResource)
let functionHeap = functionToResource.get(id)
console.log("functionHeap : ", functionHeap)
// loadbalancing by choosing worker with lowest load
let forwardTo = functionHeap[0]
let resource = resourceMap.get(forwardTo.resource_id)
......@@ -95,7 +155,7 @@ async function reverseProxy(req, res) {
let url = `http://${resource.node_id}:${resource.port}/serverless/function/execute`
// logger.info("Request received at reverseproxy. Forwarding to: " + url);
// forwardTo.open_request_count += 1
// forwardTo.open_request_count += 1l,
// TODO: stopping loadbalancer
// heap.heapify(functionHeap, compare) // maintain loadbalancer by heapifying the Map
res.lookup_time = Date.now()
......@@ -105,15 +165,17 @@ async function reverseProxy(req, res) {
body: req.body,
json: true // Automatically stringifies the body to JSON
};
console.log("option created from reverse proxy : ", options, "body type : ", req.body , req.body.type) // new code
if (req.body.type === "tcp") {
console.log("tcp request to reverseproxy")
try {
// await new Promise(resolve => setTimeout(resolve, 5000));
let parsedBody = await rp(options)
let serviceTime = Date.now() - res.timestamp
res.json(parsedBody)
forwardTo.open_request_count -= 1
heap.heapify(functionHeap, compare)
heap.heapify(functionHeap, compare_uti)
let functionHash = req.params.id
let functionData = functionBranchTree.get(functionHash)
......@@ -175,6 +237,8 @@ async function reverseProxy(req, res) {
}
}
function getPort(usedPort) {
let port = -1, ctr = 0
do {
......@@ -219,6 +283,10 @@ function compare(a, b) {
return a.open_request_count - b.open_request_count
}
function compare_uti(a, b) {
return a.cpu_utilization - b.cpu_utilization
}
async function branchChainPredictor(req) {
// console.log(req.headers['x-resource-id']);
let destinationTimestamp = Date.now()
......@@ -420,11 +488,30 @@ udpProxy.on('error', (err) => {
udpProxy.close();
});
udpProxy.on('message', (msg, rinfo) => {
udpProxy.on('message', async (msg, rinfo) => {
//console.log("request has come?")
let result = unpackPacket(msg)
console.log("received request , result = ", result, "requestflighqueu = ", requestFlightQueue)
let res = requestFlightQueue.get(result.exec_id)
console.log("res = ",res)
console.log("udp received request !!", result)
let funchash = idToFunchashmap.get(result.function_id)
try{
const res2 = await axios({
method: 'post',
url: 'http://localhost:8080/serverless/execute/' + funchash,
headers: {},
data: {
runtime: 'process' // This is the body part
},
});
console.log(res2)
}
catch(err)
{
console.error(err)
}
// indexfile.dispatch()
// res.json(result)
// console.log("resource_lookup",
// res.dispatch_time - res.timestamp,
......@@ -488,13 +575,13 @@ function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
let f0, f1, f2, f3, f4, t1, t2, t3, t4
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
chain_id = struct.PackTo("I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
exec_id = struct.PackTo("I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
function_id = struct.PackTo("I", message, base, [dataPacket.function_id])
base += 4
data = struct.PackTo(">I", message, base, [dataPacket.data])
data = struct.PackTo("I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
base += 1
......@@ -510,7 +597,7 @@ function packPacket(dataPacket) {
f4 = struct.PackTo("B", message, base, [0])
base += 1
t1 = struct.PackTo(">I", message, base, [Date.now()])
t1 = struct.PackTo("I", message, base, [Date.now()])
base += 4
t2 = struct.PackTo("I", message, base, [1234])
base += 4
......@@ -526,8 +613,8 @@ function packPacket(dataPacket) {
udpProxy.bind(constants.master_port); // starting UDP server for offloaded endpoints
module.exports = {
makeid, generateExecutor, reverseProxy,
getPort, logger, compare,
makeid, generateExecutor, generateMicrocExecutor, reverseProxy,
getPort, logger, compare, compare_uti,
logBroadcast, fetchData, metrics,
producer
}
#
# Generated Makefile for orchestrator
# Generated Makefile for orchestrator_speedo
#
ifndef SDKDIR
......@@ -122,7 +122,7 @@ ifneq ($(NFAS_FOUND),found)
$(warning warning: nfas not found or not executable, on windows please run nfp4term.bat)
endif
$(OUTDIR)/orchestrator.nffw: $(OUTDIR)/nfd_pcie0_pd0.list/nfd_pcie0_pd0.list \
$(OUTDIR)/orchestrator_speedo.nffw: $(OUTDIR)/nfd_pcie0_pd0.list/nfd_pcie0_pd0.list \
$(OUTDIR)/nfd_pcie0_pci_in_issue1.list/nfd_pcie0_pci_in_issue1.list \
$(OUTDIR)/nfd_pcie0_pci_out_me0.list/nfd_pcie0_pci_out_me0.list \
$(OUTDIR)/nbi_init_csr.list/nbi_init_csr.list \
......@@ -186,17 +186,17 @@ $(PIFOUTDIR)/build_info.json: $(MAKEFILE_LIST)
# Generate IR from P4
#
$(OUTDIR)/orchestrator.yml: p4src/orchestrator.p4 \
$(OUTDIR)/orchestrator_speedo.yml: p4src/orchestrator_speedo.p4 \
$(MAKEFILE_LIST)
@echo ---------
@echo compiling p4 $@
@echo ---------
@mkdir -p $(PIFOUTDIR)
$(SDKP4DIR)/bin/nfp4c -o $(OUTDIR)/orchestrator.yml \
$(SDKP4DIR)/bin/nfp4c -o $(OUTDIR)/orchestrator_speedo.yml \
--p4-version 16 \
--p4-compiler p4c-nfp \
--source_info \
p4src/orchestrator.p4
p4src/orchestrator_speedo.p4
#
......@@ -229,16 +229,16 @@ $(PIFOUTDIR)/pif_pkt_clone%h \
$(PIFOUTDIR)/pif_flcalc%c \
$(PIFOUTDIR)/pif_flcalc%h \
$(PIFOUTDIR)/pif_field_lists%h \
$(PIFOUTDIR)/pif_parrep_pvs_sync%c : $(OUTDIR)/orchestrator%yml $(MAKEFILE_LIST)
$(PIFOUTDIR)/pif_parrep_pvs_sync%c : $(OUTDIR)/orchestrator_speedo%yml $(MAKEFILE_LIST)
@echo ---------
@echo generating pif $@
@echo ---------
@mkdir -p $(PIFOUTDIR)
$(SDKP4DIR)/bin/nfirc -o $(PIFOUTDIR)/ \
--p4info $(OUTDIR)/orchestrator.p4info.json \
--p4info $(OUTDIR)/orchestrator_speedo.p4info.json \
--debugpoints \
--mac_ingress_timestamp \
$(OUTDIR)/orchestrator.yml
$(OUTDIR)/orchestrator_speedo.yml
#
......@@ -707,6 +707,8 @@ $(OUTDIR)/pif_app_nfd.list/pif_app_nfd.list: $(SDKP4DIR)/components/nfp_pif/me/a
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_flcalc_algorithms.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_memops.c \
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c \
p4src/static_dispatch_function.c \
p4src/nic_function_test.c \
$(PIFOUTDIR)/pif_design.h \
$(MAKEFILE_LIST)
@echo ---------
......@@ -815,7 +817,9 @@ $(OUTDIR)/pif_app_nfd.list/pif_app_nfd.list: $(SDKP4DIR)/components/nfp_pif/me/a
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_init.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_flcalc_algorithms.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_memops.c \
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c \
p4src/static_dispatch_function.c \
p4src/nic_function_test.c
#
# APP_MASTER
......
......@@ -4,6 +4,8 @@ compile_flag=0
offload_flag=0
assign_ip_flag=0
location=$(pwd)
nic_function_loc="$location/../repository/nic_functions"
# cfiles=ls $nic_function_loc | egrep "*.c"
while getopts 'coi' flag; do
case "${flag}" in
c) compile_flag=1 ;;
......@@ -15,7 +17,14 @@ done
if [[ $compile_flag -eq 1 ]]
then
# compile the nfp code
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
#sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -c ./p4src/memory.c ./p4src/memory2.c -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
#cd $nic_function_loc
#files=$(./generate_names.sh)
#cd $location
#echo "files : $files"
#sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -c $files -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
fi
if [[ $offload_flag -eq 1 ]]
......@@ -36,5 +45,4 @@ then
docker stop $(docker ps -a -q) || true
#assigning IPs to network interfaces
sudo ./assign_ip.sh
fi
fi
\ No newline at end of file
#! /bin/bash -ex
compile_flag=0
offload_flag=0
assign_ip_flag=0
location=$(pwd)
nic_function_loc="$location/../repository/nic_functions"
# cfiles=ls $nic_function_loc | egrep "*.c"
while getopts 'coi' flag; do
case "${flag}" in
c) compile_flag=1 ;;
o) offload_flag=1 ;;
i) assign_ip_flag=1 ;;
esac
done
if [[ $compile_flag -eq 1 ]]
then
# compile the nfp code
# sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
# for function running in host
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator_speedo.nffw -p ./p4src/out -4 ./p4src/orchestrator_speedo.p4 -c ./p4src/static_dispatch_function.c ./p4src/nic_function_test.c -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
# for function running on nic
# sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator_static.nffw -p ./p4src/out -4 ./p4src/orchestrator_static.p4 -c ./p4src/static_dispatch_function.c ./p4src/nic_function_test.c -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
# sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator_dynamic.nffw -p ./p4src/out -4 ./p4src/orchestrator_dynamic.p4 -c ./p4src/static_dispatch_function.c ./p4src/nic_function_test.c -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
#cd $nic_function_loc
#files=$(./generate_names.sh)
#cd $location
#echo "files : $files"
#sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -c $files -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
fi
if [[ $offload_flag -eq 1 ]]
then
# move to p4 bin
cd /opt/netronome/p4/bin/
# offload
sudo ./rtecli design-load -f $location/p4src/orchestrator_speedo.nffw -c $location/p4src/echo2.p4cfg -p $location/p4src/out/pif_design.json
# sudo ./rtecli design-load -f $location/p4src/orchestrator_static.nffw -c $location/p4src/echo.p4cfg -p $location/p4src/out/pif_design.json
# sudo ./rtecli design-load -f $location/p4src/orchestrator_dynamic.nffw -c $location/p4src/echo.p4cfg -p $location/p4src/out/pif_design.json
# returning back to base
cd $location
fi
if [[ $assign_ip_flag -eq 1 ]]
then
#killing all running containers
docker stop $(docker ps -a -q) || true
#assigning IPs to network interfaces
sudo ./assign_ip.sh
fi
\ No newline at end of file
......@@ -108,7 +108,18 @@ if __name__ == '__main__':
dgdata['count'])
for flddesc, fielddata in zip(dgdata['desc'].fields, values[fldcnt * i:fldcnt * (i + 1)]):
print " %s : %s" % (flddesc.name, fielddata)
print " %s : %s" % (flddesc.name, int(fielddata,base=16))
#################
if(flddesc.name=="ingress::act::tmp_0.time_taken"):
sec = int(fielddata[0:10],base=16)
nsec = int("0x"+fielddata[10:18],base=16)
diff = int(fielddata,base=16)
print "time taken(s): ", sec
print "time taken(ns): ", nsec
print "Total time taken(ns): ", ((sec*1000000000)+nsec)
# print "Total time taken(ns): ", diff/1000000000
################
print "}\n"
dgdata['count'] += 1
......
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/dispatch.pcap
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/p4src
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/receive_reply.py
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTEGRPCInterface.py
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTEGRPCInterface.pyc
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTEInterface.py
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTEInterface.pyc
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTERPCInterface.py
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTERPCInterface.pyc
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTEThriftInterface.py
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/RTEThriftInterface.pyc
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/sdk6_cli_manual
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/sdk6_rte_cli.py
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/send_docker.sh
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/smartnic_dispatch_monitor.py
/home/pcube/nilanjan/xanadu/dispatch_system/dispatch_manager/repository/nic_functions/workspace.code-workspace
dispatch.pcap
p4src
receive_reply.py
RTEGRPCInterface.py
RTEGRPCInterface.pyc
RTEInterface.py
RTEInterface.pyc
RTERPCInterface.py
RTERPCInterface.pyc
RTEThriftInterface.py
RTEThriftInterface.pyc
sdk6_cli_manual
sdk6_rte_cli.py
send_docker.sh
smartnic_dispatch_monitor.py
workspace.code-workspace
//=============================================================================================================
#include <stdint.h>
#include <stdlib.h>
#include <nfp/me.h>
#include <nfp/mem_atomic.h>
#include <pif_common.h>
#include "pif_plugin.h"
//=============================================================================================================
extern void function_48725(PIF_PLUGIN_map_hdr_T *mapHdr, PIF_PLUGIN_udp_T *udpHdr);
void default_function(PIF_PLUGIN_map_hdr_T *mapHdr)
{
return ;
}
// __export __gpr uint8_t packet_count=0;
__export __emem uint8_t packet_count;
int pif_plugin_static_dispatch_function(EXTRACTED_HEADERS_T *headers, MATCH_DATA_T *match_data) {
// PIF_PLUGIN_ethernet_T *ipv4_hdr = pif_plugin_hdr_get_ethernet(headers);
// PIF_PLUGIN_ipv4_T *mapHdr = pif_plugin_hdr_get_ipv4(headers);
// PIF_PLUGIN_udp_T *mapHdr = pif_plugin_hdr_get_udp(headers);
PIF_PLUGIN_map_hdr_T *mapHdr = pif_plugin_hdr_get_map_hdr(headers);
PIF_PLUGIN_udp_T *udpHdr = pif_plugin_hdr_get_udp(headers);
packet_count++;
if(packet_count<127)
{
// __gpr uint32_t i = 1, j = 0;
uint32_t fid = mapHdr->function_id;
if(fid==48725)
{
function_48725(mapHdr,udpHdr);
}
else{
default_function(mapHdr);
}
mapHdr->data = 0;
}
// else if(packet_count>16){
// packet_count= packet_count%16;
// mapHdr->data = 21;
// }
else{
mapHdr->data = 21;
}
mapHdr->function_id=packet_count;
return PIF_PLUGIN_RETURN_FORWARD;
}
......@@ -74,7 +74,6 @@
},
"ingress::dispatch": {
"rules": [
],
"default_rule": {
"action": {
......
{
"registers": {
"configs": []
},
"tables": {
"ingress::fwd": {
"rules": [
{
"action": {
"type": "ingress::fwd_act",
"data": {
"port": {
"value": "p0"
}
}
},
"name": "host_to_net",
"match": {
"standard_metadata.ingress_port": {
"value": "v0.0"
}
}
},
{
"action": {
"type": "ingress::fwd_act",
"data": {
"port": {
"value": "v0.1"
}
}
},
"name": "net_to_host",
"match": {
"standard_metadata.ingress_port": {
"value": "p1"
}
}
},
{
"action": {
"type": "ingress::fwd_act",
"data": {
"port": {
"value": "v0.0"
}
}
},
"name": "net_to_host",
"match": {
"standard_metadata.ingress_port": {
"value": "p0"
}
}
},
{
"action": {
"type": "ingress::fwd_act",
"data": {
"port": {
"value": "p1"
}
}
},
"name": "net_to_host",
"match": {
"standard_metadata.ingress_port": {
"value": "v0.1"
}
}
}
]
},
"ingress::dispatch": {
"rules": [
{
"action": {
"type" : "ingress::dispatch_act2",
"data" : {
"dstAddr" : { "value" : "192.168.2.3" },
"ethernetAddr" : { "value" : "00:22:22:22:22:22" },
"dstPort" : { "value" : "30041" },
"egress_port" : { "value" : "v0.1" }
}
},
"name": "dispatch_to_worker5a1",
"match": {
"map_hdr.function_id" : {
"value" : "1"
}
}
},
{
"action": {
"type" : "ingress::dispatch_act2",
"data" : {
"dstAddr" : { "value" : "192.168.2.3" },
"ethernetAddr" : { "value" : "00:22:22:22:22:22" },
"dstPort" : { "value" : "30043" },
"egress_port" : { "value" : "v0.1" }
}
},
"name": "dispatch_to_worker5a1",
"match": {
"map_hdr.function_id" : {
"value" : "48725"
}
}
}
],
"default_rule": {
"action": {
"type" : "ingress::dispatch_act",
"data" : {
"dstAddr" : { "value" : "192.168.2.3" },
"dstPort" : { "value" : "8080" },
"egress_port" : { "value" : "v0.1" },
"ethernetAddr" : { "value" : "00:22:22:22:22:22" }
}
},
"name": "default"
}
}
},
"multicast": {},
"meters": {
"configs": []
}
}
......@@ -22,4 +22,6 @@
#define PKT_INSTANCE_TYPE_COALESCED 10
#define PKT_INSTANCE_TYPE_INGRESS_RECIRC 3
#define PKT_INSTANCE_TYPE_REPLICATION 5
#define PKT_INSTANCE_TYPE_RESUBMIT 4
\ No newline at end of file
#define PKT_INSTANCE_TYPE_RESUBMIT 4
#define EXEC_ON_NIC 0
\ No newline at end of file
......@@ -55,10 +55,11 @@ header tcp_t {
bit<16> urgent_ptr;
}
//changed function_id from 8bit->32bit
header map_hdr_t {
bit<32> chain_id;
bit<32> exec_id;
bit<8> function_id;
bit<32> function_id;
bit<32> data;
bit<8> function_count;
bit<8> f0;
......@@ -74,6 +75,13 @@ struct exec_hdr_t {
bit<8> function;
}
header intrinsic_metadata_t {
bit<64> ingress_global_timestamp;
bit<64> current_global_timestamp;
}
struct metadata {
@name(".ing_metadata")
ingress_metadata_t ing_metadata;
......@@ -81,6 +89,8 @@ struct metadata {
resubmit_meta_t resubmit_meta;
@name(".exec_hdr")
exec_hdr_t exec_hdr;
//@name(".intrinsic_metadata")
//intrinsic_metadata_t intrinsic_metadata;
}
struct headers {
......
//=============================================================================================================
#include <stdint.h>
#include<stdlib.h>
#include <stdlib.h>
#include <nfp/me.h>
#include <nfp/mem_atomic.h>
#include <pif_common.h>
......@@ -12,16 +12,25 @@ int pif_plugin_prime(EXTRACTED_HEADERS_T *headers, MATCH_DATA_T *match_data) {
PIF_PLUGIN_map_hdr_T *mapHdr = pif_plugin_hdr_get_map_hdr(headers);
__gpr uint32_t i = 1, j = 0;
uint32_t length = mapHdr->data;
// uint32_t length = mapHdr->data;
uint32_t length = mapHdr->function_id;
__gpr uint64_t ar[10000];
int idx = -1;
for (; j < length; j++) {
for (;i < 10000; i++) {
idx = rand() % (10001);
ar[idx] = rand();
}
}
mapHdr->data = idx;
unsigned randval;
// for (; j < length; j++) {
// for (;i < 1000; i++) {
// randval = local_csr_read(local_csr_pseudo_random_number);
// // idx = rand() % (10001);
// idx = randval % (10001);
// // ar[idx] = randval;
// // ar[idx] = rand();
// }
// }
// unsigned randval = local_csr_read(local_csr_pseudo_random_number);
// int idx = -1;
// length++;
// mapHdr->data = idx | length;
mapHdr->data = 0;
mapHdr->function_id = length+1;
return PIF_PLUGIN_RETURN_FORWARD;
}
\ No newline at end of file
}
//=============================================================================================================
#include <stdint.h>
#include <stdlib.h>
#include <nfp/me.h>
#include <nfp/mem_atomic.h>
#include <pif_common.h>
#include "pif_plugin.h"
//=============================================================================================================
int pif_plugin_prime2(EXTRACTED_HEADERS_T *headers, MATCH_DATA_T *match_data) {
PIF_PLUGIN_map_hdr_T *mapHdr = pif_plugin_hdr_get_map_hdr(headers);
__gpr uint32_t i = 1, j = 0;
// uint32_t length = mapHdr->data;
uint32_t length = mapHdr->function_id;
__gpr uint64_t ar[10000];
int idx = -1;
unsigned randval;
// for (; j < length; j++) {
// for (;i < 1000; i++) {
// randval = local_csr_read(local_csr_pseudo_random_number);
// // idx = rand() % (10001);
// idx = randval % (10001);
// // ar[idx] = randval;
// // ar[idx] = rand();
// }
// }
// unsigned randval = local_csr_read(local_csr_pseudo_random_number);
// int idx = -1;
// length++;
// mapHdr->data = idx | length;
mapHdr->data = 0;
mapHdr->function_id = length+2;
return PIF_PLUGIN_RETURN_FORWARD;
}
\ No newline at end of file
//=============================================================================================================
#include <stdint.h>
#include <stdlib.h>
#include <nfp/me.h>
#include <nfp/mem_atomic.h>
#include <pif_common.h>
#include "pif_plugin.h"
//=============================================================================================================
//ADD_FUNCTION
void function_48725(PIF_PLUGIN_map_hdr_T *mapHdr, PIF_PLUGIN_udp_T *udpHdr){
__gpr uint16_t i = 1;
unsigned randval;
for (;i < 1500000; i++);
mapHdr->function_id += 3;
// PIF_PLUGIN_map_hdr_T *mapHdr = pif_plugin_hdr_get_map_hdr(headers);
randval=local_csr_read(local_csr_pseudo_random_number);
i = randval % (10);
udpHdr->dstPort = 10000+i;
}
......@@ -5,10 +5,16 @@
#include "includes/headers.p4"
#include "includes/parsers.p4"
extern void prime();
//extern void prime();
//extern void prime2();
control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_t standard_metadata) {
register<bit<8>>(1) function_id_check;
register<bit<64>>(1) fwd_checks;
bit<8> pc;
bit<64> pc2;
@name(".fwd_act") action fwd_act(bit<16> port) {
standard_metadata.egress_spec = port;
}
......@@ -26,25 +32,46 @@ control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_
hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort;
hdr.ethernet.dstAddr = ethernetAddr;
//prime();
}
@name(".prime1_act") action prime1_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) {
hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort;
hdr.ethernet.dstAddr = ethernetAddr;
//prime();
}
@name(".prime2_act") action prime2_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) {
hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort;
hdr.ethernet.dstAddr = ethernetAddr;
//prime2();
}
@name(".dispatch") table dispatch {
actions = {
dispatch_act;
//prime1_act;
//prime2_act;
}
key = {
hdr.map_hdr.function_id : exact;
}
}
apply {
if (hdr.ipv4.isValid() && hdr.udp.dstPort == DISPATCHER_PORT) {
//function_id_check.read(pc,0);
//pc = 8w2;
//pc = hdr.map_hdr.function_id;
//function_id_check.write(0,pc);
dispatch.apply();
fwd.apply();
} else {
fwd.apply();
}
fwd_checks.read(pc2,0);
pc2 = pc2 + 1;
fwd_checks.write(0,pc2);
}
}
......
......@@ -37,7 +37,7 @@
{
"id": 1,
"name": "map_hdr.function_id",
"bitwidth": 8,
"bitwidth": 32,
"matchType": "EXACT"
}
],
......
......@@ -33,6 +33,11 @@ ing_metadata:
- _padding: 2
type: metadata
ingress::act::scalars:
fields:
- pc2: 64
type: metadata
ipv4:
calculated_fields:
- condition: valid(ipv4)
......@@ -62,7 +67,7 @@ map_hdr:
fields:
- chain_id: 32
- exec_id: 32
- function_id: 8
- function_id: 32
- data: 32
- function_count: 8
- f0: 8
......@@ -126,6 +131,20 @@ dispatch_state:
instance_count: 16384
type: register
function_id_check:
class: global
fields:
- value: 8
instance_count: 1
type: register
fwd_checks:
class: global
fields:
- value: 64
instance_count: 1
type: register
##########################################
# Field list definitions #
......@@ -232,6 +251,16 @@ parser:
type: parser
##########################################
# Action Expressions #
##########################################
_expression_act_0:
expression: ((((ingress::act::scalars.pc2) + (0x0000000000000001))) & (0xffffffffffffffff))
format: bracketed_expr
type: expression
##########################################
# Action sets #
##########################################
......@@ -239,7 +268,16 @@ parser:
egress::fix_checksum:
implementation: modify_field(udp.checksum, 0x0000);
src_filename: p4src/orchestrator.p4
src_lineno: 66
src_lineno: 93
type: action
ingress::act:
implementation: |-
register_read(ingress::act::scalars.pc2, fwd_checks.value, 0x00000000);
modify_field(ingress::act::scalars.pc2, _expression_act_0);
register_write(fwd_checks.value, 0x00000000, ingress::act::scalars.pc2);
src_filename: ''
src_lineno: 1
type: action
ingress::dispatch_act:
......@@ -253,7 +291,7 @@ ingress::dispatch_act:
- ethernetAddr: 48
- egress_port: 16
src_filename: p4src/orchestrator.p4
src_lineno: 25
src_lineno: 31
type: action
ingress::fwd_act:
......@@ -261,7 +299,7 @@ ingress::fwd_act:
parameter_list:
- port: 16
src_filename: p4src/orchestrator.p4
src_lineno: 12
src_lineno: 18
type: action
......@@ -287,7 +325,7 @@ ingress::dispatch:
map_hdr.function_id: exact
max_entries: 1025
src_filename: p4src/orchestrator.p4
src_lineno: 31
src_lineno: 51
type: table
ingress::fwd:
......@@ -297,7 +335,18 @@ ingress::fwd:
standard_metadata.ingress_port: exact
max_entries: 1025
src_filename: p4src/orchestrator.p4
src_lineno: 16
src_lineno: 22
type: table
ingress::tbl_act:
allowed_actions:
- ingress::act
default_entry:
action: ingress::act
const: true
max_entries: 1025
src_filename: ''
src_lineno: 1
type: table
......@@ -309,7 +358,7 @@ _condition_0:
condition: (((valid(ipv4))) and (((udp.dstPort) == (8000))))
format: bracketed_expr
src_filename: p4src/orchestrator.p4
src_lineno: 41
src_lineno: 62
type: conditional
......@@ -324,7 +373,8 @@ ingress_flow:
digraph {
"_condition_0" -> "ingress::fwd" [condition = false]
"_condition_0" -> "ingress::dispatch" [condition = true]
"ingress::fwd" -> "exit_control_flow" [action = always]
"ingress::fwd" -> "ingress::tbl_act" [action = always]
"ingress::tbl_act" -> "exit_control_flow" [action = always]
"ingress::dispatch" -> "ingress::fwd" [action = always]
}
start_state: _condition_0
......@@ -377,7 +427,7 @@ layout:
##########################################
source_info:
date: 2021/09/11 07:07:44
date: 2021/12/15 05:37:07
output_file: p4src/orchestrator.yml
p4_version: '16'
source_files:
......
{
"tables": [
{
"preamble": {
"id": 33595533,
"name": "fwd",
"alias": "fwd"
},
"matchFields": [
{
"id": 1,
"name": "standard_metadata.ingress_port",
"bitwidth": 16,
"matchType": "EXACT"
}
],
"actionRefs": [
{
"id": 16805069
},
{
"id": 16800567,
"annotations": [
"@defaultonly()"
]
}
],
"size": "1024"
},
{
"preamble": {
"id": 33612818,
"name": "dispatch",
"alias": "dispatch"
},
"matchFields": [
{
"id": 1,
"name": "map_hdr.function_id",
"bitwidth": 32,
"matchType": "EXACT"
}
],
"actionRefs": [
{
"id": 16786857
},
{
"id": 16781343
},
{
"id": 16819805
},
{
"id": 16800567,
"annotations": [
"@defaultonly()"
]
}
],
"size": "1024"
}
],
"actions": [
{
"preamble": {
"id": 16800567,
"name": "NoAction",
"alias": "NoAction"
}
},
{
"preamble": {
"id": 16805069,
"name": "fwd_act",
"alias": "fwd_act"
},
"params": [
{
"id": 1,
"name": "port",
"bitwidth": 16
}
]
},
{
"preamble": {
"id": 16786857,
"name": "dispatch_act",
"alias": "dispatch_act"
},
"params": [
{
"id": 1,
"name": "dstAddr",
"bitwidth": 32
},
{
"id": 2,
"name": "dstPort",
"bitwidth": 16
},
{
"id": 3,
"name": "ethernetAddr",
"bitwidth": 48
},
{
"id": 4,
"name": "egress_port",
"bitwidth": 16
}
]
},
{
"preamble": {
"id": 16781343,
"name": "prime1_act",
"alias": "prime1_act"
},
"params": [
{
"id": 1,
"name": "dstAddr",
"bitwidth": 32
},
{
"id": 2,
"name": "dstPort",
"bitwidth": 16
},
{
"id": 3,
"name": "ethernetAddr",
"bitwidth": 48
},
{
"id": 4,
"name": "egress_port",
"bitwidth": 16
}
]
},
{
"preamble": {
"id": 16819805,
"name": "prime2_act",
"alias": "prime2_act"
},
"params": [
{
"id": 1,
"name": "dstAddr",
"bitwidth": 32
},
{
"id": 2,
"name": "dstPort",
"bitwidth": 16
},
{
"id": 3,
"name": "ethernetAddr",
"bitwidth": 48
},
{
"id": 4,
"name": "egress_port",
"bitwidth": 16
}
]
},
{
"preamble": {
"id": 16836513,
"name": "swap_addr",
"alias": "swap_addr"
}
},
{
"preamble": {
"id": 16841338,
"name": "fix_checksum",
"alias": "fix_checksum"
}
}
]
}
This diff is collapsed.
//=============================================================================================================
#include <stdint.h>
// #include <include/nfp6000/nfp_me.h>
// #include <lib/nfp/mem_atomic.h>
#include <nfp/me.h>
#include <nfp/mem_atomic.h>
#include <pif_common.h>
......
//=============================================================================================================
#include <stdint.h>
#include <stdlib.h>
#include <nfp/me.h>
#include <nfp/mem_atomic.h>
#include <pif_common.h>
#include "pif_plugin.h"
//=============================================================================================================
// __export __emem uint32_t pkt_cntrs;
__export __emem uint8_t pkt_cntrs[20];
int pif_plugin_countpacket(EXTRACTED_HEADERS_T *headers, MATCH_DATA_T *match_data) {
PIF_PLUGIN_map_hdr_T *mapHdr = pif_plugin_hdr_get_map_hdr(headers);
// uint32_t fid = mapHdr->function_id;
int fid = mapHdr->function_id;
// __gpr uint64_t ar[10000];
// int idx = -1;
pkt_cntrs[fid%20]++;
// pkt_cntrs++;
mapHdr->data = 0;
mapHdr->function_id = fid+1;
return PIF_PLUGIN_RETURN_FORWARD;
}
//=============================================================================================================
#include <stdint.h>
// #include <include/nfp6000/nfp_me.h>
// #include <lib/nfp/mem_atomic.h>
#include <nfp/me.h>
#include <nfp/mem_atomic.h>
#include <pif_common.h>
#include "pif_plugin.h"
//=============================================================================================================
int pif_plugin_simple_test(EXTRACTED_HEADERS_T *headers, MATCH_DATA_T *match_data) {
PIF_PLUGIN_map_hdr_T *mapHdr = pif_plugin_hdr_get_map_hdr(headers);
// __gpr uint32_t i = 1;
//uint32_t prime = mapHdr->data;
uint32_t prime = 3;
// uint32_t j = 2;
// uint32_t x = 1;
// __gpr uint32_t calc_prime = 1;
// short b = 0;
// for (;i < prime; i++) {
// b = 0;
// for (j = 2; j <= (i / 2); j++) {
// if (i % j == 0) {
// b = 1;
// break;
// }
// }
// if (b == 0)
// calc_prime = i;
// }
// mapHdr->data = prime;
return PIF_PLUGIN_RETURN_FORWARD;
}
\ No newline at end of file
......@@ -4,7 +4,7 @@ import time
import thread
import argparse
NC_PORT = 8081
NC_PORT = 30041
REPLY_PORT = 9000
SERVER_IP = "192.168.2.2"
......@@ -42,8 +42,8 @@ while True:
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack("B", packet[base])[0]
base += 1
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
......
This diff is collapsed.
......@@ -74,7 +74,7 @@ def receive():
def genPacket():
global fid
global fid, dataInt
packet = None
exec_id = random.randint(0, 2 ** 30)
chain_id = 1
......@@ -86,7 +86,7 @@ def genPacket():
f2 = 2
f3 = 0
f4 = 0
print "genp_Data : ",chain_id, exec_id, "function_id", function_id, function_count, \
print "genp_Data : ",chain_id, exec_id, dataInt, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
offload_status = False
chain_id = struct.pack(">I", chain_id) # chain id
......
echo $1
# python2 send.py --client-port 8000 --closed 1 --offload 0 --req-count 50 --send-data 10 --fid $1
sudo ip netns exec ns_server python2 send.py --client-port 8000 --closed 0 --offload 0 --req-count 10 --send-data 0 --fid $1
sudo ip netns exec ns_server python2 send.py --client-port 8000 --closed 0 --req-count 10 --send-data 10 --fid $1
......@@ -16,7 +16,7 @@ def makeRule(ip, port, mac, functionHash, tableId, rule_name, default_rule):
actions = '{ "type" : "ingress::dispatch_act", "data" : { "dstAddr" : { "value" : "%s" }, \
"dstPort" : { "value" : "%d" } , "egress_port": { "value": "v0.1" }, "ethernetAddr": { "value": "%s" } } }' \
% (ip, int(port), mac)
match = '{ "map_hdr.function_id" : { "value" : %d} } ' % (functionHash)
match = '{ "map_hdr.function_id" : { "value" : %d} }' % (functionHash)
rule = {
"tableId": tableId,
"rule_name": rule_name,
......
../detect-libc/bin/detect-libc.js
\ No newline at end of file
../mime/cli.js
\ No newline at end of file
../mkdirp/bin/cmd.js
\ No newline at end of file
../mqtt/mqtt.js
\ No newline at end of file
../mqtt/bin/pub.js
\ No newline at end of file
../mqtt/bin/sub.js
\ No newline at end of file
../prebuild-install/bin.js
\ No newline at end of file
../rc/cli.js
\ No newline at end of file
../semver/bin/semver
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment