Commit 10d7b446 authored by Mahendra Patel

rechecking workenv

parent 27a14505

Too many changes to show: only 1000 of 1000+ changed files are displayed.

*repository/*.js
bitnami*
node_modules
package-lock.json
firecracker*
secrets.json
resource_system/bin/**
resource_system/version.linux
local_experiments/
.vscode
p4src/Makefile-nfp4build
p4src/app_master.list/
p4src/blm0.list/
p4src/echo.nffw
p4src/echo.yml
p4src/flowcache_timeout_emu0.list/
p4src/gro0.list/
p4src/gro1.list/
p4src/nbi_init_csr.list/
p4src/nfd_pcie0_notify.list/
p4src/nfd_pcie0_pci_in_gather.list/
p4src/nfd_pcie0_pci_in_issue0.list/
p4src/nfd_pcie0_pci_in_issue1.list/
p4src/nfd_pcie0_pci_out_me0.list/
p4src/nfd_pcie0_pd0.list/
p4src/nfd_pcie0_pd1.list/
p4src/nfd_pcie0_sb.list/
p4src/nfd_svc.list/
p4src/out/
p4src/pif_app_nfd.list/
client/Makefile-nfp4build
*.list
p4src/out_dir
*.nffw
[submodule "resource_system/src/common/cJSON"]
path = resource_system/src/common/cJSON
url = https://github.com/DaveGamble/cJSON
[submodule "resource_system/src/common/nlib"]
path = resource_system/src/common/nlib
url = https://github.com/namandixit/nlib
[submodule "resource_system/src/common/inih"]
path = resource_system/src/common/inih
url = https://github.com/benhoyt/inih
[Kafka]
Address = 10.129.6.5:9092
[Arbiter]
MessageReadGap = 10
GruntTimeToDie = 10000
GruntResponseWaitTime = 100
[Grunt]
MessageReadGap = 10
HeartbeatGap = 1000
\ No newline at end of file
Design decisions to be considered:
1. Container: copy the executable file into prebuilt images.
2. Container: run a warm, request-based container server instead of the current architecture;
this will likely require request redirection.
3. Move away from Mosquitto to Apache Kafka / ZooKeeper.
4. Implement a heartbeat-based daemon monitor (a sketch follows below).
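For item 4, a minimal sketch of what such a monitor could look like, written against the kafka-node client this repository already uses. The "heartbeat" topic name matches the constants files; the broker address, message shape, and 5000 ms threshold are illustrative assumptions, not fixed decisions.

'use strict';
// Hedged sketch of a heartbeat-based daemon monitor (design item 4).
// Assumed: daemons publish JSON { id: <node id> } on the "heartbeat" topic.
const kafka = require('kafka-node');

const HEARTBEAT_THRESHOLD = 5000;  // ms; cf. "heartbeat_threshold" in constants.json
const lastBeat = new Map();        // daemon id -> timestamp of last heartbeat

const client = new kafka.KafkaClient({ kafkaHost: '10.129.6.5:9092', autoConnect: true });
const consumer = new kafka.Consumer(client, [{ topic: 'heartbeat' }], { autoCommit: true });

consumer.on('message', (message) => {
    const beat = JSON.parse(message.value);  // assumed message shape
    lastBeat.set(beat.id, Date.now());
});

// Sweep once a second; a daemon that misses the threshold is considered dead.
setInterval(() => {
    for (const [id, ts] of lastBeat) {
        if (Date.now() - ts > HEARTBEAT_THRESHOLD) {
            console.log('daemon', id, 'missed its heartbeat; marking dead');
            lastBeat.delete(id);  // a real monitor would publish to "removeWorker" here
        }
    }
}, 1000);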
[draw.io diagram (mxfile, host www.draw.io, modified 2020-02-16, one page: "Page-1"); compressed diagram data omitted]
\ No newline at end of file
bitnami*
node_modules
package-lock.json
firecracker*
secrets.json
grunt
.clinic
rm_dummy.js
metrics_gatherer.js
\ No newline at end of file
echo $1
# python2 send.py --client-port 8000 --closed 1 --offload 0 --req-count 50 --send-data 10 --fid $1
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid 369020 --c 1 --t 1 --n 2
#! /bin/bash -ex
# Usage: <script> -r <fid> <rps>    send at a fixed request rate for 30 s
#        <script> -n <fid> <count>  send a fixed number of requests
# After the mode flag, $2 is the function id and $3 is the rate/count.
rps_flag=0
n_flag=0
while getopts 'rn' flag; do
    case "${flag}" in
        r) rps_flag=1 ;;
        n) n_flag=1 ;;
    esac
done
echo "$1, $2, $3"
if [[ $rps_flag -eq 1 ]]
then
    sudo ip netns exec ns_server python benchmark_dispatcher.py --fid "$2" --c 50 --t 30 --rps "$3"
fi
if [[ $n_flag -eq 1 ]]
then
    sudo ip netns exec ns_server python benchmark_dispatcher.py --fid "$2" --c 50 --t 100 --n "$3"
fi
from __future__ import print_function  # keep print() consistent under Python 2 and 3
import socket
import struct
import time
import threading
import random
import numpy as np
import argparse
import csv
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Function id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--t', help='Runtime',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
runtime = args.t
concurrency = args.c
SERVER_IP = "192.168.2.3"
# packet_holder = [None] * 11
packet_holder = [[] for i in range(12)]
ingress_time = {}
stop_thread = False
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 10000 + i
#print i
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# s.setblocking(0)
s.bind((CLIENT_IP, port))
# s.setblocking(0)
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
print "stop thread r"
break
packet, addr = s.recvfrom(1024)
#print "packet received : ", packet
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def genPacket():
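    # Wire format (big-endian), matching the pack calls below and the receiver's unpack order:
    # chain_id u32 | exec_id u32 | function_id u32 | data u32 | function_count u8 | f0..f4 u8 each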
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
# print chain_id, exec_id, "function_id", function_id, function_count, \
# f0, f1, f2, f3, f4,
    dataInt = 1
autoscaling = 1; fno = 255
print(chain_id , exec_id , function_id , dataInt , function_count , autoscaling , fno)
chain_id = struct.pack(">I", chain_id) # chain id
exec_id_packed = struct.pack(">I", exec_id) # execution id
# print " dataInt", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
autoscaling = struct.pack("B", autoscaling) # f2 -> f0
fno = struct.pack("B", fno) # f3 -> f1 f2
# packet = chain_id + exec_id_packed + function_id + data + function_count + autoscaling + fno + f0 + f1 + f2 + f3 + f4
packet = chain_id + exec_id_packed + function_id + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, exec_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
if time.time() - start_time > runtime:
break
packet, exec_id = genPacket()
if exec_id in ingress_time:
continue
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time()
time.sleep(sleep_time)
def send():
global egress_time, ingress_time, concurrency, runtime, stop_thread
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("Sending packet to %s at port %s" % (SERVER_IP, PORT))
print("Runtime: %d Concurrency %d" % (runtime, concurrency))
print("chain id, exec id, data, function count, functions dependencies...")
# op = struct.unpack("B", packet[0])
cnt = 0
if args.n is not None:
for i in range(args.n):
packet, exec_id = genPacket()
s.sendto(packet, (SERVER_IP, PORT))
            # record ingress in seconds; printStatistics converts the difference to ms
            # (storing milliseconds here is what produced hugely negative e2e samples)
            ingress_time[exec_id] = time.time()
            print("send", "{0:f}".format(ingress_time[exec_id]))
            cnt += 1
print("cnt request send : ", cnt)
elif args.rps is not None:
start_time = time.time()
sleep_time = concurrency / float(args.rps)
print("calculated inter-arrival time, offload mode", sleep_time)
for i in range(concurrency):
t = threading.Thread(target=sendThread, args=[start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
print "stoppping thread"
stop_thread = True
print "thread stopped"
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
#print e2e_time
data = np.array(e2e_time, dtype=float)
np.savetxt("bm_static_1.csv", data, delimiter=' ', header='')
p50 = np.percentile(data, 50)
p95 = np.percentile(data, 95)
p99 = np.percentile(data, 99)
mean = np.mean(data)
print("mean \t p50 \t p95 \t p99")
print(mean, p50, p95, p99)
fields=[args.rps, mean, len(e2e_time) / runtime, len(ingress_time), p50, p95, p99]
with open('speedo_data_static2_1f_host.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
r=None
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
time.sleep(1)
send()
time.sleep(170)
# r.join()
printStatistics()
#print "packet holder : ",packet_holder
#print "ingress_time : ",ingress_time
from __future__ import print_function  # keep print() consistent under Python 2 and 3
import socket
import struct
import time
import threading
import random
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Function id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--req_count', help='request count',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
SERVER_IP = "192.168.2.3"
egress_time = []
ingress_time = []
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 10001))
print "listening to {} at port {}".format(CLIENT_IP, 10001)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
print "received packet : ",packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
# data = int(data) - t
print "recvied data : , chain_id, exec_id, data, function_id, function_count"
print "recvied data : ", chain_id, exec_id, data, function_id, function_count
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
    print(exec_id)
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
    print(chain_id, exec_id, "function_id", function_id, function_count,
          f0, f1, f2, f3, f4)
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
# dataInt = int(time.time() * 1000) % 1000000000
dataInt = 21
print "data : ", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id_packed = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id_packed + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, function_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
packet, function_id = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
def send():
    global egress_time, ingress_time, stop_thread  # stop_thread is set below, so it must be declared global
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
if args.n is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
elif args.rps is not None:
runtime = 10
thread_count = args.c
start_time = time.time()
sleep_time = thread_count / float(args.rps)
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(2)
send()
time.sleep(5)
# r.join()
from __future__ import print_function  # keep print() consistent under Python 2 and 3
import socket
import struct
import time
import threading
import random
import numpy as np
import argparse
import csv
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Function id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--t', help='Runtime',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
runtime = args.t
concurrency = args.c
SERVER_IP = "192.168.2.3"
# packet_holder = [None] * 11
packet_holder = [[] for i in range(12)]
ingress_time = {}
stop_thread = False
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 20000 + i
#print i
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# s.setblocking(0)
s.bind((CLIENT_IP, port))
# s.setblocking(0)
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
print "stop thread r"
break
packet, addr = s.recvfrom(1024)
#print "packet received : ", packet
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
# print chain_id, exec_id, "function_id", function_id, function_count, \
# f0, f1, f2, f3, f4,
chain_id = struct.pack(">I", chain_id) # chain id
exec_id_packed = struct.pack(">I", exec_id) # execution id
    dataInt = 1
# print " dataInt", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id_packed + function_id + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, exec_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
if time.time() - start_time > runtime:
break
packet, exec_id = genPacket()
if exec_id in ingress_time:
continue
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time()
time.sleep(sleep_time)
def send():
global egress_time, ingress_time, concurrency, runtime, stop_thread
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("Sending packet to %s at port %s" % (SERVER_IP, PORT))
print("Runtime: %d Concurrency %d" % (runtime, concurrency))
print("chain id, exec id, data, function count, functions dependencies...")
# op = struct.unpack("B", packet[0])
if args.n is not None:
for i in range(args.n):
packet, exec_id = genPacket()
s.sendto(packet, (SERVER_IP, PORT))
            # record ingress in seconds; printStatistics converts the difference to ms
            ingress_time[exec_id] = time.time()
            print("send", "{0:f}".format(ingress_time[exec_id]))
elif args.rps is not None:
start_time = time.time()
sleep_time = concurrency / float(args.rps)
print("calculated inter-arrival time, offload mode", sleep_time)
for i in range(concurrency):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
print "stoppping thread"
stop_thread = True
print "thread stopped"
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
#print e2e_time
data = np.array(e2e_time, dtype=float)
np.savetxt("bm_static_1.csv", data, delimiter=' ', header='')
p50 = np.percentile(data, 50)
p95 = np.percentile(data, 95)
p99 = np.percentile(data, 99)
mean = np.mean(data)
print("mean \t p50 \t p95 \t p99")
print(mean, p50, p95, p99)
fields=[args.rps, mean, len(e2e_time) / runtime, len(ingress_time), p50, p95, p99]
with open('speedo_data_static2_func2_nic2.csv', 'a') as f:
writer = csv.writer(f)
writer.writerow(fields)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
r=None
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
time.sleep(1)
send()
time.sleep(170)
# r.join()
printStatistics()
#print "packet holder : ",packet_holder
#print "ingress_time : ",ingress_time
from __future__ import print_function  # keep print() consistent under Python 2 and 3
import socket
import struct
import time
import threading
import numpy as np
import argparse
import signal
parser = argparse.ArgumentParser(description='Mininet demo')
packet_holder = [None] * 11
ingress_time = {}
stop_thread = False
runtime = 10
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 10000 + i
    print(i)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, port))
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
packet_holder[i].append((packet, time.time() ))
print("packet received : ", packet, addr)
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
print "print stat : ",packetTuple[1]
# ,ingress_time[exec_id]
# print "e2e time : ",(packetTuple[1] - ingress_time[exec_id])* 1000
# e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
# data = np.array(e2e_time, dtype=float)
# p50 = np.percentile(data, 50)
# p95 = np.percentile(data, 95)
# p99 = np.percentile(data, 99)
# mean = np.mean(data)
# print("mean \t p50 \t p95 \t p99")
# print(mean, p50, p95, p99)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
ri = []
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
ri.append(r)
def signal_handler(sig, frame):
    global stop_thread
    print("sigint")
    stop_thread = True

signal.signal(signal.SIGINT, signal_handler)  # register the handler so Ctrl-C actually stops the receivers
print("here")
time.sleep(15)
printStatistics()
echo $1
# python2 send.py --client-port 8000 --closed 1 --offload 0 --req-count 50 --send-data 10 --fid $1
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid 369020 --c 1 --t 1 --n 2
# sudo ip netns exec ns_server python benchmark_dispatcher2.py --fid $1 --c 1 --rps 2 --req_count 10
sudo ip netns exec ns_server python benchmark_dispatcher_func2.py --fid "$1" --c 20 --t 300 --rps "$2"
\ No newline at end of file
from __future__ import print_function  # keep print() consistent under Python 2 and 3
import socket
import struct
import time
import threading
import random
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Function id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--req_count', help='request count',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
SERVER_IP = "192.168.2.3"
# egress_time = []
# ingress_time = []
egress_time={}
ingress_time={}
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 10001))
print "listening to {} at port {}".format(CLIENT_IP, 10001)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
print "received packet : ",packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
# data = int(data) - t
egress_time[exec_id] = time.time()
print "recvied data : , chain_id, exec_id, data, function_id, function_count"
print "recvied data : ", chain_id, exec_id, data, function_id, function_count
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
    print(exec_id)
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
    print(chain_id, exec_id, "function_id", function_id, function_count,
          f0, f1, f2, f3, f4)
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
# dataInt = int(time.time() * 1000) % 1000000000
dataInt = 21
print "data : ", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id_packed = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id_packed + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, function_id, exec_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
packet, function_id,exec_id = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id]=time.time()
time.sleep(sleep_time)
def send():
    global egress_time, ingress_time, stop_thread  # stop_thread is set below, so it must be declared global
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ , exec_id = genPacket()
if args.n is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time()
elif args.rps is not None:
runtime = 10
thread_count = args.c
start_time = time.time()
sleep_time = thread_count / float(args.rps)
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(2)
send()
time.sleep(5)
# r.join()
-1.646746577144457500e+15
-1.646746577144559500e+15
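(These samples are negative because the --n path recorded ingress timestamps in milliseconds while the receiver logged seconds: (receive - ingress) * 1000 comes to roughly -1.6e15 for Unix times near 1.6467e9 s, i.e. March 2022. With both timestamps kept in seconds, the same computation yields small positive end-to-end latencies.)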
{
"registry_url": "10.129.6.5:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"env": "env_cpp.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "127.0.0.1:9092"
},
"external": {
"kafka_host": "127.0.0.1:29092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON",
"test": "test"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"aggressivity": 1,
"id_size": 20
}
\ No newline at end of file
{
"registry_url": "10.129.2.201:5000/",
"master_port": 8080,
"master_address": "10.129.2.201",
"daemon_port": 9000,
"daemon_mac": "00:22:22:22:22:22",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.2.201:5984",
"env": "env_udp2.js",
"runtime": "process",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "xanadu_kafka_serverless",
"use_bridge": false,
"internal": {
"kafka_host": "10.129.2.201:9092"
},
"external": {
"kafka_host": "10.129.2.201:9092"
}
},
"topics": {
"request_dm_2_rm": "request2",
"heartbeat": "heartbeat4",
"deployed": "deployed3",
"remove_worker": "removeWorker2",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY2",
"hscale": "hscale2",
"metrics_worker": "metrics_worker2",
"log_channel": "LOG_COMMON2",
"coldstart_worker": "COLDSTART_WORKER2",
"check_autoscale": "CHECK_AUTOSCALE2",
"autoscale": "AUTOSCALE2",
"function_load": "FUNCTION_LOAD3",
"update_function_instance_nic": "UPDATE_FUNCTION_INSTANCE_NIC",
"remove_function_intstance": "REMOVE_FUNCTION_INSTANCE2"
},
"autoscalar_metrics": {
"high_open_request_threshold": 10,
"low_open_request_threshold": 1,
"function_load_threshold": 5,
"low_load_count":5,
"high_load_count":5
},
"metrics": {
"alpha": 0.7
},
"heartbeat_threshold": 5000,
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
{
"registry_url": "localhost:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984",
"env": "env_udp2.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "xanadu_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "10.129.2.201:9092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"metrics_worker": "metrics_worker",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
{
"registry_url": "10.129.6.5:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "10.129.6.5:9092"
},
"external": {
"kafka_host": "10.129.6.5:9092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
\ No newline at end of file
#! /bin/bash -x
echo "Kafka topics before deletion"
echo "--------------------------------------"
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --list --zookeeper 10.129.2.201:2181
echo -e "\n========================================================================"
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic AUTOSCALE
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic CHECK_AUTOSCALE
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic COLDSTART_WORKER
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic FUNCTION_LOAD
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic LOG_COMMON
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic 10.129.2.201
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic 192.168.2.3
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic RESPONSE_RM_2_DM_DUMMY
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic deployed
# /home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic heartbeat
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic hscale
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic metrics_worker
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic removeWorker
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --delete --zookeeper 10.129.2.201:2181 --topic request
echo "after deletetion list of kafka topic"
echo "-------------------------"
/home/pcube/mahendra/downloads/kafka/bin/kafka-topics.sh --list --zookeeper 10.129.2.201:2181
\ No newline at end of file
{"id":"192.168.2.3","master_node":"192.168.2.3"}
\ No newline at end of file
'use strict';
// const isolateBackend = require('./isolate')
const fs = require('fs')
const { spawn } = require('child_process');
const constants = require("../constants_local.json")
const libSupport = require('./lib')
const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = constants.registry_url
const logger = libSupport.logger
function runIsolate(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
const worker = new Worker(filename, {
argv: [resource_id, functionHash, port, "isolate", constants.network.external.kafka_host],
resourceLimits: {
maxOldGenerationSizeMb: memory
}
});
worker.on('message', resolve);
worker.on('error', (err) => {
logger.error("Isolate failed with error", err)
reject(err)
});
        worker.on('exit', (code) => {
            if (code !== 0) {
                reject(new Error(`Worker stopped with exit code ${code}`));
                return;
            }
            logger.info(`Isolate Worker with resource_id ${resource_id} blown`);
            resolve();
        });
});
}
function runProcess(local_repository, metadata) {
console.log("inside run process : ",metadata, local_repository)
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('node', [filename, resource_id, functionHash, port, "process",
constants.network.external.kafka_host, `--max-old-space-size=${memory}` ]);
console.log("pid of the process is ", process.pid);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference);
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
resolve(process.pid);
logger.info(`Process Environment with resource_id ${resource_id} blown`);
});
})
}
function runContainer(metadata) {
let imageName = metadata.functionHash,
port = metadata.port,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
logger.info(imageName);
    console.log('run container function : ', metadata, imageName, port, resource_id, memory)
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process_checkImage = spawn('docker', ["inspect", registry_url + imageName])
process_checkImage.on('close', (code) => {
console.log("\ncode : ", code)
if (code != 0) {
const process_pullImage = spawn('docker', ["pull", registry_url + imageName]);
process_pullImage.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
reject(data);
});
process_pullImage.on('close', (code) => {
if (code != 0)
reject("error")
else {
let process = null;
if (constants.network.use_bridge)
process = spawn('docker', ["create", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
"-p", `${port}:${port}/udp`, "--mac-address","00:22:22:22:22:22","--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
else
process = spawn('docker', ["create", "--rm", "-p", `${port}:${port}`,
"-p", `${port}:${port}/udp`, "--mac-address","00:22:22:22:22:22", "--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
// let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource_id])
                            let _ = spawn('docker', ['start', resource_id])
                            _.stdout.on('data', (data) => {  // 'data' is emitted on stdout, not on the ChildProcess itself
                                console.log("container started", data.toString());
                            })
// add_network.stderr.on('data', (data) => {
// // console.log("network add error", data);
// })
add_network.on('close', (code) => {
logger.info("Ran command");
})
result += data;
resolve(resource_id);
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
logger.info("Exiting container");
})
}
})
} else {
logger.info("container starting at port", port,"to check");
console.log(port, "no to check!!")
let process = null;
/**
* create docker on the default bridge
*/
let docker_args = null;
if (constants.network.use_bridge)
docker_args = ["create", "--rm", `--network=${constants.network.network_bridge}`,
"-p", `${port}:${port}/tcp`, "-p", `${port}:${port}/udp`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host];
else
docker_args = ["create",
"-p", `${port}:${port}/tcp`, "-p", `${port}:${port}/udp`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host];
console.log("docker args :: ", docker_args)
process = spawn('docker',docker_args);
let result = "";
// timeStart = Date.now()
console.log("resource id is: ",resource_id)
var container_id
process.stdout.on('data', (data) => {
//container_id = data.toString
logger.info(`stdout: ${data.toString()}`);
console.log(data.toString())
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
/**
* attach smartnic interface
*/
// let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
// let add_network = spawn('docker', ['network', 'connect', constants.network.network_bridge, resource_id])
let _ = spawn('docker', ['start', resource_id,'-i'])
_.stdout.on('data', (data) => {
logger.info(data.toString())
resolve(resource_id);
})
_.stderr.on('data', (data) => {
logger.info(data.toString())
})
_.on('close', (data) => {
logger.info("exit exit")
logger.info(data.toString())
})
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
logger.info("Exiting container");
})
}
})
})
}
module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate;
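For context, a hypothetical caller of the exports above. The metadata shape follows exactly what runProcess destructures; the repository path, hash, port, and ids are placeholders, and the on-disk name of this module is assumed:

// hypothetical usage sketch -- all values below are placeholders
const { runProcess } = require('./environment');  // assumed filename
runProcess('./local_repository/', {
    port: 30500,                       // placeholder port
    functionHash: 'function_abc123',   // resolves to ./local_repository/function_abc123.js
    resource_id: 'res-xyz',            // placeholder resource id
    resources: { memory: 128 }         // MB, forwarded as --max-old-space-size
}).then((pid) => console.log('environment exited; its pid was', pid))
  .catch((err) => console.error('launch failed:', err.toString()));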
// Create a new isolate limited to 128MB
const ivm = require('isolated-vm');
function createIsolate() {
let context;
const isolate = new ivm.Isolate({ memoryLimit: 128 });
// Create a new context within this isolate. Each context has its own copy of all the builtin
// Objects. So for instance if one context does Object.prototype.foo = 1 this would not affect any
// other contexts.
context = isolate.createContextSync();
// Get a Reference{} to the global object within the context.
const jail = context.global;
// This make the global object available in the context as `global`. We use `derefInto()` here
// because otherwise `global` would actually be a Reference{} object in the new isolate.
jail.setSync('global', jail.derefInto());
// We will create a basic `log` function for the new isolate to use.
const logCallback = function(...args) {
console.log(...args);
};
context.evalClosureSync(`global.console.log = function(...args) {
$0.applyIgnored(undefined, args, { arguments: { copy: true } });
}`, [ logCallback ], { arguments: { reference: true } });
// And let's test it out:
// context.evalSync('logging sync test.');
return {isolate, context};
}
module.exports.createIsolate = createIsolate;
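A small usage sketch of the factory above, assuming the module is required as ./isolate; the evaluated snippet is illustrative only:

// hypothetical usage of createIsolate()
const { createIsolate } = require('./isolate');  // assumed filename
const { isolate, context } = createIsolate();
context.evalSync('console.log("hello from inside the isolate")');  // routed through the host logCallback
console.log('isolate heap used (bytes):', isolate.getHeapStatisticsSync().used_heap_size);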
const fetch = require('node-fetch');
const fs = require('fs');
const process = require('process')
const { spawnSync } = require('child_process');
const constants = require(".././constants_local.json")
const kafka = require('kafka-node')
const winston = require('winston')
const { createLogger, format, transports } = winston;
function updateConfig() {
console.log("Retrieving primary IP");
let file = JSON.parse(fs.readFileSync('./config.json', { encoding: 'utf-8' }))
const getIP = spawnSync("ip", ["route", "get", file.master_node]);
let err = getIP.stderr.toString().trim()
if (err !== '') {
console.log(err);
process.exit(1);
}
let data = getIP.stdout.toString().trim()
data = data.substr(0, data.indexOf("\n")).trim()
data = data.split(' ')
file.id = data[data.length - 1]
fs.writeFileSync('./config.json', JSON.stringify(file));
console.log("Updated Config file");
console.log("updateconfig file ", file)
}
function makeTopic(id) {
console.log("Using Primary IP", id, "as topic", "publishing to:", constants.network.external.kafka_host);
let client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
Producer = kafka.Producer,
producer = new Producer(client)
return new Promise((resolve, reject) => {
producer.send([{
topic: id,
messages: JSON.stringify({
status: "success",
})
}], (err, data) => {
if (err)
reject();
else
resolve();
})
})
}
const download = (async (url, path, check = true) => {
if (!check || !fs.existsSync(path)) {
console.log(url);
const res = await fetch(url);
const fileStream = fs.createWriteStream(path);
await new Promise((resolve, reject) => {
res.body.pipe(fileStream);
res.body.on("error", (err) => {
reject(err);
});
fileStream.on("finish", function () {
resolve();
});
});
}
});
function makeid(length) {
var result = '';
var characters = 'abcdefghijklmnopqrstuvwxyz0123456789';
var charactersLength = characters.length;
for (var i = 0; i < length; i++) {
result += characters.charAt(Math.floor(Math.random() * charactersLength));
}
return result;
}
function returnPort(port, usedPort) {
    usedPort.delete(port)
}
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
format.timestamp(),
format.json()
),
defaultMeta: { module: 'Dispatch Agent' },
transports: [
//
// - Write to all logs with level `info` and below to `combined.log`
// - Write all logs error (and below) to `error.log`.
//
new winston.transports.File({ filename: 'log/error.log', level: 'error' }),
new winston.transports.File({ filename: 'log/combined.log' }),
new winston.transports.Console({
format: winston.format.combine(
format.colorize({ all: true }),
format.timestamp(),
format.simple()
)
})
]
});
function getPort(usedPort) {
    // pick a random unused port in [30000, 60000]; give up after 30000 attempts
    let port = -1, ctr = 0
    const min = 30000, max = 60000
    do {
        port = Math.floor(Math.random() * (max - min + 1)) + min
        ctr += 1
        if (ctr > 30000) {
            port = -1
            break
        }
    } while (usedPort.has(port))
    return port
}
module.exports = {
download, makeid, updateConfig, makeTopic, returnPort, logger, getPort
}
'use strict';
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
var childProcess = require('child_process');
var spawn = childProcess.spawn;
const os = require('os')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 70, flagFirstRequest = true
let waitTime, isDead = false
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch (e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
let idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
    return new Promise((resolve, reject) => {"f1"  // "f1" appears to be a placeholder swapped for the real function body at deploy time
        resolve(payload)
    })
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (!isDead && Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{ topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
server.kill(0);
})
cleanup()
console.log("server killed");
isDead = true;
}
}
const server = spawn('./server', ['0.0.0.0', port, 2]);
server.stdout.on('data', (data) => {
console.log(`stdout: ${data.toString()}`);
});
server.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});
server.on('close', (code) => {
console.log(`child process exited with code ${code}`);
process.exit(0)
});
process.on('SIGINT', () => {
console.log('Received SIGINT. Press Control-D to exit.');
// server.kill(0);
// server.emit()
});
function handle(signal) {
console.log(`Received ${signal}`);
}
function cleanup() {
server.kill('SIGINT');
}
process.on('SIGINT', handle);
process.on('SIGTERM', handle);
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
// assumed wiring: the struct helpers used by unpackPacket/packPacket below
// come from the vendored jspack submodule
const struct = require('jspack').jspack;
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
let idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
    return new Promise((resolve, reject) => {"f1"  // "f1" appears to be a placeholder swapped for the real function body at deploy time
        resolve(payload)
    })
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
    let node_id = execSync('ip a | grep -oE "([0-9]{1,3}\\.){3}[0-9]{1,3}" | grep 192.168 | head -n 1').toString()
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
// console.log("message", msg)
let payload = unpackPacket(msg)
// console.log(payload, typeof payload);
lastRequest = Date.now()
totalRequest++
executor(payload).then(result => {
result = packPacket(payload)
try {
udpProxy.send(result, 0, result.length, "8080", "192.168.2.2", function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
function unpackPacket(packet) {
    // decode with jspack's struct helpers; declare every name, since 'use strict' is on
    let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
    let base = 0
    chain_id = struct.Unpack(">I", packet, base)
    base += 4
    exec_id = struct.Unpack(">I", packet, base)
    base += 4
    function_id = struct.Unpack("B", packet, base)
    base += 1
    data = struct.Unpack(">I", packet, base)
    base += 4
    function_count = struct.Unpack("B", packet, base)
    console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
    return {
        chain_id: chain_id[0],
        exec_id: exec_id[0],
        data: data[0],
        function_count: function_count[0],
        function_id: function_id[0]
    }
}
function packPacket(dataPacket) {
    // pack into a plain array, then wrap in a Buffer for udpProxy.send
    let message = new Array(1024)
    let base = 0
    struct.PackTo(">I", message, base, [dataPacket.chain_id])
    base += 4
    struct.PackTo(">I", message, base, [dataPacket.exec_id])
    base += 4
    struct.PackTo("B", message, base, [dataPacket.function_id])
    base += 1
    struct.PackTo(">I", message, base, [dataPacket.data])
    base += 4
    struct.PackTo("B", message, base, [dataPacket.function_count])
    return Buffer.from(message)
}
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
let idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
    return new Promise((resolve, reject) => {"f1"  // "f1" appears to be a placeholder swapped for the real function body at deploy time
        resolve(payload)
    })
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
    let node_id = execSync('ip a | grep -oE "([0-9]{1,3}\\.){3}[0-9]{1,3}" | grep 192.168 | head -n 1').toString()
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
    console.log("message", msg)
    let payload = {}
    lastRequest = Date.now()
    totalRequest++
    executor(payload).then(result => {
        // reply with the (empty) result as JSON bytes; a bare {} has no length and would crash udpProxy.send
        result = Buffer.from(JSON.stringify(result))
        try {
            udpProxy.send(result, 0, result.length, 8080, "192.168.2.2", function (err, bytes) {
                if (err)
                    console.log(err)
                // console.log("response via UDP")
            })
        } catch (e) {
            console.log(e)
        }
    })
});
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
var childProcess = require('child_process');
var spawn = childProcess.spawn;
const os = require('os')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 10, flagFirstRequest = true
let waitTime, isDead = false
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch (e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
let idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
    return new Promise((resolve, reject) => {"f1"  // "f1" appears to be a placeholder swapped for the real function body at deploy time
        resolve(payload)
    })
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (!isDead && Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{ topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
server.kill(0);
})
cleanup()
console.log("server killed");
isDead = true;
}
}
const server = spawn('./server', ['0.0.0.0', port, 2]);
server.stdout.on('data', (data) => {
console.log(`stdout: ${data.toString()}`);
});
server.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});
server.on('close', (code) => {
console.log(`child process exited with code ${code}`);
process.exit(0)
});
process.on('SIGINT', () => {
console.log('Received SIGINT. Press Control-D to exit.');
// server.kill(0);
// server.emit()
});
function handle(signal) {
console.log(`Received ${signal}`);
}
function cleanup() {
server.kill('SIGINT');
}
process.on('SIGINT', handle);
process.on('SIGTERM', handle);
setInterval(shouldDie, 1000);
jspack @ 4753fb1a