Commit 0357774b authored by Mahendra Patel's avatar Mahendra Patel

commit_message

parent 3a6cef8c
......@@ -4,7 +4,7 @@ import time
import threading
import random
import time
import numpy as np
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
......@@ -13,6 +13,8 @@ parser.add_argument('--fid', help='Funtion id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--t', help='Runtime',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
......@@ -25,106 +27,100 @@ args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
runtime = args.t
concurrency = args.c
SERVER_IP = "192.168.2.3"
egress_time = []
ingress_time = []
packet_holder = [None] * 11
ingress_time = {}
stop_thread = False
def receive():
global egress_time, stop_thread
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 10000 + i
print i
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
s.bind((CLIENT_IP, port))
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
# print packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
data = int(data) - t
print "rec", chain_id, exec_id, data, function_id, function_count,
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
print exec_id
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
# print chain_id, exec_id, "function_id", function_id, function_count, \
# f0, f1, f2, f3, f4,
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
exec_id_packed = struct.pack(">I", exec_id) # execution id
dataInt = int(time.time() * 1000) % 1000000000
dataInt = 0
# print " dataInt", dataInt
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id_packed = struct.pack(">I", function_id)
function_id = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id_packed + data + function_count + f0 + f1 + f2 + f3 + f4
packet = chain_id + exec_id_packed + function_id + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, function_id
return packet, exec_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
packet, function_id = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
if time.time() - start_time > runtime:
break
packet, exec_id = genPacket()
if exec_id in ingress_time:
continue
s.sendto(packet, (SERVER_IP, PORT))
ingress_time[exec_id] = time.time()
time.sleep(sleep_time)
def send():
global egress_time, ingress_time
global egress_time, ingress_time, concurrency, runtime, stop_thread
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
print("Sending packet to %s at port %s" % (SERVER_IP, PORT))
print("Runtime: %d Concurrency %d" % (runtime, concurrency))
print("chain id, exec id, data, function count, functions dependencies...")
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
if args.n is not None:
for i in range(args.req_count):
for i in range(args.n):
packet, exec_id = genPacket()
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
ingress_time[exec_id] = time.time() * 1000
print("s", "{0:f}".format(ingress_time[exec_id]))
elif args.rps is not None:
runtime = 10
thread_count = args.c
start_time = time.time()
sleep_time = thread_count / float(args.rps)
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
sleep_time = concurrency / float(args.rps)
print("calculated inter-arrival time, offload mode", sleep_time)
for i in range(concurrency):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
......@@ -135,13 +131,37 @@ def send():
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
print e2e_time
data = np.array(e2e_time, dtype=float)
p50 = np.percentile(data, 50)
p95 = np.percentile(data, 95)
p99 = np.percentile(data, 99)
mean = np.mean(data)
print("mean \t p50 \t p95 \t p99")
print(mean, p50, p95, p99)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
time.sleep(1)
send()
r.join()
time.sleep(2)
# r.join()
printStatistics()
import socket
import struct
import time
import threading
import random
import time
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--fid', help='Funtion id',
type=int, action="store", required=False)
parser.add_argument('--c', help='Concurrency',
type=int, action="store", required=True)
parser.add_argument('--req_count', help='request count',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--n', help='Number of requests to send',
type=int, action="store")
args = parser.parse_args()
PORT = 8000
dataInt = 0
fid = args.fid
SERVER_IP = "192.168.2.3"
egress_time = []
ingress_time = []
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
# print packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
t = int(time.time() * 1000) % 1000000000
data = int(data) - t
print "rec", chain_id, exec_id, data, function_id, function_count,
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 2 ** 30)
print exec_id
chain_id = 1
function_count = 5
function_id = fid if (fid) else 1
f0 = 0; f1 = 1; f2 = 2; f3 = 0; f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
dataInt = int(time.time() * 1000) % 1000000000
data = struct.pack(">I", dataInt) # data
function_count = struct.pack("B", function_count) # function count
function_id_packed = struct.pack(">I", function_id)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id_packed + data + function_count + f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, function_id
def sendThread(start_time, runtime, sleep_time):
global ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
packet, function_id = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
def send():
global egress_time, ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
if args.n is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
elif args.rps is not None:
runtime = 10
thread_count = args.c
start_time = time.time()
sleep_time = thread_count / float(args.rps)
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(1)
send()
r.join()
import socket
import struct
import time
import threading
import random
import time
import numpy as np
import argparse
import signal
parser = argparse.ArgumentParser(description='Mininet demo')
packet_holder = [None] * 11
ingress_time = {}
stop_thread = False
runtime = 10
def receive(i):
global stop_thread, packet_holder
CLIENT_IP = "0.0.0.0"
port = 10000 + i
print i
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, port))
print("listening to {} at port {}".format(CLIENT_IP, port))
run_status = {}
packet_holder[i] = []
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
packet_holder[i].append((packet, time.time() ))
# print "r", "{0:f}".format((time.time() * 1000)), "{0:f}".format(ingress_time[exec_id])
def printStatistics():
global runtime
e2e_time = []
for packetThread in packet_holder:
for packetTuple in packetThread:
packet = packetTuple[0]
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
# e2e_time.append((packetTuple[1] - ingress_time[exec_id])* 1000)
# data = np.array(e2e_time, dtype=float)
# p50 = np.percentile(data, 50)
# p95 = np.percentile(data, 95)
# p99 = np.percentile(data, 99)
# mean = np.mean(data)
# print("mean \t p50 \t p95 \t p99")
# print(mean, p50, p95, p99)
print("rps", len(e2e_time) / runtime, len(ingress_time))
return 0
ri = []
for i in range(0, 11):
r = threading.Thread(name="receive", target=receive, args=[i])
r.daemon = True
r.start()
ri.append(r)
def signal_handler(sig, frame):
global stop_thread
print "sigint"
stop_thread = True
print "here"
time.sleep(15)
printStatistics()
......@@ -4,7 +4,7 @@
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "10.129.6.5:5984",
"env": "env_udp.js",
"env": "env_cpp.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
......@@ -15,10 +15,10 @@
"network_bridge": "hybrid_kafka-serverless",
"use_bridge": false,
"internal": {
"kafka_host": "10.129.6.5:9092"
"kafka_host": "127.0.0.1:9092"
},
"external": {
"kafka_host": "10.129.6.5:9092"
"kafka_host": "127.0.0.1:29092"
}
},
"topics": {
......
......@@ -4,6 +4,7 @@
"master_address": "localhost",
"grunt_host": "https://www.namandixit.net/lovecraftian_nightmares/grunt",
"couchdb_host": "localhost:5984",
"env": "env_udp.js",
"db": {
"function_meta": "serverless",
"metrics": "metrics",
......@@ -11,13 +12,13 @@
"explicit_chain_meta": "explicit_chain"
},
"network": {
"network_bridge": "hybrid_kafka-serverless",
"network_bridge": "xanadu_kafka-serverless",
"use_bridge": true,
"internal": {
"kafka_host": "kafka:9092"
},
"external": {
"kafka_host": "localhost:29092"
"kafka_host": "10.129.2.201:9092"
}
},
"topics": {
......@@ -38,4 +39,4 @@
"speculative_deployment": true,
"JIT_deployment": true,
"id_size": 20
}
\ No newline at end of file
}
......@@ -2,7 +2,7 @@
// const isolateBackend = require('./isolate')
const fs = require('fs')
const { spawn } = require('child_process');
const constants = require("../constants.json")
const constants = require("../constants_local.json")
const libSupport = require('./lib')
const { Worker, isMainThread, workerData } = require('worker_threads');
const registry_url = constants.registry_url
......@@ -38,7 +38,8 @@ function runIsolate(local_repository, metadata) {
}
function runProcess(local_repository, metadata) {
let port = metadata.port,
console.log("inside run process : ",metadata, local_repository)
let port = metadata.port,
functionHash = metadata.functionHash,
resource_id = metadata.resource_id,
memory = metadata.resources.memory
......@@ -76,7 +77,7 @@ function runContainer(metadata) {
memory = metadata.resources.memory
logger.info(imageName);
console.log('run contianer function : ', metadata, imageName, port, resource_id, memory)
return new Promise((resolve, reject) => {
let timeStart = Date.now()
......@@ -111,8 +112,10 @@ function runContainer(metadata) {
logger.info(`stdout: ${data}`);
let timeDifference = Math.ceil((Date.now() - timeStart))
logger.info("container run time taken: ", timeDifference);
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let _ = spawn('docker', ['start', resource_id])
// let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'macvlantest', resource_id])
let _ = spawn('docker', ['start', resource_id])
_.on('data', (data) => {
console.log("container started", data);
......@@ -140,7 +143,8 @@ function runContainer(metadata) {
})
} else {
logger.info("container starting at port", port);
logger.info("container starting at port", port,"to check");
console.log(port, "no to check!!")
let process = null;
/**
* create docker on the default bridge
......@@ -162,8 +166,10 @@ function runContainer(metadata) {
/**
* attach smartnic interface
*/
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let _ = spawn('docker', ['start', resource_id])
// let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let add_network = spawn('docker', ['network', 'connect', 'pub_net', resource_id])
let _ = spawn('docker', ['start', resource_id])
_.stdout.on('data', (data) => {
logger.info(data.toString())
......@@ -201,4 +207,4 @@ function runContainer(metadata) {
module.exports.runContainer = runContainer;
module.exports.runProcess = runProcess;
module.exports.runIsolate = runIsolate;
\ No newline at end of file
module.exports.runIsolate = runIsolate;
'use strict';
const constants = require(".././constants.json")
const constants = require(".././constants_local.json")
const secrets = require('./secrets.json')
const config = require('./config.json')
const libSupport = require('./lib')
......@@ -131,6 +131,7 @@ function startWorker(local_repository, producer, metadata) {
}], () => { })
})
else if (runtime === "process")
// console.log("rutime is process : ",metadata)
execute.runProcess(local_repository, metadata)
.catch(err => {
logger.error("=====================deployment failed=========================");
......
......@@ -2,7 +2,7 @@ const fetch = require('node-fetch');
const fs = require('fs');
const process = require('process')
const { spawnSync } = require('child_process');
const constants = require(".././constants.json")
const constants = require(".././constants_local.json")
const kafka = require('kafka-node')
const winston = require('winston')
const { createLogger, format, transports } = winston;
......@@ -24,6 +24,7 @@ function updateConfig() {
file.id = data[data.length - 1]
fs.writeFileSync('./config.json', JSON.stringify(file));
console.log("Updated Config file");
console.log("updateconfig file ", file)
}
function makeTopic(id) {
......
......@@ -5,7 +5,7 @@ const router = express.Router()
const fs = require('fs')
const { spawn } = require('child_process')
const fetch = require('node-fetch')
const constants = require('../constants.json')
const constants = require('../constants_local.json')
const operator = require('./operator')
const sharedMeta = require('./shared_meta')
const util = require('util')
......
......@@ -2,7 +2,7 @@
const express = require('express');
const fileUpload = require('express-fileupload');
const constants = require('.././constants.json');
const constants = require('.././constants_local.json');
const chainHandler = require('./explicit_chain_handler');
const secrets = require('./secrets.json');
const fs = require('fs');
......@@ -14,7 +14,7 @@ const fetch = require('node-fetch');
// const apiSpec = require('./swagger.json');
const util = require('util')
const sharedMeta = require('./shared_meta')
var bodyParser = require('body-parser'); // newcode
const app = express()
const libSupport = require('./lib')
......@@ -58,7 +58,11 @@ app.use(morgan('combined', {
skip: function (req, res) { return res.statusCode < 400 }
}))
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
//app.use(express.bodyParser());//newcode
//app.use(express.urlencoded({ extended: true }));//com
app.use(bodyParser.urlencoded({ extended: false }));//newcode
app.use(bodyParser.json());//newcode
const file_path = __dirname + "/repository/"
app.use('/repository', express.static(file_path)); // file server hosting deployed functions
app.use(fileUpload())
......@@ -79,11 +83,17 @@ app.get('/metrics', (req, res) => {
* REST API to receive deployment requests
*/
app.post('/serverless/deploy', (req, res) => {
console.log("req = "+req+" ** "+req.body.runtime+" ** "+req.body.serverless,req.files,req.files.serverless)//newcode
console.log("res = "+res)//newcode
// console.log("req json = "+JSON.parse(req)) //newcode
console.log("baseurl : ",req.baseUrl)
console.log('Request URL:', req.originalUrl)
let runtime = req.body.runtime
let file = req.files.serverless
console.log("req = "+req)
let functionHash = file.md5
console.log("filepath: ",file_path,"hash: ",functionHash)
file.mv(file_path + functionHash, function (err) { // move function file to repository
functionHash = libSupport.generateExecutor(file_path, functionHash)
......@@ -157,12 +167,16 @@ function deployContainer(path, imageName) {
/**
* Generating dockerfile for the received function
*/
let environmentCopy = ""
if (constants.env === "env_cpp.js")
environmentCopy = "COPY ./worker_env/server /app"
fs.writeFile('./repository/Dockerfile',
`FROM node:latest
WORKDIR /app
COPY ./worker_env/package.json /app
ADD ./worker_env/node_modules /app/node_modules
COPY ${imageName}.js /app
${environmentCopy}
ENTRYPOINT ["node", "${imageName}.js"]`
, function (err) {
if (err) {
......@@ -214,16 +228,18 @@ function deployContainer(path, imageName) {
* REST API to receive execute requests
*/
app.post('/serverless/execute/:id', (req, res) => {
console.log("executing called ", req.params.id, req.body.runtime)
let runtime = req.body.runtime
let id = req.params.id + runtime
res.timestamp = Date.now()
if (functionToResource.has(id)) {
res.start = 'warmstart'
console.log('warmstart')
res.dispatch_time = Date.now()
libSupport.reverseProxy(req, res)
} else {
res.start = 'coldstart'
console.log('coldstart')
/**
* Requests are queued up before being dispatched. To prevent requests coming in for the
* same function from starting too many workers, they are grouped together
......@@ -633,4 +649,4 @@ async function speculative_deployment(req, runtime) {
setInterval(libSupport.metrics.broadcastMetrics, 5000)
// setInterval(autoscalar, 1000);
setInterval(dispatch, 1000);
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
\ No newline at end of file
app.listen(port, () => logger.info(`Server listening on port ${port}!`))
......@@ -4,7 +4,7 @@ const fs = require('fs')
const rp = require('request-promise');
const fetch = require('node-fetch');
const winston = require('winston')
const constants = require('.././constants.json')
const constants = require('.././constants_local.json')
const secrets = require('./secrets.json')
const metrics = require('./metrics')
const sharedMeta = require('./shared_meta')
......@@ -77,6 +77,7 @@ function generateExecutor(functionPath, functionHash) {
* @param {JSON} res Object to use to return the response to the user
*/
async function reverseProxy(req, res) {
console.log("reverseProxy called !!!")
res.reverse_ingress = Date.now()
if (req.headers['x-chain-type'] !== 'explicit' && req.body.type === "tcp")
branchChainPredictor(req)
......@@ -105,6 +106,7 @@ async function reverseProxy(req, res) {
json: true // Automatically stringifies the body to JSON
};
if (req.body.type === "tcp") {
console.log("tcp request to reverseproxy")
try {
let parsedBody = await rp(options)
let serviceTime = Date.now() - res.timestamp
......@@ -149,6 +151,7 @@ async function reverseProxy(req, res) {
logger.error("error" + err)
}
} else if (req.body.type === "udp") {
console.log("udp request to reverseproxy")
let request_id = Math.floor(Math.random() * 1000)
req.body.request_id = request_id
// res.request_id = request_id
......@@ -166,7 +169,7 @@ async function reverseProxy(req, res) {
})
res.pack_time = Date.now()
udpProxy.send(packet, 0, packet.length, resource.port, resource.node_id, function (err, bytes) {
// logger.info(`forwarded request via UDP, IP 192.168.2.5 Port ${resource.port}`)
logger.info(`forwarded request via UDP, IP 192.168.2.5 Port ${resource.port}`)
res.send_time = Date.now()
})
}
......@@ -419,30 +422,32 @@ udpProxy.on('error', (err) => {
udpProxy.on('message', (msg, rinfo) => {
let result = unpackPacket(msg)
console.log("received request , result = ", result, "requestflighqueu = ", requestFlightQueue)
let res = requestFlightQueue.get(result.exec_id)
res.json(result)
console.log("resource_lookup",
res.dispatch_time - res.timestamp,
"reverse_proxy_call",
res.reverse_ingress - res.dispatch_time,
"metadata_lookup",
res.lookup_time - res.reverse_ingress,
"data_set_time",
res.data_set_time - res.lookup_time,
"pack_time",
res.pack_time - res.data_set_time,
"network_send",
res.send_time - res.pack_time,
"total_dispatch_delay",
res.send_time - res.timestamp,
"E2E time:",
Date.now() - res.timestamp
)
console.log("res = ",res)
// res.json(result)
// console.log("resource_lookup",
// res.dispatch_time - res.timestamp,
// "reverse_proxy_call",
// res.reverse_ingress - res.dispatch_time,
// "metadata_lookup",
// res.lookup_time - res.reverse_ingress,
// "data_set_time",
// res.data_set_time - res.lookup_time,
// "pack_time",
// res.pack_time - res.data_set_time,
// "network_send",
// res.send_time - res.pack_time,
// "total_dispatch_delay",
// res.send_time - res.timestamp,
// "E2E time:",
// Date.now() - res.timestamp
// )
});
udpProxy.on('listening', () => {
const address = udpProxy.address();
console.log(`server listening ${address.address}:${address.port}`);
console.log(`udp server listening ${address.address}:${address.port}`);
});
async function fetchData(url, data = null) {
......@@ -525,4 +530,4 @@ udpProxy.bind(constants.master_port); // starting UDP server for offloaded endpo
getPort, logger, compare,
logBroadcast, fetchData, metrics,
producer
}
\ No newline at end of file
}
'use strict';
const constants = require('.././constants.json');
const constants = require('.././constants_local.json');
const secrets = require('./secrets.json')
const fetch = require('node-fetch');
const util = require('util')
......@@ -198,4 +198,4 @@ async function fetchData(functionHash, metric, runtime) {
module.exports = {
collectMetrics, broadcastMetrics, register
}
\ No newline at end of file
}
......@@ -707,7 +707,6 @@ $(OUTDIR)/pif_app_nfd.list/pif_app_nfd.list: $(SDKP4DIR)/components/nfp_pif/me/a
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_flcalc_algorithms.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_memops.c \
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c \
p4src/prime.c \
$(PIFOUTDIR)/pif_design.h \
$(MAKEFILE_LIST)
@echo ---------
......@@ -816,8 +815,7 @@ $(OUTDIR)/pif_app_nfd.list/pif_app_nfd.list: $(SDKP4DIR)/components/nfp_pif/me/a
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_init.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_flcalc_algorithms.c \
$(SDKP4DIR)/components/nfp_pif/me/lib/pif/src/pif_memops.c \
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c \
p4src/prime.c
$(SDKP4DIR)/components/dcfl/me/lib/dcfl/libdcfl.c
#
# APP_MASTER
......
......@@ -3,23 +3,51 @@ sudo ifconfig vf0_0 down
sudo ifconfig vf0_0 hw ether 00:11:11:11:11:11
sudo ifconfig vf0_1 down
sudo ifconfig vf0_1 hw ether 00:22:22:22:22:22
# sudo ifconfig vf0_2 down
# sudo ifconfig vf0_2 hw ether 00:33:33:33:33:33
sudo ifconfig vf0_0 192.168.2.2/24 up
sudo ifconfig vf0_1 192.168.2.3/24 up
# sudo ifconfig vf0_2 192.168.2.4/24 up
# create a MAC VLAN for docker attached to vf0_1
echo "y" | docker system prune
docker network create -d macvlan --subnet=192.168.2.0/24 --aux-address="vf0_0=192.168.2.2" --aux-address="vf0_1=192.168.2.3" -o parent=vf0_1 pub_net
docker network create -d macvlan --subnet=192.168.2.0/24 --aux-address="vf0_0=192.168.2.2" --aux-address="vf0_1=192.168.2.3" --aux-address="vf0_2=192.168.2.4" -o parent=vf0_1 pub_net
move vf0_0 into its own namespace
sudo ip netns exec ns_server ip link set vf0_0 netns 1
# move vf0_0 into its own namespace
# sudo ip netns exec ns_server ip link set vf0_0 netns 1
sudo ip netns delete ns_server
sudo ip netns add ns_server
sudo ip link set vf0_0 netns ns_server
sudo ip netns exec ns_server ip addr add dev vf0_0 192.168.2.2/24
sudo ip netns exec ns_server ip link set dev vf0_0 up
sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_0
# sudo ip link set vf0_2 netns ns_server
# sudo ip netns exec ns_server ip addr add dev vf0_2 192.168.2.4/24
# sudo ip netns exec ns_server ip link set dev vf0_2 up
# sudo ip netns exec ns_server arp -s 192.168.2.3 00:22:22:22:22:22 -i vf0_2
# sudo ip netns exec ns_server arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_2
sudo arp -s 192.168.2.2 00:11:11:11:11:11 -i vf0_1
# sudo arp -s 192.168.2.4 00:33:33:33:33:33 -i vf0_1
sudo ip netns exec ns_server ethtool --offload vf0_2 rx off tx off
sudo ethtool --offload vf0_1 rx off tx off
sudo ifconfig vf0_1 mtu 9000
sudo ip netns exec ns_server ifconfig vf0_2 mtu 9000
# ip link add vethd1 type veth peer name br-vethd1
# ip link set vethd1 netns ns_server
# ip netns exec ns_server ip addr add 192.168.1.11/24 dev vethd1
# Create a bridge "bridgek0"
# sudo brctl addbr bridgek0
# sudo brctl addif bridgek0 br-vethd1
# sudo ip netns exec ns_server ip addr add 10.129.6.6/24 dev vethd1
# sudo ip netns exec ns_server ip link set vethd1 up
# sudo ip link set br-vethd1 up
# sudo ip addr add 10.129.6.5/24 dev bridgek0
# sudo ip link set bridgek0 up
......@@ -15,7 +15,7 @@ done
if [[ $compile_flag -eq 1 ]]
then
# compile the nfp code
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp -c ./p4src/prime.c
sudo /opt/netronome/p4/bin/nfp4build -o ./p4src/orchestrator.nffw -p ./p4src/out -4 ./p4src/orchestrator.p4 -l lithium --nfp4c_p4_version 16 --nfirc_mac_ingress_timestamp --nfp4c_p4_compiler p4c-nfp
fi
if [[ $offload_flag -eq 1 ]]
......
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
......@@ -80,10 +80,10 @@
"action": {
"type" : "ingress::dispatch_act",
"data" : {
"dstAddr" : { "value" : "192.168.2.2" },
"dstPort" : { "value" : "8081" },
"dstAddr" : { "value" : "192.168.2.3" },
"dstPort" : { "value" : "8080" },
"egress_port" : { "value" : "v0.1" },
"ethernetAddr" : { "value" : "02:42:c0:a8:02:06" }
"ethernetAddr" : { "value" : "00:22:22:22:22:22" }
}
},
"name": "default"
......
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
#define REPLY_PORT 9000
#define SPEEDO_REPLY_PORT 9090
#define DISPATCHER_PORT 8000
#define MDS_PORT 8000
#define NUM_CACHE 128
......
......@@ -11,6 +11,7 @@ struct ingress_metadata_t {
register< bit<8>>(16384) current_state;
register< bit<8>>(16384) dispatch_state;
register< bit<8>>(16384) batch;
header ethernet_t {
bit<48> dstAddr;
......@@ -40,10 +41,24 @@ header udp_t {
bit<16> checksum;
}
header tcp_t {
bit<16> src_port;
bit<16> dst_port;
bit<32> seq_no;
bit<32> ack_no;
bit<4> data_offset;
bit<3> res;
bit<3> ecn;
bit<6> ctrl;
bit<16> window;
bit<16> checksum;
bit<16> urgent_ptr;
}
header map_hdr_t {
bit<32> chain_id;
bit<32> exec_id;
bit<32> function_id;
bit<8> function_id;
bit<32> data;
bit<8> function_count;
bit<8> f0;
......@@ -51,6 +66,7 @@ header map_hdr_t {
bit<8> f2;
bit<8> f3;
bit<8> f4;
// bit<8> batch_count;
}
struct exec_hdr_t {
......@@ -74,6 +90,8 @@ struct headers {
ipv4_t ipv4;
@name(".udp")
udp_t udp;
// @name(".tcp")
// tcp_t tcp;
@name(".map_hdr")
map_hdr_t map_hdr;
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ parser ParserImpl(packet_in packet, out headers hdr, inout metadata meta, inout
packet.extract<ipv4_t>(hdr.ipv4);
transition select(hdr.ipv4.fragOffset, hdr.ipv4.ihl, hdr.ipv4.protocol) {
(13w0x0 &&& 13w0x0, 4w0x5 &&& 4w0xf, 8w0x11 &&& 8w0xff): parse_udp;
// (13w0x0 &&& 13w0x0, 4w0x5, 8w0x6): parse_tcp;
default: accept;
}
}
......@@ -12,11 +13,24 @@ parser ParserImpl(packet_in packet, out headers hdr, inout metadata meta, inout
packet.extract<udp_t>(hdr.udp);
transition select(hdr.udp.dstPort) {
DISPATCHER_PORT: parse_map_hdr;
16w9001: parse_map_hdr;
16w9002: parse_map_hdr;
16w9003: parse_map_hdr;
16w9004: parse_map_hdr;
16w9005: parse_map_hdr;
16w9006: parse_map_hdr;
16w9007: parse_map_hdr;
SPEEDO_REPLY_PORT: parse_map_hdr;
REPLY_PORT: parse_map_hdr;
default: accept;
}
}
// state parse_tcp {
// packet.extract(hdr.tcp);
// transition accept;
// }
@name(".parse_map_hdr") state parse_map_hdr {
packet.extract(hdr.map_hdr);
transition accept;
......
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
......@@ -53,7 +53,7 @@
"fields" : [
["chain_id", 32, false],
["exec_id", 32, false],
["function_id", 32, false],
["function_id", 8, false],
["data", 32, false],
["function_count", 8, false],
["f0", 8, false],
......@@ -258,6 +258,46 @@
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x2329",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x232a",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x232b",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x232c",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x232d",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x232e",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x232f",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x2382",
"mask" : null,
"next_state" : "parse_map_hdr"
},
{
"value" : "0x2328",
"mask" : null,
......@@ -277,7 +317,7 @@
],
"source_info" : {
"filename" : "p4src/includes/parsers.p4",
"line" : 11,
"line" : 12,
"column" : 30,
"source_fragment" : "parse_udp"
}
......@@ -306,7 +346,7 @@
"transition_key" : [],
"source_info" : {
"filename" : "p4src/includes/parsers.p4",
"line" : 20,
"line" : 34,
"column" : 34,
"source_fragment" : "parse_map_hdr"
}
......@@ -345,7 +385,7 @@
],
"source_info" : {
"filename" : "p4src/includes/parsers.p4",
"line" : 25,
"line" : 39,
"column" : 26,
"source_fragment" : "start"
}
......@@ -392,6 +432,18 @@
},
"size" : 16384,
"bitwidth" : 8
},
{
"name" : "batch",
"id" : 2,
"source_info" : {
"filename" : "p4src/includes/headers.p4",
"line" : 14,
"column" : 25,
"source_fragment" : "batch"
},
"size" : 16384,
"bitwidth" : 8
}
],
"calculations" : [
......
......@@ -37,7 +37,7 @@
{
"id": 1,
"name": "map_hdr.function_id",
"bitwidth": 32,
"bitwidth": 8,
"matchType": "EXACT"
}
],
......
......@@ -62,7 +62,7 @@ map_hdr:
fields:
- chain_id: 32
- exec_id: 32
- function_id: 32
- function_id: 8
- data: 32
- function_count: 8
- f0: 8
......@@ -105,6 +105,13 @@ udp:
# Register definitions #
##########################################
batch:
class: global
fields:
- value: 8
instance_count: 16384
type: register
current_state:
class: global
fields:
......@@ -176,7 +183,7 @@ parse_ipv4:
parse_map_hdr:
implementation: extract(map_hdr);
src_filename: p4src/includes/parsers.p4
src_lineno: 20
src_lineno: 34
type: parse_state
parse_udp:
......@@ -184,7 +191,7 @@ parse_udp:
select_value:
- udp.dstPort
src_filename: p4src/includes/parsers.p4
src_lineno: 11
src_lineno: 12
type: parse_state
start:
......@@ -192,7 +199,7 @@ start:
select_value:
- ethernet.etherType
src_filename: p4src/includes/parsers.p4
src_lineno: 25
src_lineno: 39
type: parse_state
......@@ -209,8 +216,16 @@ parser:
parse_ipv4 -> parse_udp [value="0x00000511", mask="0x00000fff", order="0"]
parse_ipv4 -> exit [value="default", mask="none", order="1"]
parse_udp -> parse_map_hdr [value="0x1f40", mask="none", order="0"]
parse_udp -> parse_map_hdr [value="0x2328", mask="none", order="1"]
parse_udp -> exit [value="default", mask="none", order="2"]
parse_udp -> parse_map_hdr [value="0x2329", mask="none", order="1"]
parse_udp -> parse_map_hdr [value="0x232a", mask="none", order="2"]
parse_udp -> parse_map_hdr [value="0x232b", mask="none", order="3"]
parse_udp -> parse_map_hdr [value="0x232c", mask="none", order="4"]
parse_udp -> parse_map_hdr [value="0x232d", mask="none", order="5"]
parse_udp -> parse_map_hdr [value="0x232e", mask="none", order="6"]
parse_udp -> parse_map_hdr [value="0x232f", mask="none", order="7"]
parse_udp -> parse_map_hdr [value="0x2382", mask="none", order="8"]
parse_udp -> parse_map_hdr [value="0x2328", mask="none", order="9"]
parse_udp -> exit [value="default", mask="none", order="10"]
parse_map_hdr -> exit [value="default", mask="none", order="0"]
}
start_state: start
......@@ -362,7 +377,7 @@ layout:
##########################################
source_info:
date: 2021/04/23 22:03:40
date: 2021/09/11 07:07:44
output_file: p4src/orchestrator.yml
p4_version: '16'
source_files:
......
#include <core.p4>
#define V1MODEL_VERSION 20200408
#include <v1model.p4>
#include "includes/defines.p4"
#include "includes/headers.p4"
#include "includes/parsers.p4"
extern void prime();
control ingress(inout headers hdr, inout metadata meta, inout standard_metadata_t standard_metadata) {
@name(".fwd_act") action fwd_act(bit<16> port) {
standard_metadata.egress_spec = port;
}
@name(".fwd") table fwd {
actions = {
fwd_act;
}
key = {
standard_metadata.ingress_port : exact;
}
}
@name(".dispatch_act") action dispatch_act(bit<32> dstAddr, bit<16> dstPort, bit<48> ethernetAddr , bit<16> egress_port) {
hdr.ipv4.dstAddr = dstAddr;
hdr.udp.dstPort = dstPort;
hdr.ethernet.dstAddr = ethernetAddr;
}
@name(".dispatch") table dispatch {
actions = {
dispatch_act;
}
key = {
hdr.map_hdr.function_id : exact;
}
}
@name("circleBack") action circleBack() {
bit<32> tempAddr = hdr.ipv4.dstAddr;
hdr.ipv4.dstAddr = hdr.ipv4.srcAddr;
hdr.ipv4.srcAddr = tempAddr;
bit<16> tempPort = hdr.udp.dstPort;
hdr.udp.dstPort = hdr.udp.srcPort;
hdr.udp.srcPort = tempPort;
bit<48> tempEth = hdr.ethernet.dstAddr;
hdr.ethernet.dstAddr = hdr.ethernet.srcAddr;
hdr.ethernet.srcAddr = tempEth;
standard_metadata.egress_spec = standard_metadata.ingress_port;
hdr.ipv4.ttl = hdr.ipv4.ttl - 8w1;
}
apply {
if (hdr.ipv4.isValid() && hdr.udp.dstPort == DISPATCHER_PORT) {
dispatch.apply();
fwd.apply();
} else if (hdr.ipv4.isValid() && hdr.udp.dstPort == 9001 || hdr.udp.dstPort == 9002 ||
hdr.udp.dstPort == 9003 || hdr.udp.dstPort == 9004 || hdr.udp.dstPort == 9005 || hdr.udp.dstPort == 9006 || hdr.udp.dstPort == 9007 ) {
// if (hdr.map_hdr.function_count != 8w0) {
clone3(CloneType.I2E, 32w1, standard_metadata);
// }
// hdr.map_hdr.function_count = 8w0;
fwd.apply();
} else if (hdr.udp.dstPort == SPEEDO_REPLY_PORT) {
if (hdr.map_hdr.function_id < hdr.map_hdr.function_count) {
hdr.map_hdr.function_id = hdr.map_hdr.function_id + 8w1;
circleBack();
clone3(CloneType.I2E, 32w1, standard_metadata);
} else {
fwd.apply();
}
} else {
fwd.apply();
}
}
}
control egress(inout headers hdr, inout metadata meta, inout standard_metadata_t standard_metadata) {
// @name(".ethernet_set_mac_act") action ethernet_set_mac_act(bit<48> smac, bit<48> dmac) {
// hdr.ethernet.srcAddr = smac;
// hdr.ethernet.dstAddr = dmac;
// }
// @name(".ethernet_set_mac") table ethernet_set_mac {
// actions = {
// ethernet_set_mac_act;
// }
// key = {
// standard_metadata.egress_port: exact;
// }
// }
@name("fix_checksum") action fix_checksum() {
hdr.udp.checksum = 16w0;
}
apply {
// if (hdr.udp.dstPort == MDS_PORT) {
// ethernet_set_mac.apply();
// }
if (hdr.udp.dstPort == DISPATCHER_PORT || hdr.udp.dstPort == 9001 || hdr.udp.dstPort == 9002 ||
hdr.udp.dstPort == 9003 || hdr.udp.dstPort == 9004 || hdr.udp.dstPort == 9005 || hdr.udp.dstPort == 9006 || hdr.udp.dstPort == 9007 ) {
fix_checksum();
}
}
}
control DeparserImpl(packet_out packet, in headers hdr) {
apply {
packet.emit<ethernet_t>(hdr.ethernet);
packet.emit<ipv4_t>(hdr.ipv4);
// packet.emit<tcp_t>(hdr.tcp);
packet.emit<udp_t>(hdr.udp);
packet.emit<map_hdr_t>(hdr.map_hdr);
}
}
control verifyChecksum(inout headers hdr, inout metadata meta) {
apply {
verify_checksum(
hdr.ipv4.isValid(),
{ hdr.ipv4.version,
hdr.ipv4.ihl,
hdr.ipv4.diffserv,
hdr.ipv4.totalLen,
hdr.ipv4.identification,
hdr.ipv4.flags,
hdr.ipv4.fragOffset,
hdr.ipv4.ttl,
hdr.ipv4.protocol,
hdr.ipv4.srcAddr,
hdr.ipv4.dstAddr },
hdr.ipv4.hdrChecksum,
HashAlgorithm.csum16);
}
}
control computeChecksum(inout headers hdr, inout metadata meta) {
apply {
update_checksum(
hdr.ipv4.isValid(),
{ hdr.ipv4.version,
hdr.ipv4.ihl,
hdr.ipv4.diffserv,
hdr.ipv4.totalLen,
hdr.ipv4.identification,
hdr.ipv4.flags,
hdr.ipv4.fragOffset,
hdr.ipv4.ttl,
hdr.ipv4.protocol,
hdr.ipv4.srcAddr,
hdr.ipv4.dstAddr },
hdr.ipv4.hdrChecksum,
HashAlgorithm.csum16);
}
}
V1Switch<headers, metadata>(ParserImpl(), verifyChecksum(), ingress(), egress(), computeChecksum(), DeparserImpl()) main;
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
......@@ -576,7 +576,7 @@
],
"source_info" : {
"filename" : "p4src/includes/parsers.p4",
"line" : 11,
"line" : 12,
"column" : 30,
"source_fragment" : "parse_udp"
}
......@@ -605,7 +605,7 @@
"transition_key" : [],
"source_info" : {
"filename" : "p4src/includes/parsers.p4",
"line" : 20,
"line" : 26,
"column" : 34,
"source_fragment" : "parse_map_hdr"
}
......@@ -644,7 +644,7 @@
],
"source_info" : {
"filename" : "p4src/includes/parsers.p4",
"line" : 25,
"line" : 31,
"column" : 26,
"source_fragment" : "start"
}
......
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
......@@ -265,7 +265,7 @@ parse_ipv4:
parse_map_hdr:
implementation: extract(map_hdr);
src_filename: p4src/includes/parsers.p4
src_lineno: 20
src_lineno: 26
type: parse_state
parse_udp:
......@@ -273,7 +273,7 @@ parse_udp:
select_value:
- udp.dstPort
src_filename: p4src/includes/parsers.p4
src_lineno: 11
src_lineno: 12
type: parse_state
start:
......@@ -281,7 +281,7 @@ start:
select_value:
- ethernet.etherType
src_filename: p4src/includes/parsers.p4
src_lineno: 25
src_lineno: 31
type: parse_state
......@@ -761,7 +761,7 @@ layout:
##########################################
source_info:
date: 2021/03/29 12:08:35
date: 2021/05/19 23:06:24
output_file: p4src/test.yml
p4_version: '16'
source_files:
......
......@@ -35,7 +35,7 @@ while True:
packet, addr = s.recvfrom(1024)
# print packet
# print packet
print "packet : ",packet
counter = counter + 1
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
......
......@@ -37,6 +37,7 @@ PORT = args.client_port
dataInt = args.send_data
fid = args.fid
SERVER_IP = "192.168.2.3"
#SERVER_IP = "10.129.2.201"
egress_time = []
ingress_time = []
......@@ -46,6 +47,7 @@ stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
# CLIENT_IP = "10.129.2.201"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
......@@ -53,9 +55,10 @@ def receive():
while True:
if stop_thread:
print "stop_thread=true breaking!!!"
break
packet, addr = s.recvfrom(1024)
# print packet
print "receive ",packet,addr
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
......@@ -67,12 +70,13 @@ def receive():
base += 4
function_count = struct.unpack("B", packet[base])[0]
print "rec", chain_id, exec_id, data, function_id, function_count
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 16384)
exec_id = random.randint(0, 2 ** 30)
chain_id = 1
# data = 100
function_count = 5
......@@ -82,8 +86,8 @@ def genPacket():
f2 = 2
f3 = 0
f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
print "genp_Data : ",chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
offload_status = False
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
......@@ -108,7 +112,7 @@ def genPacket():
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id + data + function_count + \
f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
print "return genp : ",dataInt, offload_status
return packet, offload_status
......@@ -116,9 +120,10 @@ def sendThread(start_time, runtime, sleep_time, s):
global ingress_time
while True:
packet, offload_status = genPacket()
print "packet : ",packet
if time.time() - start_time > runtime:
break
print "sendthread sendto"
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
......@@ -132,11 +137,12 @@ def send():
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
print "spack : ",packet
if args.req_count is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
# print "%.20f" % time.time()
print "send %.20f" % time.time()
# time.sleep(2)
# break
......
echo $1
python2 send.py --client-port 8000 --closed 1 --offload 0 --rps 1 --send-data 0 --closed 1 --fid $1
\ No newline at end of file
# python2 send.py --client-port 8000 --closed 1 --offload 0 --req-count 50 --send-data 10 --fid $1
sudo ip netns exec ns_server python2 send.py --client-port 8000 --closed 0 --offload 0 --req-count 10 --send-data 0 --fid $1
import socket
import struct
import time
import threading
import random
import time
import argparse
parser = argparse.ArgumentParser(description='Mininet demo')
parser.add_argument('--client-port', help='Port of client',
type=int, action="store", required=True)
parser.add_argument('--send-data', help='Data to send',
type=int, action="store", required=False)
parser.add_argument('--fid', help='Funtion id',
type=int, action="store", required=False)
parser.add_argument('--closed', help='Closed loop',
type=int, action="store", required=True)
group = parser.add_mutually_exclusive_group(required=True)
# group.add_argument('--bandwidth', help='Bandwidth',
# type=int, action="store")
group.add_argument('--rps', help='Requests per second',
type=int, action="store")
group.add_argument('--req-count', help='Number of requests to send',
type=int, action="store")
parser.add_argument('--offload', help='offload a portion of workloads',
type=float, action="store")
args = parser.parse_args()
print args.send_data
PORT = args.client_port
dataInt = args.send_data
fid = args.fid
SERVER_IP = "192.168.2.3"
egress_time = []
ingress_time = []
stop_thread = False
def receive():
global egress_time, stop_thread
CLIENT_IP = "0.0.0.0"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind((CLIENT_IP, 8080))
print "listening to {} at port {}".format(CLIENT_IP, 8080)
run_status = {}
while True:
if stop_thread:
break
packet, addr = s.recvfrom(1024)
# print packet
base = 0
chain_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
exec_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_id = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
data = struct.unpack(">I", packet[base:base + 4])[0]
base += 4
function_count = struct.unpack("B", packet[base])[0]
print "rec", chain_id, exec_id, data, function_id, function_count
def genPacket():
global fid
packet = None
exec_id = random.randint(0, 16384)
chain_id = 1
# data = 100
function_count = 5
function_id = fid if (fid) else 1
f0 = 0
f1 = 1
f2 = 2
f3 = 0
f4 = 0
print chain_id, exec_id, "function_id", function_id, function_count, \
f0, f1, f2, f3, f4
offload_status = False
chain_id = struct.pack(">I", chain_id) # chain id
exec_id = struct.pack(">I", exec_id) # execution id
max_workload = 100
dataInt = random.randint(1, max_workload)
if args.offload is not None:
cutoff = max_workload * args.offload
if dataInt <= cutoff:
data = struct.pack(">I", dataInt * 256) # data
offload_status = True
else:
data = struct.pack(">I", dataInt) # data
else:
data = struct.pack(">I", dataInt) # data
# print "{0:b}".format(data)
function_count = struct.pack("B", function_count) # function count
function_id = struct.pack(">I", function_id) # function count (changed to byte for test was >I)
f0 = struct.pack("B", f0) # f0
f1 = struct.pack("B", f1) # f1
f2 = struct.pack("B", f2) # f2 -> f0
f3 = struct.pack("B", f3) # f3 -> f1 f2
f4 = struct.pack("B", f4) # f4 -> f3
packet = chain_id + exec_id + function_id + data + function_count + \
f0 + f1 + f2 + f3 + f4
# print dataInt, offload_status
return packet, offload_status
def sendThread(start_time, runtime, sleep_time, s):
global ingress_time
while True:
packet, offload_status = genPacket()
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
def send():
global egress_time, ingress_time
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print "Sending packet to %s at port %s" % (SERVER_IP, PORT)
print "chain id, exec id, data, function count, functions dependencies..."
# op = struct.unpack("B", packet[0])
packet, _ = genPacket()
if args.req_count is not None:
for i in range(args.req_count):
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
# print "%.20f" % time.time()
# time.sleep(2)
# break
elif args.offload is None and args.rps is not None:
runtime = 10
start_time = time.time()
sleep_time = 1 / float(args.rps)
print "calculated inter-arrival time", sleep_time
while True:
if time.time() - start_time > runtime:
break
s.sendto(packet, (SERVER_IP, PORT))
ingress_time.append(time.time())
time.sleep(sleep_time)
elif args.offload is not None:
runtime = 10
thread_count = 8
start_time = time.time()
sleep_time = 1 / float(args.rps) * thread_count
print "calculated inter-arrival time, offload mode", sleep_time
for i in range(thread_count):
t = threading.Thread(target=sendThread, args=[
start_time, runtime, sleep_time, s])
t.daemon = True
t.start()
time.sleep(runtime)
stop_thread = True
# s.sendto(packet, (SERVER_IP, PORT))
# r.join()
# r.join()
if args.closed == 1:
r = threading.Thread(name="receive", target=receive)
r.daemon = True
r.start()
time.sleep(1)
send()
#! /bin/bash -ex
#!
/bin/bash -ex
start_docker=0
copy_send=0
......
......@@ -3,8 +3,10 @@ import json, ast
from RTEInterface import RTEInterface
from kafka import KafkaConsumer
#consumer = KafkaConsumer('deployed', 'removeWorker',
# "request", bootstrap_servers='10.129.6.5:9092')
consumer = KafkaConsumer('deployed', 'removeWorker',
"request", bootstrap_servers='10.129.6.5:9092')
"request", bootstrap_servers='localhost:9092')
RTEInterface.Connect('thrift', "10.129.2.201", 20206)
tableId = "ingress::dispatch"
......
......@@ -136,20 +136,27 @@ server.on('error', (err) => {
server.close();
});
function getRandomInt(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min + 1)) + min;
}
server.on('message', (msg, rinfo) => {
// console.log("message", msg)
console.log("message", msg)
let payload = unpackPacket(msg)
// console.log(payload, typeof payload);
console.log(payload, typeof payload);
lastRequest = Date.now()
// console.log("network stack time", lastRequest - payload.t1)
console.log("network stack time", lastRequest - payload.t1)
totalRequest++
executor(payload).then(result => {
result = packPacket(payload)
executor(msg).then(result => {
result = packPacket(msg)
let port = 10000 + getRandomInt(0, 10)
try {
udpProxy.send(result, 0, result.length, "8080", rinfo.address, function (err, bytes) {
udpProxy.send(msg, 0, msg.length, port, rinfo.address, function (err, bytes) {
if (err)
console.log(err)
// console.log("response via UDP")
console.log("response via UDP")
})
} catch (e) {
console.log(e)
......@@ -193,7 +200,7 @@ function unpackPacket(packet) {
t4 = struct.Unpack("I", packet, base)
// console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
console.log("chain_id", chain_id, "exec_id", exec_id, "data", data, "function_count", function_count, "function_id", function_id)
return {
......
let workerNodes = {}, timeline = {}
const constants = require('../constants.json')
const constants = require('../constants_local.json')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.KafkaClient({
......@@ -21,10 +21,21 @@ function getAddress() {
return Object.keys(workerNodes)[0];
}
// payloads = [
// { topic: 'heartbeat', messages: 'hi', partition: 0 }
// ];
// producer.on('ready', function () {
// producer.send(payloads, function (err, data) {
// console.log(data);
// });
// });
consumer.on('message', function (message) {
let topic = message.topic
message = message.value
// console.log("message ",message)
if (topic !== "heartbeat")
console.log(message);
if (topic === "heartbeat") {
......
const secrets = require('./secrets.json')
const constants = require('.././constants.json')
const constants = require('.././constants_local.json')
let db = new Map(), // queue holding request to be dispatched
resourceMap = new Map(), // map between resource_id and resource details like node_id, port, associated function etc
......@@ -17,6 +17,8 @@ let db = new Map(), // queue holding request to be dispatched
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metadataDB = metadataDB + "/" + constants.db.function_meta + "/"
console.log("metadata : "+metadataDB)
let metricsDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}`
metricsDB = metricsDB + "/" + constants.db.metrics + "/"
......@@ -30,4 +32,4 @@ module.exports = {
db, functionBranchTree, functionToResource, workerNodes, resourceMap,
conditionProbabilityExplicit, requestFlightQueue,
metadataDB, metricsDB, implicitChainDB, explicitChainDB
}
\ No newline at end of file
}
const constants = require('./constants.json')
const constants = require('./constants_local.json')
const util = require('util')
let kafka = require('kafka-node'),
Producer = kafka.Producer,
......@@ -17,4 +17,4 @@ consumer.on('message', function (message) {
message = JSON.parse(message.value)
console.log(util.inspect(message, false, null, true /* enable colors */))
})
\ No newline at end of file
})
const constants = require('./constants.json')
const constants = require('./constants_local.json')
var kafka = require('kafka-node');
let client = new kafka.KafkaClient({
......@@ -12,4 +12,4 @@ for (const [key, value] of Object.entries(constants.topics)) {
client.createTopics(topicsToCreate, (error, result) => {
console.log("topic created", result);
});
\ No newline at end of file
});
......@@ -31,6 +31,7 @@ services:
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
......
version: '2'
networks:
kafka-serverless:
driver: bridge
services:
zookeeper:
image: 'bitnami/zookeeper:3'
restart: unless-stopped
networks:
- kafka-serverless
ports:
- '2182:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2'
restart: unless-stopped
networks:
- kafka-serverless
ports:
- '9093:9092'
- '29092:29092'
volumes:
- 'kafka_data:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,PLAINTEXT_HOST://127.0.0.1:29092
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
console.log('running test1')
console.log('running test1')
console.log('running test1')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment