Commit 67da58af authored by Mahendra Patel

debugging speedo

parent 14bb7284
[Kafka]
Address = 10.129.6.5:9092
[Arbiter]
MessageReadGap = 10
GruntTimeToDie = 10000
GruntResponseWaitTime = 100
[Grunt]
MessageReadGap = 10
HeartbeatGap = 1000
\ No newline at end of file
Design decisions to be considered:
1. Container: copy the executable file into prebuilt images.
2. Container: run a warm, request-based container server instead of the current architecture;
   this will likely require request redirection.
3. Move away from Mosquitto to Apache Kafka / ZooKeeper.
4. Implement a heartbeat-based daemon monitor (see the sketch below).
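
As a starting point for item 4, a minimal sketch of a heartbeat-based monitor is shown below. It assumes the kafka-node client already used in this commit and the [Arbiter] settings above (the "heartbeat" topic and GruntTimeToDie); the broker address and the recovery action are illustrative placeholders, not the final design.

'use strict';
const kafka = require('kafka-node')

const GRUNT_TIME_TO_DIE = 10000          // ms without a heartbeat before a grunt is presumed dead
const lastSeen = new Map()               // node_id -> timestamp of last heartbeat

const client = new kafka.KafkaClient({ kafkaHost: "10.129.6.5:9092", autoConnect: true })
const consumer = new kafka.Consumer(client, [{ topic: "heartbeat", partition: 0 }], { autoCommit: true })

// Each grunt publishes { address, system_info, timestamp } on the "heartbeat" topic (see heartbeat() below).
consumer.on('message', (message) => {
    const beat = JSON.parse(message.value)
    lastSeen.set(beat.address, Date.now())
})

// Sweep for nodes whose heartbeat has gone stale and hand them to the arbiter for removal.
setInterval(() => {
    const now = Date.now()
    for (const [node, ts] of lastSeen) {
        if (now - ts > GRUNT_TIME_TO_DIE) {
            console.log("grunt presumed dead:", node)
            lastSeen.delete(node)
            // the arbiter would reschedule this node's functions / publish to "removeWorker" here
        }
    }
}, 1000)

The sweep runs once per second; whether a stale node should be removed immediately or only after several missed sweeps is left open for the arbiter.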
<mxfile host="www.draw.io" modified="2020-02-16T14:55:40.736Z" version="12.7.0" type="device"><diagram id="Pc5nJqTTz6jn2qa6debb" name="Page-1">[compressed draw.io diagram data omitted]</diagram></mxfile>
\ No newline at end of file
bitnami*
node_modules
package-lock.json
firecracker*
secrets.json
grunt
.clinic
rm_dummy.js
metrics_gatherer.js
\ No newline at end of file
{
"python.pythonPath": "/usr/bin/python"
}
\ No newline at end of file
import socket
import struct
import time
import threading
import random
import numpy as np
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Function id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--t', help='Runtime',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
runtime = args.t
concurrency = args.c
SERVER_IP = "192.168.2.3"
packet_holder = [None] * 11
ingress_time = {}
stop_thread = False
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 10000 + i
    print(i)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, port))
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
# print chain_id, exec_id, "function_id", function_id, function_count, \
# f0, f1, f2, f3, f4,
chain_id = struct.pack(">I", chain_id) # chain id
exec_id_packed = struct.pack(">I", exec_id) # execution id
dataInt = 0
# print " dataInt", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id_packed + function_id + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, exec_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
if time.time() - start_time > runtime:
break
packet, exec_id = genPacket()
if exec_id in ingress_time:
continue
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time()
time.sleep(sleep_time)
def send():
global egress_time, ingress_time, concurrency, runtime, stop_thread
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("Sending packet to %s at port %s" % (SERVER_IP, PORT))
print("Runtime: %d Concurrency %d" % (runtime, concurrency))
print("chain id, exec id, data, function count, functions dependencies...")
# op = struct.unpack("B", packet[0])
if args.n is not None:
for i in range(args.n):
packet, exec_id = genPacket()
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time() * 1000
print("s", "{0:f}".format(ingress_time[exec_id]))
elif args.rps is not None:
start_time = time.time()
sleep_time = concurrency / float(args.rps)
print("calculated inter-arrival time, offload mode", sleep_time)
for i in range(concurrency):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
    print(e2e_time)
data = np.array(e2e_time, dtype=float)
p50 = np.percentile(data, 50)
p95 = np.percentile(data, 95)
p99 = np.percentile(data, 99)
mean = np.mean(data)
print("mean \t p50 \t p95 \t p99")
print(mean, p50, p95, p99)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
time.sleep(1)
send()
time.sleep(2)
# r.join()
printStatistics()
import socket
import struct
import time
import threading
import random
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Function id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--req_count', help='request count',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
SERVER_IP = "192.168.2.3"
egress_time = []
ingress_time = []
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
# print packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
data = int(data) - t
print "rec", chain_id, exec_id, data, function_id, function_count,
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
print exec_id
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
dataInt = int(time.time() * 1000) % 1000000000
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id_packed = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id_packed + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, function_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
packet, function_id = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
def send():
    global egress_time, ingress_time, stop_thread
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
if args.n is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
elif args.rps is not None:
runtime = 10
thread_count = args.c
start_time = time.time()
sleep_time = thread_count / float(args.rps)
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(1)
send()
r.join()
import socket
import struct
import time
import threading
import random
import numpy as np
import argparse
import signal
parser = argparse.ArgumentParser(description='Mininet demo')
packet_holder = [None] * 11
ingress_time = {}
stop_thread = False
runtime = 10
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 10000 + i
    print(i)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, port))
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
# e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
# data = np.array(e2e_time, dtype=float)
# p50 = np.percentile(data, 50)
# p95 = np.percentile(data, 95)
# p99 = np.percentile(data, 99)
# mean = np.mean(data)
# print("mean \t p50 \t p95 \t p99")
# print(mean, p50, p95, p99)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
ri = []
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
ri.append(r)
def signal_handler(sig, frame):
    global stop_thread
    print("sigint")
    stop_thread = True

signal.signal(signal.SIGINT, signal_handler)
print("here")
time.sleep(15)
printStatistics()
{
"registry_url": "10.129.6.5:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"env": "env_cpp.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "127.0.0.1:9092"
},
"external": {
"kafka_host": "127.0.0.1:29092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON",
"test": "test"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"aggressivity": 1,
"id_size": 20
}
\ No newline at end of file
{
"registry_url": "localhost:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984",
"env": "env_udp.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "xanadu_kafka-serverless",
"use_bridge": true,
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "10.129.2.201:9092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
{
"registry_url": "10.129.6.5:5000/",
"master_port": 8080,
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
"implicit_chain_meta": "implicit_chain",
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "10.129.6.5:9092"
},
"external": {
"kafka_host": "10.129.6.5:9092"
}
},
"topics": {
"request_dm_2_rm": "request",
"heartbeat": "heartbeat",
"deployed": "deployed",
"remove_worker": "removeWorker",
"response_rm_2_dm": "RESPONSE_RM_2_DM_DUMMY",
"hscale": "hscale",
"log_channel": "LOG_COMMON"
},
"autoscalar_metrics": {
"open_request_threshold": 100
},
"metrics": {
"alpha": 0.7
},
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
\ No newline at end of file
{"id":"10.129.2.201","master_node":"192.168.0.105"}
\ No newline at end of file
'use strict';
// const isolateBackend = require('./isolate')
const fs = require('fs')
const { spawn } = require('child_process');
const constants = require("../constants_local.json")
const libSupport = require('./lib')
const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = constants.registry_url
const logger = libSupport.logger
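/**
 * Run the function in a worker_threads isolate, capping its heap at the requested
 * memory via resourceLimits; resolves on the worker's first message or clean exit.
 */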
function runIsolate(local_repository, metadata) {
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
const worker = new Worker(filename, {
argv: [resource_id, functionHash, port, "isolate", constants.network.external.kafka_host],
resourceLimits: {
maxOldGenerationSizeMb: memory
}
});
worker.on('message', resolve);
worker.on('error', (err) => {
logger.error("Isolate failed with error", err)
reject(err)
});
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
logger.info(`Isolate Worker with resource_id ${resource_id} blown`);
resolve()
})
});
}
function runProcess(local_repository, metadata) {
console.log("inside run process : ",metadata, local_repository)
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
let filename = local_repository + functionHash + ".js"
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process = spawn('node', [filename, resource_id, functionHash, port, "process",
constants.network.external.kafka_host, `--max-old-space-size=${memory}` ]);
process.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
console.log("process time taken: ", timeDifference);
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
resolve(process.pid);
logger.info(`Process Environment with resource_id ${resource_id} blown`);
});
})
}
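/**
 * Deploy the function as a Docker container: check whether the image is present
 * locally (docker inspect), pull it from the registry if not, create the container
 * (on the configured bridge or the default network), attach the secondary network
 * and start it; resolves with the resource_id once the create/start commands report
 * on stdout.
 */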
function runContainer(metadata) {
let imageName = metadata.functionHash,
port = metadata.port,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
logger.info(imageName);
    console.log('run container function : ', metadata, imageName, port, resource_id, memory)
return new Promise((resolve, reject) => {
let timeStart = Date.now()
const process_checkImage = spawn('docker', ["inspect", registry_url + imageName])
process_checkImage.on('close', (code) => {
if (code != 0) {
const process_pullImage = spawn('docker', ["pull", registry_url + imageName]);
process_pullImage.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
reject(data);
});
process_pullImage.on('close', (code) => {
if (code != 0)
reject("error")
else {
let process = null;
if (constants.network.use_bridge)
process = spawn('docker', ["create", "--rm", `--network=${constants.network.network_bridge}`, "-p", `${port}:${port}`,
"-p", `${port}:${port}/udp`, "--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
else
process = spawn('docker', ["create", "--rm", "-p", `${port}:${port}`,
"-p", `${port}:${port}/udp`, "--name", resource_id, registry_url + imageName,
resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
// let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource_id])
let _ = spawn('docker', ['start', resource_id])
                        _.stdout.on('data', (data) => {
console.log("container started", data);
})
// add_network.stderr.on('data', (data) => {
// // console.log("network add error", data);
// })
add_network.on('close', (code) => {
logger.info("Ran command");
})
result += data;
resolve(resource_id);
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
logger.info("Exiting container");
})
}
})
} else {
logger.info("container starting at port", port,"to check");
console.log(port, "no to check!!")
let process = null;
/**
* create docker on the default bridge
*/
if (constants.network.use_bridge)
process = spawn('docker', ["create", "--rm", `--network=${constants.network.network_bridge}`,
"-p", `${port}:${port}`, "-p", `${port}:${port}/udp`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
else
process = spawn('docker', ["create",
"-p", `${port}:${port}`, "-p", `${port}:${port}/udp`, "--name", resource_id,
registry_url + imageName, resource_id, imageName, port, "container", constants.network.internal.kafka_host]);
let result = "";
// timeStart = Date.now()
process.stdout.on('data', (data) => {
logger.info(`stdout: ${data.toString()}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
/**
* attach smartnic interface
*/
// let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let _ = spawn('docker', ['start', resource_id])
_.stdout.on('data', (data) => {
logger.info(data.toString())
resolve(resource_id);
})
_.stderr.on('data', (data) => {
logger.info(data.toString())
})
_.on('close', (data) => {
logger.info("exit exit")
logger.info(data.toString())
})
});
process.stderr.on('data', (data) => {
logger.error(`stderr: ${data}`);
reject(data);
});
process.on('close', (code) => {
logger.info("Exiting container");
})
}
})
})
}
module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate;
'use strict';
const constants = require(".././constants_local.json")
const secrets = require('./secrets.json')
const config = require('./config.json')
const libSupport = require('./lib')
libSupport.updateConfig()
const node_id = config.id
const {spawn } = require('child_process')
const execute = require('./execute')
const fs = require('fs')
const fetch = require('node-fetch');
const os = require('os');
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.db.function_meta + "/"
const kafka = require('kafka-node')
const logger = libSupport.logger
const local_repository = __dirname + "/local_repository/"
const host_url = "http://" + constants.master_address + ":" + constants.master_port
let Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
producer = new Producer(client),
Consumer = kafka.Consumer
libSupport.makeTopic(node_id).then(() => {
logger.info("node topic created")
let consumer = new Consumer(client,
[
{ topic: node_id, partition: 0, offset: 0 }
],
[
{ autoCommit: true }
])
consumer.on('message', function (message) {
// logger.info(message);
let topic = message.topic
message = message.value
message = JSON.parse(message)
let runtime = message.runtime
let functionHash = message.functionHash
let resource_id = message.resource_id
let port = message.port
/**
* Download necessary files (function file) and Start resource deployment
*/
if (message.type === "execute") {
logger.info("Received Deployment request for resource_id: " + resource_id);
fetch(metadataDB + functionHash).then(res => res.json())
.then(json => {
console.log("metadata", json);
libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
let metadata = {
resource_id, functionHash,
runtime, port,
resources: {
memory: json.memory
}
}
startWorker(local_repository, producer, metadata)
})
}).catch(err => {
logger.error("something went wrong" + err.toString())
});
}
})
})
/**
* download and start grunt
*/
libSupport.download(constants.grunt_host, "grunt", false).then(() => {
logger.info("Downloaded grunt binary from repository")
fs.chmod('grunt', 0o755, (err) => {
logger.info("grunt made executable. Starting grunt")
let grunt = spawn('./grunt', [node_id])
grunt.stdout.on('data', data => {
// logger.info(data.toString());
})
grunt.stderr.on('data', data => {
// logger.info(data.toString());
})
grunt.on('close', (code) => {
logger.info("Grunt exited with exit code", code);
})
})
})
/**
 * Start a worker executor of the runtime type specified in the metadata
 * @param {String} local_repository path to the local function repository
 * @param {Producer} producer Kafka producer used to report deployment status
 * @param {Object} metadata deployment metadata: functionHash, resource_id, runtime, port, resources
 */
function startWorker(local_repository, producer, metadata) {
let runtime = metadata.runtime
console.log(metadata);
logger.info(`Using port ${metadata.port} for functionHash ${metadata.functionHash}`)
if (runtime === "isolate")
execute.runIsolate(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
logger.error(err)
producer.send([{
topic: "deployed",
messages: JSON.stringify({
"status": false,
resource_id: metadata.resource_id,
"reason": "isolate exit"
})
}], () => { })
})
else if (runtime === "process")
// console.log("rutime is process : ",metadata)
execute.runProcess(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
producer.send([{ topic: "deployed",
messages: JSON.stringify({
"status": false,
resource_id: metadata.resource_id,
"reason": "process exit"
}) }], () => { })
})
else if (runtime === "container")
execute.runContainer(metadata)
else {
producer.send(
[{
topic: "response",
messages: JSON.stringify({ status: "unknown runtime" })
}], () => { })
return
}
}
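/**
 * Publish this node's resource statistics (free memory, CPU count, total memory,
 * load average) on the "heartbeat" topic once per second so the dispatcher can
 * track node liveness and capacity.
 */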
function heartbeat() {
let info = {
free_mem: os.freemem(),
cpu_count: os.cpus().length,
total_mem: os.totalmem(),
avg_load: os.loadavg()
}
let payload = [{
topic: "heartbeat",
messages: JSON.stringify({
"address": node_id,
"system_info": info,
"timestamp": Date.now()
})
}]
producer.send(payload, function(cb) {})
}
setInterval(heartbeat, 1000);
// Create a new isolate limited to 128MB
const ivm = require('isolated-vm');
function createIsolate() {
let context;
const isolate = new ivm.Isolate({ memoryLimit: 128 });
// Create a new context within this isolate. Each context has its own copy of all the builtin
// Objects. So for instance if one context does Object.prototype.foo = 1 this would not affect any
// other contexts.
context = isolate.createContextSync();
// Get a Reference{} to the global object within the context.
const jail = context.global;
// This makes the global object available in the context as `global`. We use `derefInto()` here
// because otherwise `global` would actually be a Reference{} object in the new isolate.
jail.setSync('global', jail.derefInto());
// We will create a basic `log` function for the new isolate to use.
const logCallback = function(...args) {
console.log(...args);
};
context.evalClosureSync(`global.console.log = function(...args) {
$0.applyIgnored(undefined, args, { arguments: { copy: true } });
}`, [ logCallback ], { arguments: { reference: true } });
// And let's test it out:
// context.evalSync('logging sync test.');
return {isolate, context};
}
module.exports.createIsolate = createIsolate;
const fetch = require('node-fetch');
const fs = require('fs');
const process = require('process')
const { spawnSync } = require('child_process');
const constants = require(".././constants_local.json")
const kafka = require('kafka-node')
const winston = require('winston')
const { createLogger, format, transports } = winston;
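/**
 * Determine the local IP used to reach the master node (by parsing the output of
 * `ip route get <master_node>`) and persist it as `id` in config.json.
 */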
function updateConfig() {
console.log("Retrieving primary IP");
let file = JSON.parse(fs.readFileSync('./config.json', { encoding: 'utf-8' }))
const getIP = spawnSync("ip", ["route", "get", file.master_node]);
let err = getIP.stderr.toString().trim()
if (err !== '') {
console.log(err);
process.exit(1);
}
let data = getIP.stdout.toString().trim()
data = data.substr(0, data.indexOf("\n")).trim()
data = data.split(' ')
file.id = data[data.length - 1]
fs.writeFileSync('./config.json', JSON.stringify(file));
console.log("Updated Config file");
console.log("updateconfig file ", file)
}
function makeTopic(id) {
console.log("Using Primary IP", id, "as topic", "publishing to:", constants.network.external.kafka_host);
let client = new kafka.KafkaClient({
kafkaHost: constants.network.external.kafka_host,
autoConnect: true
}),
Producer = kafka.Producer,
producer = new Producer(client)
return new Promise((resolve, reject) => {
producer.send([{
topic: id,
messages: JSON.stringify({
status: "success",
})
}], (err, data) => {
if (err)
reject();
else
resolve();
})
})
}
// var download = function (url, dest, check = true, cb) {
// return new Promise((resolve, reject) => {
// console.log(url);
// if (!check || !fs.existsSync(dest)) {
// var file = fs.createWriteStream(dest);
// var request = https.get(url, function (response) {
// response.pipe(file);
// file.on('finish', function () {
// file.close(cb); // close() is async, call cb after close completes.
// resolve();
// });
// }).on('error', function (err) { // Handle errors
// fs.unlink(dest); // Delete the file async. (But we don't check the result)
// logger.error("download failed" + err.message);
// if (cb) cb(err.message);
// reject(err);
// });
// } else {
// resolve();
// }
// })
// };
const download = (async (url, path, check = true) => {
if (!check || !fs.existsSync(path)) {
console.log(url);
const res = await fetch(url);
const fileStream = fs.createWriteStream(path);
await new Promise((resolve, reject) => {
res.body.pipe(fileStream);
res.body.on("error", (err) => {
reject(err);
});
fileStream.on("finish", function () {
resolve();
});
});
}
});
function makeid(length) {
var result = '';
var characters = 'abcdefghijklmnopqrstuvwxyz0123456789';
var charactersLength = characters.length;
for (var i = 0; i < length; i++) {
result += characters.charAt(Math.floor(Math.random() * charactersLength));
}
return result;
}
function returnPort(port, usedPort) {
usedPort.delete((port))
}
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
format.timestamp(),
format.json()
),
defaultMeta: { module: 'Dispatch Agent' },
transports: [
//
// - Write to all logs with level `info` and below to `combined.log`
// - Write all logs error (and below) to `error.log`.
//
new winston.transports.File({ filename: 'log/error.log', level: 'error' }),
new winston.transports.File({ filename: 'log/combined.log' }),
new winston.transports.Console({
format: winston.format.combine(
format.colorize({ all: true }),
format.timestamp(),
format.simple()
)
})
]
});
module.exports = {
download, makeid, updateConfig, makeTopic, returnPort, logger
}
'use strict';
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
var childProcess = require('child_process');
var spawn = childProcess.spawn;
const os = require('os')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 70, flagFirstRequest = true
let waitTime, isDead = false
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch (e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
    idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
}], () => { })
waitTime = Date.now()
})
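// shouldDie(): if no request has arrived within idleTime seconds, publish a
// "removeWorker" message for this resource and tear down the backing ./server process.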
function shouldDie() {
if (!isDead && Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{ topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
server.kill(0);
})
cleanup()
console.log("server killed");
isDead = true;
}
}
const server = spawn('./server', ['0.0.0.0', port, 2]);
server.stdout.on('data', (data) => {
console.log(`stdout: ${data.toString()}`);
});
server.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});
server.on('close', (code) => {
console.log(`child process exited with code ${code}`);
process.exit(0)
});
process.on('SIGINT', () => {
console.log('Received SIGINT. Press Control-D to exit.');
// server.kill(0);
// server.emit()
});
function handle(signal) {
console.log(`Received ${signal}`);
}
function cleanup() {
server.kill('SIGINT');
}
process.on('SIGINT', handle);
process.on('SIGTERM', handle);
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
// jspack is required by unpackPacket/packPacket below
let struct = require('jspack')
struct = struct.jspack
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
    idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id = execSync('ip a | grep -oE "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep 192.168 | head -n 1').toString()
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
// console.log("message", msg)
let payload = unpackPacket(msg)
// console.log(payload, typeof payload);
lastRequest = Date.now()
totalRequest++
executor(payload).then(result => {
result = packPacket(payload)
try {
udpProxy.send(result, 0, result.length, "8080", "192.168.2.2", function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
function unpackPacket(packet) {
// let buffer = new Array(1024)
    let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack("B", packet, base)
base += 1
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0]
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
    let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo("B", message, base, [dataPacket.function_id])
base += 1
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
    idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id = execSync('ip a | grep -oE "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep 192.168 | head -n 1').toString()
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
console.log("message", msg)
let payload = {}
// console.log(payload, typeof payload);
lastRequest = Date.now()
totalRequest++
executor(payload).then(result => {
result = {}
try {
udpProxy.send(result, 0, result.length, "8080", "192.168.2.2", function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const os = require('os')
let struct = require('jspack')
struct = struct.jspack
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 30, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
    idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
// console.log("message", msg)
let payload = unpackPacket(msg)
console.log(payload, typeof payload);
lastRequest = Date.now()
console.log("network stack time", lastRequest - payload.t1)
totalRequest++
executor(payload).then(result => {
result = packPacket(payload)
try {
udpProxy.send(result, 0, result.length, "8080", rinfo.address, function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
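// Request layout decoded below (big-endian): chain_id(4B) exec_id(4B) function_id(4B)
// data(4B) function_count(1B) f0..f4(1B each), followed by timestamp slots t1..t4
// read at 8-byte strides.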
function unpackPacket(packet) {
// let buffer = new Array(1024)
let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0, f0, f1, f2, f3, f4, t1, t2, t3, t4
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack(">I", packet, base)
base += 4
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
base += 1
f0 = struct.Unpack("B", packet, base)
base += 1
f1 = struct.Unpack("B", packet, base)
base += 1
f2 = struct.Unpack("B", packet, base)
base += 1
f3 = struct.Unpack("B", packet, base)
base += 1
f4 = struct.Unpack("B", packet, base)
base += 1
t1 = struct.Unpack("I", packet, base)
base += 8
t2 = struct.Unpack("I", packet, base)
base += 8
t3 = struct.Unpack("I", packet, base)
base += 8
t4 = struct.Unpack("I", packet, base)
console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0],
f0, f1, f2, f3, f4, t1, t2, t3, t4
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
base += 4
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
// server.bind(port, "192.168.2.3");
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
// jspack is required by unpackPacket/packPacket below
let struct = require('jspack')
struct = struct.jspack
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
    idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id = execSync('ip a | grep -oE "([0-9]{1,3}\.){3}[0-9]{1,3}" | grep 192.168 | head -n 1').toString()
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
// console.log("message", msg)
let payload = unpackPacket(msg)
// console.log(payload, typeof payload);
lastRequest = Date.now()
totalRequest++
executor(payload).then(result => {
        result = packPacket(payload)
try {
udpProxy.send(result, 0, result.length, "8080", "192.168.2.2", function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
function unpackPacket(packet) {
// let buffer = new Array(1024)
    let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack("B", packet, base)
base += 1
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0]
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
    let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo("B", message, base, [dataPacket.function_id])
base += 1
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const os = require('os')
let struct = require('jspack')
struct = struct.jspack
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 600, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
    idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
// console.log("message", msg)
let payload = unpackPacket(msg)
console.log(payload, typeof payload);
lastRequest = Date.now()
totalRequest++
executor(payload).then(result => {
result = packPacket(payload)
console.log(result)
try {
udpProxy.send(result, 0, result.length, "8080", rinfo.address, function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
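// Packet layout (big-endian): u32 chain_id | u32 exec_id | u8 function_id | u32 data | u8 function_count.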
function unpackPacket(packet) {
// let buffer = new Array(1024)
let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack("B", packet, base) // single-byte field, consistent with the 1-byte stride and packPacket below
base += 1
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0]
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo("B", message, base, [dataPacket.function_id]) // single-byte field, consistent with unpackPacket above
base += 1
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
// server.bind(port, "192.168.2.3");
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
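// Thin wrapper around a native `./server` binary: reports deployment over Kafka,
// mirrors the child's stdio, and stops the child (SIGINT) after 10 s without an HTTP request.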
const express = require('express')
let request = require('request')
const process = require('process')
var childProcess = require('child_process');
var spawn = childProcess.spawn;
const os = require('os')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 10, flagFirstRequest = true
let waitTime, isDead = false
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch (e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (!isDead && Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{ topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
server.kill(0);
})
cleanup()
console.log("server killed");
isDead = true;
}
}
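// Launch the native executor binary; its stdout/stderr are mirrored and the wrapper exits once the child closes.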
const server = spawn('./server', ['0.0.0.0', port, 2]);
server.stdout.on('data', (data) => {
console.log(`stdout: ${data.toString()}`);
});
server.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});
server.on('close', (code) => {
console.log(`child process exited with code ${code}`);
process.exit(0)
});
process.on('SIGINT', () => {
console.log('Received SIGINT. Press Control-D to exit.');
// server.kill(0);
// server.emit()
});
function handle(signal) {
console.log(`Received ${signal}`);
}
function cleanup() {
server.kill('SIGINT');
}
process.on('SIGINT', handle);
process.on('SIGTERM', handle);
setInterval(shouldDie, 1000);
'use strict';
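// HTTP + UDP executor exchanging plain JSON payloads (no binary framing); the node address is
// discovered via `ip a`, and UDP responses go back to the proxy at 192.168.2.3:8080.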
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 60, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
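// Pick the first 192.168.x address reported by `ip a` as this node's id.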
let node_id = execSync('ip a | grep -oE "([0-9]{1,3}\\.){3}[0-9]{1,3}" | grep 192.168 | head -n 1').toString()
console.log(node_id);
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[ {topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
let payload = JSON.parse(msg)
// console.log(payload, typeof payload);
lastRequest = Date.now()
totalRequest++
executor(payload).then(result => {
result = JSON.stringify(result)
try {
udpProxy.send(result, 0, result.length, "8080", "192.168.2.3", function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
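// HTTP + UDP executor using the extended jspack packet (adds f0-f4 flags and t1-t4 timestamps);
// logs the per-packet network stack time on receipt.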
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const os = require('os')
let struct = require('jspack')
struct = struct.jspack
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 30, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
// console.log("message", msg)
let payload = unpackPacket(msg)
console.log(payload, typeof payload);
lastRequest = Date.now()
console.log("network stack time", lastRequest - payload.t1)
totalRequest++
executor(payload).then(result => {
result = packPacket(payload)
try {
udpProxy.send(result, 0, result.length, "8080", rinfo.address, function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
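// Extended packet layout (big-endian): u32 chain_id | u32 exec_id | u32 function_id | u32 data |
// u8 function_count | u8 f0..f4 | u64 t1..t4.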
function unpackPacket(packet) {
// let buffer = new Array(1024)
let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0, f0, f1, f2, f3, f4, t1, t2, t3, t4
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack(">I", packet, base)
base += 4
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base)
base += 1
f0 = struct.Unpack("B", packet, base)
base += 1
f1 = struct.Unpack("B", packet, base)
base += 1
f2 = struct.Unpack("B", packet, base)
base += 1
f3 = struct.Unpack("B", packet, base)
base += 1
f4 = struct.Unpack("B", packet, base)
base += 1
t1 = struct.Unpack(">Q", packet, base)
base += 8
t2 = struct.Unpack(">Q", packet, base)
base += 8
t3 = struct.Unpack(">Q", packet, base)
base += 8
t4 = struct.Unpack(">Q", packet, base)
console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0],
f0, f1, f2, f3, f4, t1: t1[0], t2, t3, t4
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
base += 4
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
// server.bind(port, "192.168.2.3");
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
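// HTTP-only executor: the handler sleeps briefly, stamps the payload with `waited` and `timestamp`,
// and returns it; the Kafka producer is created without a guard in this variant.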
const express = require('express')
let request = require('request')
const process = require('process')
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 60, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
}),
producer = new Producer(client)
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
let wait = 1
let a = payload
sleep(wait).then(() => {
a.waited = wait
a.timestamp = Date.now()
resolve(a)
})
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
setInterval(shouldDie, 1000);
\ No newline at end of file
'use strict';
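// UDP pass-through executor (300 s idle timeout): incoming datagrams are echoed back unmodified
// instead of being unpacked and repacked.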
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const os = require('os')
let struct = require('jspack')
struct = struct.jspack
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 300, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",
messages: JSON.stringify({
functionHash, portExternal: port, node_id: node_id.trim(),
runtime, resource_id, entity_id: process.pid, mac: mac_address
}),
"status": true
});
if (producer)
producer.send(
[{
topic: "deployed",
messages: JSON.stringify({ functionHash, portExternal: port, node_id,
runtime, resource_id, entity_id: process.pid, mac: mac_address}),
"status": true
}], () => { })
waitTime = Date.now()
})
function shouldDie() {
if (Date.now() - lastRequest > idleTime * 1000) {
let message = JSON.stringify({
functionHash, portExternal: port,
runtime, resource_id, entity_id: process.pid,
total_request: totalRequest, wait_time: waitTime
})
console.log("Idle for too long. Exiting");
if (producer)
producer.send(
[
{topic: "removeWorker", messages: message }
], (err, data) => {
if (err)
console.log(err);
console.log("Ending worker for function", functionHash, "resource_id", resource_id);
process.exit(0)
})
}
}
server.on('error', (err) => {
console.log(`server error:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
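// Pass-through mode: the raw datagram is handed to executor() and sent back to the proxy as-is;
// the jspack helpers below are kept but unused on this path.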
// console.log("message", msg)
// let payload = unpackPacket(msg)
// console.log(payload, typeof payload);
lastRequest = Date.now()
// console.log("network stack time", lastRequest - payload.t1)
totalRequest++
executor(msg).then(result => {
// result = packPacket(msg)
try {
udpProxy.send(msg, 0, msg.length, "8080", rinfo.address, function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
})
} catch (e) {
console.log(e)
}
})
});
function unpackPacket(packet) {
// let buffer = new Array(1024)
let chain_id = null, exec_id = null, function_count = null, function_id = null, data = null
let base = 0, f0, f1, f2, f3, f4, t1, t2, t3, t4
chain_id = struct.Unpack(">I", packet, base)
base += 4
exec_id = struct.Unpack(">I", packet, base)
base += 4
function_id = struct.Unpack(">I", packet, base)
base += 4
data = struct.Unpack(">I", packet, base)
base += 4
function_count = struct.Unpack("B", packet, base) // single-byte count, matching packPacket below
base += 1
f0 = struct.Unpack("B", packet, base)
base += 1
f1 = struct.Unpack("B", packet, base)
base += 1
f2 = struct.Unpack("B", packet, base)
base += 1
f3 = struct.Unpack("B", packet, base)
base += 1
f4 = struct.Unpack("B", packet, base)
base += 1
t1 = struct.Unpack(">Q", packet, base) // 64-bit timestamps, matching the 8-byte strides
base += 8
t2 = struct.Unpack(">Q", packet, base)
base += 8
t3 = struct.Unpack(">Q", packet, base)
base += 8
t4 = struct.Unpack(">Q", packet, base)
// console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
chain_id: chain_id[0],
exec_id: exec_id[0],
data: data[0],
function_count: function_count[0],
function_id: function_id[0],
f0, f1, f2, f3, f4, t1, t2, t3, t4
}
}
function packPacket(dataPacket) {
let message = new Array(1024)
let base = 0, chain_id, exec_id, function_id, data, function_count
chain_id = struct.PackTo(">I", message, base, [dataPacket.chain_id])
base += 4
exec_id = struct.PackTo(">I", message, base, [dataPacket.exec_id])
base += 4
function_id = struct.PackTo(">I", message, base, [dataPacket.function_id])
base += 4
data = struct.PackTo(">I", message, base, [dataPacket.data])
base += 4
function_count = struct.PackTo("B", message, base, [dataPacket.function_count])
message = Buffer.from(message)
return message
}
server.on('listening', () => {
const address = server.address();
console.log(`server listening ${address.address}:${address.port}`);
});
// server.bind(port, "192.168.2.3");
server.bind(port);
setInterval(shouldDie, 1000);
'use strict';
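// Warm-container executor variant: HTTP (Express) + UDP with jspack framing,
// Kafka "deployed" announcement, 300 s idle timeout.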
const express = require('express')
let request = require('request')
const process = require('process')
const dgram = require('dgram');
const { constants } = require('buffer');
const { spawnSync, execSync } = require('child_process');
// const { logger } = require('../../lib');
const server = dgram.createSocket('udp4');
const udpProxy = dgram.createSocket('udp4');
const os = require('os')
let struct = require('jspack')
struct = struct.jspack
const app = express()
let port = 5000, resource_id, functionHash, runtime, idleTime = 300, flagFirstRequest = true
let waitTime
resource_id = process.argv[2]
functionHash = process.argv[3]
port = process.argv[4]
runtime = process.argv[5]
request = request.defaults({
headers: { 'x-resource-id': resource_id }
});
let producer
try {
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
kafkaHost: process.argv[6],
autoConnect: true
})
producer = new Producer(client)
} catch(e) {
console.log("Exception: ", e);
}
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
let lastRequest = Date.now(), totalRequest = 0
app.post('/serverless/function/execute/', (req, res) => {
if (flagFirstRequest) {
waitTime = Date.now() - waitTime
flagFirstRequest = false
}
let payload = req.body
lastRequest = Date.now()
totalRequest++
executor(payload).then((result) => {
res.json(result)
})
})
app.post('/serverless/function/timeout', (req, res) => {
console.log(req.body);
idleTime = req.body.timeout
console.log("Idle time set to: ", idleTime);
res.json({
status: "success"
})
})
async function executor(payload) {
return new Promise((resolve, reject) => {"f1"
resolve(payload)
})
}
app.listen(port, () => {
console.log(`Resource ${resource_id} Server listening on port ${port}!`)
let node_id, mac_address;
let interfaces = os.networkInterfaces()
for (let networkInterface in interfaces) {
networkInterface = interfaces[networkInterface]
if (networkInterface[0].address.startsWith("192.168.2")) {
node_id = networkInterface[0].address
mac_address = networkInterface[0].mac
}
}
console.log({
topic: "deployed",