Commit b0f8fff0 authored by Santhosh Kumar's avatar Santhosh Kumar

cleanup code

parent 66f1e195
FROM python:3.8-slim
# Set the working directory first so the COPY below lands in it
WORKDIR /home/app/
# Copy your function code into the container
COPY hello.py .
# Define the command to run your function (exec form keeps python as PID 1)
CMD ["python", "hello.py"]
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/invoke', methods=['POST'])
def invoke_function():
    """
    HTTP entry point: reads param1/param2 from the JSON request body,
    applies the deployed function to them, and returns the result as JSON.
    """
    payload = request.json
    first = payload.get('param1')
    second = payload.get('param2')
    outcome = your_deployed_function(first, second)
    return jsonify({"result": outcome})
def your_deployed_function(param1, param2):
    """Stand-in for the deployed function: combines its two arguments with
    the + operator (numeric addition, or concatenation for sequences)."""
    combined = param1 + param2
    return combined
if __name__ == '__main__':
    # Listen on all interfaces on port 8080 (the port this container serves on)
    app.run(host='0.0.0.0', port=8080)
service: hello
provider:
name: kubeless
runtime: python3.9
plugins:
- serverless-kubeless
functions:
hello:
description: 'Hello function'
handler: handler.hello
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
spec:
selector:
matchLabels:
app: nginx
replicas: 2 # tells deployment to run 2 pods matching the template
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.14.2
ports:
- containerPort: 80
ssh cs695@10.129.131.202
passwd:1234
sudo vi /etc/nginx/conf.d/loadbalancer.conf
systemctl restart nginx
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "bXNhbnRoOmRja3JfcGF0XzZJcmpxNTdhazk1Q3I1R054YkZwZTlDeFFGYw=="
}
}
}
\ No newline at end of file
#!pip install kubernetes
from kubernetes import client, config
import time
class FunctionController:
    """Minimal controller that creates and scales function Deployments."""

    def __init__(self):
        # Load kubeconfig from the default location (~/.kube/config)
        config.load_kube_config()
        # Core API client kept for backward compatibility with existing callers
        self.k8s_client = client.CoreV1Api()
        # Deployments live in the apps/v1 API group, so they need AppsV1Api
        self.apps_api = client.AppsV1Api()

    def create_function(self, function_name, image, replicas=1):
        """Create a Deployment named `function_name` running `image`.

        Args:
            function_name: used for the Deployment name and app= label.
            image: container image to run.
            replicas: initial replica count (default 1).
        """
        deployment = client.V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=client.V1ObjectMeta(name=function_name),
            spec=client.V1DeploymentSpec(
                replicas=replicas,
                selector=client.V1LabelSelector(match_labels={"app": function_name}),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": function_name}),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(
                            name=function_name,
                            image=image,
                            ports=[client.V1ContainerPort(container_port=8080)],
                        )]
                    ),
                ),
            ),
        )
        # BUG FIX: create_namespaced_deployment is an AppsV1Api method; the
        # original called it on CoreV1Api, which has no such method.
        self.apps_api.create_namespaced_deployment(namespace='default', body=deployment)

    def scale_function(self, function_name, replicas):
        """Set the replica count of an existing function Deployment."""
        # BUG FIX: read/patch of Deployments also belong to AppsV1Api.
        deployment = self.apps_api.read_namespaced_deployment(name=function_name, namespace='default')
        deployment.spec.replicas = replicas
        self.apps_api.patch_namespaced_deployment(name=function_name, namespace='default', body=deployment)
if __name__ == "__main__":
    controller = FunctionController()
    # Example: create a function Deployment. NOTE(review): the image is a
    # placeholder registry path — replace before running.
    controller.create_function(function_name="hello-python", image="your-registry/hello-python:latest")
    # Example: scale the function to 3 replicas
    controller.scale_function(function_name="hello-python", replicas=3)
    # Keep the controller process alive
    while True:
        time.sleep(60)  # Sleep to prevent high CPU usage
apiVersion: v1
kind: Service
metadata:
name: hello-python-service
spec:
selector:
app: hello-python
ports:
- protocol: "TCP"
port: 6000
targetPort: 5000
type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hello-python
spec:
  replicas: 4
  selector:
    matchLabels:
      app: hello-python
  template:
    metadata:
      labels:
        app: hello-python
spec:
containers:
- name: hello-python
image: msanth/example1:latest
imagePullPolicy: Never
ports:
- containerPort: 5000
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: example-ingress
spec:
rules:
- host: example1-service.com
http:
paths:
- path: /example1
pathType: Prefix
backend:
service:
name: example1-service
port:
number: 8080
-----BEGIN PGP PUBLIC KEY BLOCK-----
mQGNBGYuJHcBDAC3RyCWZT3ZeFpMIfpP3OTy1T99bgCG7ZTZ2IyYncQ1RWDTVY2B
uyB7y55wonqqemehs7mh721iwrX2oOgvxkV/6VV2ibQvMq9Qy4GR66vSbeLdumpN
Hux7sQV6RCahtYauNvfOxexuRZHEidNZXJXNWebI+u/SUSctH7yggpW67xKx3g92
XwEixd8MnIUjAyi96CiAjU20fnMuBNH7MMY+pGR9sO9sHnsvrsiu7KVdRmRNeenF
579Uh3Dko9qyhVQn8ROJ1/wngTTqV3MwW1nWowaaJRAsx7X9uYjddZabc8qAskdd
ip+oC8AQYHh2ppI4hARodnCaRgGkAeUGgVebBVaq22rrkctojjbnKLKRX0X7CVgJ
l8x+fzSU/TCsjYZIAm+PAzHjMZnJwFtNISK1yeGOQ3V0TMWg0tIwe13negv3uJbR
5AvHqARcnkrRRehT2wFsxWikHcjYv3kzaKca4t4HIYJFjc0xiJeDLQ8TqgBzbp0y
1S4Q45ZCIfL5FFEAEQEAAbQ3c2l2YTMzMyAobXlncGdrZXkpIDxzaXZhcHJhc2Fk
cmVkZHlnYXJsYXBhdGlAZ21haWwuY29tPokBzgQTAQoAOBYhBPwEsBrxqs9Icb2v
Ly0MOhpEZup8BQJmLiR3AhsDBQsJCAcCBhUKCQgLAgQWAgMBAh4BAheAAAoJEC0M
OhpEZup8KygL/A5xWSbDNweK6TNvYdy0Qq3946mwPfDbwfid5czyNLamgupUAm9z
u08pmVSw9b1HHWKQ68qaHLTv62N0yG81KTCwPKbzwyYo5RwqiRNVW+bGGx1I+sIs
sxWanlE9M08Gm7NVgFBOaD6PA0GwohuO91ytnHjIurmldH7VhDSSqK+48adqRqEC
FZ+xn6MQKmU8qKC+uZj+9M921rmuehWCz1COBFL4gwPSoPtiZmWHxiLbHUWkgM56
Bmtr3X/BSJDIqf9lSEsMgzuTke/DSdjw8iWlPRk8PP6nC1w2/0i7MJgsROyBXQku
Rjuq4jIZoejFCsVQ12PR0TeB7L0aD6tNKc5AKk/KKCFQJZC4NFVDOhUpHQ6yWcAB
rOe3t5Jr/c56L7Tj/wz7/lsms0GXtSMwSngxwmU9akQEeb9RNEHHof2S/bzN5nLO
bP6mAGW+TUkW7dXllEIaoKg2V2HCM9ONGZckE5ijeAjDNar8qIW5JcbDHqK4DSXS
2JiEBtM18Ge3y7kBjQRmLiR3AQwAv33mw6F2FvtmHnyXdiK0yxHaTx5D2rMEu6Cn
Xg5h4Q3+hvcH24MU59vuLYHl1IiuQbUr0dEWs+9GlJTc0DYtCdoRHlqoRQOa7OOJ
AXLqmyUcPuzRrnKHwMe578pBcLtSGYqMpRu/uHblU4+NKArU5WySkrjkWygC05YR
pevJy89ovAy1m4yj+8RzdAwEpSI2Oq9qCZGw9U+rrQ0y+tKWqWAHXY3aUyRK8IMw
vedfZS/0xeQ3GZ9k1C1OIzbJjFp37UA1YADOsk7g/ytO9ZmI/1C9A0/NmX0v0qdf
CsxWwtGCHtZC3oiuMLBdGK5wMPiJciCnEmoIsxt+30BSNDGjGKovKZYMdNKrXKtv
FINQolKCSHTY/DXlWNE1R4IqSxsq9HpiT4BPj4BLtXhc7s10Pklx5ODZzDcxmY8c
sqTCa8PeCup3eYHpcZ/OS7tip0OjRwPcxmO98DE1iUQHBL6z7pO25Xp8lRfuuSmc
aLSGIKocGtYo5DaVERz3oKRqsmgxABEBAAGJAbYEGAEKACAWIQT8BLAa8arPSHG9
ry8tDDoaRGbqfAUCZi4kdwIbDAAKCRAtDDoaRGbqfPeDC/9NFRSz+nPqCwRMCosQ
BPYBEdygj9FDZtgACoBX6mnWZWhods7OJfOY/qlnZI9nrzqzUCL/A6nodB8w9LbQ
TnTh62YpL7+HFkUhOPyHE9rPv2uR94ToDDbpR/F62/nFWw5zaah8POUVnUf+nrUW
zGadporNiHNNaX5IU37NBP6XBRKxttTPqAPPruvGpYNpVXxEZcM60QlouYzaPsqR
7fgmKjT/GqTGxoIx2Q0MSnob0c1sWMpaT96S1FUDmoqv+WEGE1tE2a7orAMUrlwc
oolL/iufStW2E0mw6raUV80G4du+rFHykVDsYsRPb987ustu4iRlkFjlITId8uGW
xxFI14m8IsShncBBS1oZlBWP7BJP4FhSr30V2RBwpABVjUrsxqC/oyjSV0bMPi9H
0qyx947M8TsYONVAE99tVeSN0xcLu0PwQRCUl0goQ15+lKi560czhLHt8LMRMBsD
rczNzjncj1e5lKOgMgGgpT4+wa4Cl/3Hrpg0t1p/TQMF7hU=
=I71U
-----END PGP PUBLIC KEY BLOCK-----
from flask import Flask, request, jsonify
from kubernetes import client, config, watch
import threading
# Load Kubernetes configuration from default location
config.load_kube_config()
# Define Kubernetes API client
k8s_client = client.ApiClient()
# Define custom resource group/version
group = 'example.com'
version = 'v1'
# Define custom resource plural
plural = 'functions'
# Define custom resource API
crd_api = client.CustomObjectsApi(k8s_client)
# Define namespace where functions will be deployed
namespace = 'default'
# Initialize Flask app
app = Flask(__name__)
# Dictionary to store function replicas
function_replicas = {}
def deploy_function(function_name, image):
    """
    Deploy a function to Kubernetes as a custom resource.

    Args:
        function_name: value for the custom object's metadata.name.
        image: container image recorded in the resource spec.
    """
    function_resource = {
        'apiVersion': f'{group}/{version}',
        'kind': 'Function',
        'metadata': {'name': function_name},
        'spec': {'image': image}
    }
    # BUG FIX: custom-object CRUD lives on CustomObjectsApi (crd_api);
    # client.ApiClient has no create_namespaced_custom_object method.
    crd_api.create_namespaced_custom_object(group, version, namespace, plural, function_resource)
    print(f"Function {function_name} deployed with image {image}.")
def update_function(function_name, image):
    """
    Update an existing function custom resource in Kubernetes.

    Args:
        function_name: name of the existing custom object to replace.
        image: new container image for the resource spec.
    """
    function_resource = {
        'apiVersion': f'{group}/{version}',
        'kind': 'Function',
        'metadata': {'name': function_name},
        'spec': {'image': image}
    }
    # BUG FIX: replace_namespaced_custom_object is a CustomObjectsApi method
    # (crd_api), not a method of the raw client.ApiClient.
    crd_api.replace_namespaced_custom_object(group, version, namespace, plural, function_name, function_resource)
    print(f"Function {function_name} updated with image {image}.")
def scale_function(function_name, replicas):
    """
    Scale the number of replicas for a function custom resource.

    Args:
        function_name: name of the custom object whose scale is patched.
        replicas: desired replica count written to spec.replicas.
    """
    scale_spec = {'spec': {'replicas': replicas}}
    # BUG FIX: patch_namespaced_custom_object_scale belongs to
    # CustomObjectsApi (crd_api); client.ApiClient has no such method.
    crd_api.patch_namespaced_custom_object_scale(group, version, namespace, plural, function_name, scale_spec)
    print(f"Function {function_name} scaled to {replicas} replicas.")
def handle_function(event):
    """
    Dispatch a single watch event (ADDED / MODIFIED / DELETED) for a
    function custom resource to the matching deploy/update action.
    """
    obj = event['object']
    name = obj['metadata']['name']
    event_type = event['type']
    if event_type == 'ADDED':
        print(f"Function {name} added.")
        deploy_function(name, obj['spec']['image'])
    elif event_type == 'MODIFIED':
        print(f"Function {name} modified.")
        update_function(name, obj['spec']['image'])
    elif event_type == 'DELETED':
        print(f"Function {name} deleted.")
def watch_functions():
    """
    Watch for changes to function resources.

    Streams watch events for the `functions` custom resource in `namespace`
    and forwards each one to handle_function(). Runs forever; intended as a
    daemon-thread target.
    """
    resource_version = ''
    while True:
        stream = watch.Watch().stream(crd_api.list_namespaced_custom_object, group, version, namespace, plural, resource_version=resource_version)
        for event in stream:
            # Remember the last seen resourceVersion so a dropped stream
            # resumes from this point instead of replaying history.
            resource_version = event['object']['metadata']['resourceVersion']
            handle_function(event)
def auto_scale():
    """
    Auto-scale functions based on request rate.

    Every minute, converts each function's accumulated request count into a
    replica target of roughly one replica per 10 requests, clamped to [1, 10].
    Runs forever; intended as a daemon-thread target.
    """
    import time  # BUG FIX: `time` was used below but never imported in this module

    while True:
        # Snapshot the keys so concurrent request handling cannot resize
        # the dict while we iterate.
        for function_name in list(function_replicas):
            request_rate = function_replicas.get(function_name, 0)
            replicas = min(max(int(request_rate / 10), 1), 10)  # Scale between 1 and 10 replicas
            scale_function(function_name, replicas)
        # Sleep for 1 minute (adjust as needed)
        time.sleep(60)
@app.route('/invoke', methods=['POST'])
def invoke_function():
    """
    Handle HTTP POST requests to invoke functions.

    Expects a JSON body with a 'function_name' key and bumps that function's
    request counter (read by auto_scale). NOTE(review): this only counts the
    request — it never actually invokes the function; confirm whether a real
    call is intended here.
    """
    data = request.get_json()
    function_name = data.get('function_name')
    # Increment request rate for function
    function_replicas[function_name] = function_replicas.get(function_name, 0) + 1
    return jsonify({'message': f"Function {function_name} invoked."})
if __name__ == '__main__':
    # Start watching for changes to function resources (daemon: dies with main)
    watch_thread = threading.Thread(target=watch_functions, daemon=True)
    watch_thread.start()
    # Start auto-scaling thread
    auto_scale_thread = threading.Thread(target=auto_scale, daemon=True)
    auto_scale_thread.start()
    # Run Flask app (blocks until shutdown)
    app.run(host='0.0.0.0', port=5000)
import os
import subprocess
import yaml
from kubernetes import client, config
# Load Kubernetes configuration from default location
config.load_kube_config()
# Kubernetes API client
k8s_client = client.ApiClient()
# Docker Hub username (replace with your Docker Hub username)
docker_username = "gsiva333"
def build_and_push_image(function_name):
    """
    Build the Docker image for `function_name` and push it to Docker Hub.

    The image is built from the directory named `function_name` and tagged
    {docker_username}/{function_name}:latest.
    """
    image_tag = f"{docker_username}/{function_name}:latest"
    try:
        # BUG FIX: run the build inside the function directory via cwd=
        # instead of os.chdir(); the original's finally: os.chdir("..")
        # corrupted the process working directory whenever the initial
        # os.chdir(function_name) itself failed, and mutated global state.
        subprocess.run(["docker", "build", "-t", image_tag, "."], check=True, cwd=function_name)
        # Push Docker image to Docker Hub
        subprocess.run(["docker", "push", image_tag], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error occurred while building/pushing Docker image: {e}")
def create_deployment(function_name):
    """
    Create a Kubernetes Deployment resource for the function.

    Runs one replica of {docker_username}/{function_name}:latest with pods
    labelled app=<function_name>.
    """
    try:
        deployment_manifest = {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {"name": function_name},
            "spec": {
                "replicas": 1,
                "selector": {"matchLabels": {"app": function_name}},
                "template": {
                    "metadata": {"labels": {"app": function_name}},
                    "spec": {
                        "containers": [{
                            "name": function_name,
                            "image": f"{docker_username}/{function_name}:latest",
                            "ports": [{"containerPort": 5000}]  # the function container listens on port 5000
                        }]
                    }
                }
            }
        }
        # Create Deployment in Kubernetes (Deployments are apps/v1 → AppsV1Api)
        api_instance = client.AppsV1Api(k8s_client)
        api_instance.create_namespaced_deployment(namespace="default", body=deployment_manifest)
        print(f"Deployment created for {function_name}.")
    except client.rest.ApiException as e:
        print(f"Error occurred while creating Deployment: {e}")
def create_service(function_name):
    """
    Create a Kubernetes Service resource for the function.

    Maps Service port 8080 to the container's port 5000 and exposes it
    externally through a cloud LoadBalancer.
    """
    try:
        service_manifest = {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {"name": f"{function_name}-service"},
            "spec": {
                "selector": {"app": function_name},
                "ports": [{"protocol": "TCP", "port": 8080, "targetPort": 5000}],  # service 8080 -> container 5000
                "type": "LoadBalancer"  # exposes the service externally (NOT cluster-only)
            }
        }
        # Create Service in Kubernetes
        #k8s_client.create_namespaced_service(namespace="default", body=service_manifest)
        api_instance = client.CoreV1Api()
        api_instance.create_namespaced_service(namespace="default", body=service_manifest)
        print(f"Service created for {function_name}.")
    except client.rest.ApiException as e:
        print(f"Error occurred while creating Service: {e}")
"""
def get_external_ip(service_name):
try:
# Wait for the external IP to be assigned
while True:
service = api_instance.read_namespaced_service(name=service_name, namespace="default")
if service.status.load_balancer.ingress:
external_ip = service.status.load_balancer.ingress[0].ip
if external_ip:
return external_ip
time.sleep(5) # Wait for 5 seconds before checking again
except client.rest.ApiException as e:
print(f"Error occurred while getting external IP: {e}")
"""
def deploy_function(function_name):
    """
    Deploy the function to Kubernetes: build and push its image, then
    create the Deployment and Service resources.
    """
    try:
        for step in (build_and_push_image, create_deployment, create_service):
            step(function_name)
        # External-IP discovery is intentionally disabled:
        #external_ip = get_external_ip(f"{function_name}-service")
        #print(f"External IP for {function_name}-service: {external_ip}")
    except Exception as e:
        print(f"Error occurred during deployment of {function_name}: {e}")
if __name__ == "__main__":
    # Interactive entry point: the directory name doubles as the image/function name
    function_name = input("Enter the name of the function directory: ")
    deploy_function(function_name)
import os
import subprocess
import yaml
import time
from kubernetes import client, config
# Load Kubernetes configuration from default location
config.load_kube_config()
# Kubernetes API client
k8s_client = client.ApiClient()
# Docker Hub username (replace with your Docker Hub username)
docker_username = "gsiva333"
def build_and_push_image(function_name):
    """
    Build the Docker image for `function_name` and push it to Docker Hub.

    The image is built from the directory named `function_name` and tagged
    {docker_username}/{function_name}:latest.
    """
    image_tag = f"{docker_username}/{function_name}:latest"
    try:
        # BUG FIX: run the build inside the function directory via cwd=
        # instead of os.chdir(); the original's finally: os.chdir("..")
        # corrupted the process working directory whenever the initial
        # os.chdir(function_name) itself failed, and mutated global state.
        subprocess.run(["docker", "build", "-t", image_tag, "."], check=True, cwd=function_name)
        # Push Docker image to Docker Hub
        subprocess.run(["docker", "push", image_tag], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error occurred while building/pushing Docker image: {e}")
def create_deployment(function_name):
    """
    Create a Kubernetes Deployment resource for the function.

    Runs one replica of {docker_username}/{function_name}:latest with pods
    labelled app=<function_name>.
    """
    try:
        deployment_manifest = {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {"name": function_name},
            "spec": {
                "replicas": 1,
                "selector": {"matchLabels": {"app": function_name}},
                "template": {
                    "metadata": {"labels": {"app": function_name}},
                    "spec": {
                        "containers": [{
                            "name": function_name,
                            "image": f"{docker_username}/{function_name}:latest",
                            "ports": [{"containerPort": 5000}]  # the function container listens on port 5000
                        }]
                    }
                }
            }
        }
        # Create Deployment in Kubernetes (Deployments are apps/v1 → AppsV1Api)
        api_instance = client.AppsV1Api(k8s_client)
        api_instance.create_namespaced_deployment(namespace="default", body=deployment_manifest)
        print(f"Deployment created for {function_name}.")
    except client.rest.ApiException as e:
        print(f"Error occurred while creating Deployment: {e}")
def create_service(function_name):
    """
    Create a Kubernetes Service resource for the function.

    Maps Service port 8080 to the container's port 5000 and exposes it
    externally through a cloud LoadBalancer.
    """
    try:
        service_manifest = {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {"name": f"{function_name}-service"},
            "spec": {
                "selector": {"app": function_name},
                "ports": [{"protocol": "TCP", "port": 8080, "targetPort": 5000}],  # service 8080 -> container 5000
                "type": "LoadBalancer"  # exposes the service externally (NOT cluster-only)
            }
        }
        # Create Service in Kubernetes
        #k8s_client.create_namespaced_service(namespace="default", body=service_manifest)
        api_instance = client.CoreV1Api()
        api_instance.create_namespaced_service(namespace="default", body=service_manifest)
        print(f"Service created for {function_name}.")
    except client.rest.ApiException as e:
        print(f"Error occurred while creating Service: {e}")
def get_external_ip(service_name, timeout=None):
    """
    Poll a Service until its LoadBalancer external IP is assigned.

    Args:
        service_name: Service name in the 'default' namespace.
        timeout: optional maximum seconds to wait. None (the default,
            matching the original behavior) waits indefinitely.

    Returns:
        The external IP string, or None on timeout or API error.
    """
    try:
        api_instance = client.CoreV1Api()
        deadline = None if timeout is None else time.time() + timeout
        while deadline is None or time.time() < deadline:
            service = api_instance.read_namespaced_service(name=service_name, namespace="default")
            ingress = service.status.load_balancer.ingress
            if ingress and ingress[0].ip:
                return ingress[0].ip
            time.sleep(5)  # Wait for 5 seconds before checking again
        return None  # timed out without an assigned IP
    except client.rest.ApiException as e:
        print(f"Error occurred while getting external IP: {e}")
def deploy_function(function_name):
    """
    Deploy the function to Kubernetes: build and push its image, then
    create the Deployment and Service resources.
    """
    try:
        for step in (build_and_push_image, create_deployment, create_service):
            step(function_name)
        # External-IP discovery is intentionally disabled:
        #external_ip = get_external_ip(f"{function_name}-service")
        #print(f"External IP for {function_name}-service: {external_ip}")
    except Exception as e:
        print(f"Error occurred during deployment of {function_name}: {e}")
if __name__ == "__main__":
    # Interactive entry point: the directory name doubles as the image/function name
    function_name = input("Enter the name of the function directory: ")
    deploy_function(function_name)
apiVersion: apps/v1
kind: Deployment
metadata:
name: function-deployment
spec:
replicas: 1
selector:
matchLabels:
app: function
template:
metadata:
labels:
app: function
spec:
containers:
- name: function
image: yourusername/function-name:latest
ports:
- containerPort: 8080 # Assuming your function listens on port 8080
apiVersion: v1
kind: Service
metadata:
name: function-service
spec:
selector:
app: function
ports:
- protocol: TCP
port: 8080 # Port on which your function container is listening
targetPort: 8080 # Port to forward traffic to in the pod
type: ClusterIP # Expose service only within the cluster
from flask import Flask, request, jsonify
from kubernetes import client, config, watch
from prometheus_client import Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST, Histogram
from prometheus_client.exposition import make_wsgi_app
from werkzeug.middleware.dispatcher import DispatcherMiddleware
import threading
import time
import requests
from wsgiref.simple_server import make_server
# Load Kubernetes configuration from default location
config.load_kube_config()
# Define Kubernetes API client
k8s_client = client.ApiClient()
# Initialize Flask app
app = Flask(__name__)
# Prometheus metrics
function_invocation_counter = Counter('function_invocation_count', 'Number of function invocations', ['function_name'])
function_request_rate_gauge = Gauge('function_request_rate', 'Request rate for functions', ['function_name'])
cpu_utilization_gauge= Gauge('cpu_utilization','CPU utilization for functions',['function_name'])
storage_utilization_gauge = Gauge('storage_utilization', 'Storage utilization for functions', ['function_name'])
service_time_histogram = Histogram('service_time_seconds', 'Service time for functions', ['function_name'])
def deploy_function(function_name, image):
    """
    Deploy a function to Kubernetes as a single-replica apps/v1 Deployment
    whose container listens on port 5000.
    """
    try:
        pod_labels = {'app': function_name}
        container = {
            'name': function_name,
            'image': image,
            'ports': [{'containerPort': 5000}],
        }
        deployment_spec = {
            'apiVersion': 'apps/v1',
            'kind': 'Deployment',
            'metadata': {'name': function_name},
            'spec': {
                'replicas': 1,
                'selector': {'matchLabels': pod_labels},
                'template': {
                    'metadata': {'labels': pod_labels},
                    'spec': {'containers': [container]},
                },
            },
        }
        # Create the Deployment through the apps/v1 API
        client.AppsV1Api(k8s_client).create_namespaced_deployment(namespace='default', body=deployment_spec)
        print(f"Function {function_name} deployed with image {image}.")
    except Exception as e:
        print(f"Error deploying {function_name}: {e}")
def scale_function(function_name, replicas):
    """
    Create a HorizontalPodAutoscaler for the function's Deployment.

    Despite the name, this does not set a fixed replica count: it creates an
    autoscaling/v1 HPA targeting 50% CPU utilization with `replicas` as the
    maximum. NOTE(review): create_* returns 409 Conflict if the HPA already
    exists (caught and printed below) — confirm whether patch/replace is
    intended on repeat calls.
    """
    try:
        hpa_spec = {
            'apiVersion': 'autoscaling/v1',
            'kind': 'HorizontalPodAutoscaler',
            'metadata': {'name': f'{function_name}-hpa'},
            'spec': {
                'scaleTargetRef': {
                    'apiVersion': 'apps/v1',
                    'kind': 'Deployment',
                    'name': function_name
                },
                'minReplicas': 1,
                'maxReplicas': replicas,
                'targetCPUUtilizationPercentage': 50  # Adjust as needed
            }
        }
        # Create HPA in Kubernetes (autoscaling/v1 API group)
        #api_instance = client.CoreV1Api()
        api_instance = client.AutoscalingV1Api(k8s_client)
        api_instance.create_namespaced_horizontal_pod_autoscaler(namespace='default', body=hpa_spec)
        print(f"Function {function_name} auto-scaler created with max replicas {replicas}.")
    except Exception as e:
        print(f"Error scaling {function_name}: {e}")
def handle_function(event):
    """
    Handle watch events (ADDED / MODIFIED / DELETED) from the Deployment watch.

    Events come from AppsV1Api.list_namespaced_deployment, so event['object']
    is a V1Deployment model object — hence attribute access
    (function.metadata.name) rather than dict indexing.
    """
    function = event['object']
    #metadata = function['metadata']
    #function_name = metadata.get('name')
    function_name = function.metadata.name
    function_status = event['type']
    if function_status == 'ADDED':
        print(f"Function {function_name} added.")
        #deploy_function(function_name, function['spec']['image'])
        #deploy_function(function_name, function.spec['image'])
        # NOTE(review): ADDED fires for every pre-existing Deployment when the
        # watch starts, so each gets an HPA created — confirm this is intended.
        scale_function(function_name, 10)  # Initially scale to max replicas of 10
    elif function_status == 'MODIFIED':
        print(f"Function {function_name} modified.")
        # For simplicity, we don't handle updates in this example
    elif function_status == 'DELETED':
        print(f"Function {function_name} deleted.")
def watch_functions():
    """
    Watch Deployments in the 'default' namespace and forward every watch
    event to handle_function(). Runs forever; intended as a daemon-thread
    target.

    Cleanup: removed the unused `my_resource` dict, the unused `w` Watch
    instance, and the never-read `resource_version` local from the original.
    """
    api_instance = client.AppsV1Api(k8s_client)
    while True:
        # Re-open the watch stream whenever the server closes it
        deployment_watch = watch.Watch().stream(api_instance.list_namespaced_deployment, namespace='default')
        for event in deployment_watch:
            print(event)
            handle_function(event)
@app.route('/invoke', methods=['POST'])
def invoke_function():
    """
    Handle HTTP POST requests to invoke functions.

    Reads 'function_name' from the JSON body, records Prometheus metrics
    (request-rate gauge, invocation counter, service-time histogram),
    forwards the call to the in-cluster function service, and returns the
    result message as JSON.
    """
    data = request.get_json()
    function_name = data.get('function_name')
    # Increment request rate for function
    function_request_rate_gauge.labels(function_name=function_name).inc()
    # Increment invocation count for function
    function_invocation_counter.labels(function_name=function_name).inc()
    start_time = time.time()
    # Invoke function
    result = invoke_kubernetes_function(function_name)
    end_time = time.time()
    service_time = end_time - start_time
    # Record wall-clock service time in the histogram
    service_time_histogram.labels(function_name=function_name).observe(service_time)
    return jsonify({'message': f"Function {function_name} invoked. Result: {result}"})
def invoke_kubernetes_function(function_name):
    """
    Invoke a Kubernetes-hosted function by POSTing to its cluster-local
    Service endpoint; returns the decoded JSON result or an error string.
    """
    # 'function-service' must match the actual Service name in the cluster
    url = f"http://function-service.default.svc.cluster.local:8080/invoke"
    payload = {
        "message": "Hello from Python!"
    }
    try:
        response = requests.post(url, json=payload)
    except requests.exceptions.RequestException as e:
        return f"Error: {e}"
    if response.status_code != 200:
        return f"Error: {response.status_code}"
    return response.json()  # the function is expected to reply with JSON
@app.route('/metrics')
def metrics_app():
    """
    Serve Prometheus metrics on the Flask app's /metrics endpoint.

    BUG FIX: the original was written with a WSGI (environ, start_response)
    signature while registered as a Flask route, so Flask called it with no
    arguments and every /metrics request raised TypeError. It is now a
    regular Flask view returning the scrape payload with the correct
    content type.
    """
    return app.response_class(generate_latest(), mimetype=CONTENT_TYPE_LATEST)
if __name__ == '__main__':
    # Start watching for changes to function resources (daemon: dies with main)
    watch_thread = threading.Thread(target=watch_functions, daemon=True)
    watch_thread.start()
    # Run Flask app — NOTE(review): app.run() blocks until shutdown, so every
    # line below it is dead code and the port-8080 metrics server never
    # starts. Confirm whether it should run in a thread before app.run().
    app.run(host='0.0.0.0', port=5000)
    # Serve Prometheus metrics (unreachable as written)
    metrics_app_wrapper = DispatcherMiddleware(app.wsgi_app, {'/metrics': metrics_app})
    httpd = make_server('0.0.0.0', 8080, metrics_app_wrapper)
    httpd.serve_forever()
This diff is collapsed.
# Use an official Python runtime as a parent image
FROM python:3.8-slim
# Set the working directory in the container
WORKDIR /app
# Install dependencies before copying source so this layer stays cached
# when only the application code changes; --no-cache-dir keeps the image lean
RUN pip install --no-cache-dir flask
# Copy the Flask application code into the container
COPY . .
# Define the command to run the Flask application
CMD ["python", "app.py"]
apiVersion: v1
kind: Service
metadata:
name: your-service
spec:
selector:
app: example1-service
ports:
- name: http
protocol: TCP
port: 80
targetPort: 8080
type: LoadBalancer
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/invoke', methods=['POST'])
def invoke_function():
    """HTTP entry point: extracts param1/param2 from the JSON body, applies
    your_deployed_function to them, and returns {"result": ...} as JSON."""
    # Parse request data (requires Content-Type: application/json)
    data = request.json
    # Extract parameters or payload from the request
    param1 = data.get('param1')
    param2 = data.get('param2')
    # Call your deployed function with the provided parameters
    result = your_deployed_function(param1, param2)
    # Prepare response
    response = {
        "result": result
    }
    return jsonify(response)
def your_deployed_function(param1, param2):
    """Placeholder for the deployed function: combines its two inputs with
    the + operator (numeric addition or sequence concatenation)."""
    combined = param1 + param2
    return combined
if __name__ == '__main__':
    # Serve on all interfaces, port 8080 (the port the container exposes)
    app.run(host='0.0.0.0', port=8080)
docker build -t your-image-name .
docker push your-image-name
kubectl apply -f deployment.yaml
kubectl apply -f service.yaml
kubectl apply -f hpa.yaml
# metrics
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml
# edit/modify
kubectl get hpa <hpa_name> -o yaml > hpa.yaml
kubectl apply -f hpa.yaml
or
kubectl edit hpa <hpa_name>
apiVersion: v1
kind: ServiceAccount
metadata:
labels:
k8s-app: metrics-server
name: metrics-server
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
k8s-app: metrics-server
rbac.authorization.k8s.io/aggregate-to-admin: "true"
rbac.authorization.k8s.io/aggregate-to-edit: "true"
rbac.authorization.k8s.io/aggregate-to-view: "true"
name: system:aggregated-metrics-reader
rules:
- apiGroups:
- metrics.k8s.io
resources:
- pods
- nodes
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
k8s-app: metrics-server
name: system:metrics-server
rules:
- apiGroups:
- ""
resources:
- nodes/metrics
verbs:
- get
- apiGroups:
- ""
resources:
- pods
- nodes
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
labels:
k8s-app: metrics-server
name: metrics-server-auth-reader
namespace: kube-system
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: extension-apiserver-authentication-reader
subjects:
- kind: ServiceAccount
name: metrics-server
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
labels:
k8s-app: metrics-server
name: metrics-server:system:auth-delegator
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: system:auth-delegator
subjects:
- kind: ServiceAccount
name: metrics-server
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
labels:
k8s-app: metrics-server
name: system:metrics-server
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: system:metrics-server
subjects:
- kind: ServiceAccount
name: metrics-server
namespace: kube-system
---
apiVersion: v1
kind: Service
metadata:
labels:
k8s-app: metrics-server
name: metrics-server
namespace: kube-system
spec:
ports:
- name: https
port: 443
protocol: TCP
targetPort: https
selector:
k8s-app: metrics-server
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
k8s-app: metrics-server
name: metrics-server
namespace: kube-system
spec:
selector:
matchLabels:
k8s-app: metrics-server
strategy:
rollingUpdate:
maxUnavailable: 0
template:
metadata:
labels:
k8s-app: metrics-server
spec:
containers:
- args:
- --cert-dir=/tmp
- --secure-port=10250
- --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname
- --kubelet-use-node-status-port
- --metric-resolution=15s
image: registry.k8s.io/metrics-server/metrics-server:v0.7.1
imagePullPolicy: IfNotPresent
livenessProbe:
failureThreshold: 3
httpGet:
path: /livez
port: https
scheme: HTTPS
periodSeconds: 10
name: metrics-server
ports:
- containerPort: 10250
name: https
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /readyz
port: https
scheme: HTTPS
initialDelaySeconds: 20
periodSeconds: 10
resources:
requests:
cpu: 100m
memory: 200Mi
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
seccompProfile:
type: RuntimeDefault
volumeMounts:
- mountPath: /tmp
name: tmp-dir
nodeSelector:
kubernetes.io/os: linux
priorityClassName: system-cluster-critical
serviceAccountName: metrics-server
volumes:
- emptyDir: {}
name: tmp-dir
---
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
k8s-app: metrics-server
name: v1beta1.metrics.k8s.io
spec:
group: metrics.k8s.io
groupPriorityMinimum: 100
insecureSkipTLSVerify: true
service:
name: metrics-server
namespace: kube-system
version: v1beta1
versionPriority: 100
apiVersion: apps/v1
kind: Deployment
metadata:
name: your-deployment
spec:
replicas: 1
selector:
matchLabels:
app: your-app
template:
metadata:
labels:
app: your-app
spec:
containers:
- name: your-container
image: msanth/your-image-name
ports:
- containerPort: 8080
resources:
requests:
memory: "256Mi"
cpu: "0.5"
limits:
memory: "512Mi"
cpu: "1"
# Define autoscaling behavior using annotations
#annotations:
# autoscaling.knative.dev/target: "70"
# autoscaling.knative.dev/metric: "cpu" # can be concurrency and rps (rates per second)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: my-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: my-deployment
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 50
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: simple-fanout-example
spec:
rules:
- host: cs695
http:
paths:
- path: /foo
pathType: Prefix
backend:
service:
name: service1
port:
number: 4200
- path: /bar
pathType: Prefix
backend:
service:
name: your-service
port:
number: 80
import curses
def main():
    """
    Simple curses dashboard skeleton: draws a static layout, then polls for
    keypresses until 'q' is pressed.

    Fixes over the original:
      * curses.color_pair() returns an attribute int, not a callable — the
        original called black("...") which raised TypeError on first use.
      * curses.wbeginnen / curses.winsch([[2]]) / curses.wgetnstr(2,1) are
        not valid curses module calls and were removed.
      * screen.getkey() returns a str, so it is compared with 'q' rather
        than ord('q') (an int that could never match).
      * color pairs are initialized before use, and endwin() is guaranteed
        via try/finally so the terminal is restored on any error.
    """
    screen = curses.initscr()
    try:
        curses.start_color()
        curses.noecho()
        curses.cbreak()
        screen.keypad(True)
        # Define the color pairs the labels below use
        curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE)
        curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)
        curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK)
        black = curses.color_pair(1)
        red = curses.color_pair(2)
        green = curses.color_pair(3)

        screen.clear()
        # First row: general info
        screen.addstr(0, 0, "INFO: Field Info", black)
        # Second section: function/action columns
        screen.addstr(1, 0, "Col 1: Functions", black)
        screen.addstr(2, 0, "Col 2: WIP Actions", black)
        screen.addstr(3, 0, "Col 2: Failed Actions", black)
        # Third section: Grafana metrics
        screen.addstr(4, 0, "Col 1: Metric 1", green)
        screen.addstr(5, 0, "Col 2: Metric 2", green)
        # Last row: user commands
        screen.addstr(6, 0, "Commands: Command 1", red)
        screen.refresh()

        while True:
            # Update information in first row
            screen.addstr(0, 20, "Current Date", black)
            screen.refresh()
            # Handle user input: quit on 'q'
            if screen.getkey() == 'q':
                break
    finally:
        curses.endwin()


if __name__ == "__main__":
    main()
from kubernetes import client, config
from kubernetes.client import configuration
import time
def get_deployment_metrics():
    """Return [(name, replicas, ready, unavailable)] for deployments in 'default'.

    The original queried metrics.k8s.io with plural="deployments"; that API
    only exposes pod and node usage, so the request always 404'd.  Replica
    counts are Deployment object fields and must come from apps/v1.
    """
    config.load_kube_config()  # Load kubeconfig from the default location
    apps_api = client.AppsV1Api()
    deployment_metrics_info = []
    for deployment in apps_api.list_namespaced_deployment(namespace="default").items:
        deployment_name = deployment.metadata.name
        replicas = deployment.spec.replicas or 0
        # status.ready_replicas is None until at least one pod is Ready.
        ready_replicas = deployment.status.ready_replicas or 0
        unavailable_replicas = replicas - ready_replicas
        deployment_metrics_info.append((deployment_name, replicas, ready_replicas, unavailable_replicas))
    return deployment_metrics_info
def display_dashboard(deployment_metrics_info):
    """Render deployment replica counts as a tab-separated console table."""
    print("Deployment Name\tReplicas\tReady Replicas\tUnavailable Replicas")
    print("-----------------------------------------------------------------")
    for name, total, ready, unavailable in deployment_metrics_info:
        print(f"{name}\t\t{total}\t\t{ready}\t\t{unavailable}")
def main():
    """Poll deployment metrics every 10 seconds until interrupted."""
    try:
        while True:
            display_dashboard(get_deployment_metrics())
            time.sleep(10)  # refresh interval
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")
if __name__ == "__main__":
    main()
from kubernetes import client, config
from kubernetes.client import configuration
import time
def get_pod_metrics():
    """Return [(pod_name, cpu_usage, memory_usage)] for pods in 'default'.

    Reads metrics.k8s.io/v1beta1 pod metrics (requires metrics-server).
    Usage values are taken from the pod's first container only, matching the
    original behavior; pods reporting no containers are skipped instead of
    raising IndexError as before.
    """
    config.load_kube_config()  # Load kubeconfig from the default location
    api_instance = client.CustomObjectsApi()
    # Retrieve metrics of all pods in the "default" namespace
    pods_metrics = api_instance.list_namespaced_custom_object(
        group="metrics.k8s.io",
        version="v1beta1",
        namespace="default",
        plural="pods"
    )
    pod_metrics_info = []
    for pod in pods_metrics["items"]:
        pod_name = pod["metadata"]["name"]
        containers = pod.get("containers") or []
        if not containers:
            # Previously pod["containers"][0] raised on such pods.
            continue
        usage = containers[0]["usage"]
        pod_metrics_info.append((pod_name, usage["cpu"], usage["memory"]))
    return pod_metrics_info
def display_dashboard(pod_metrics_info):
    """Render per-pod CPU/memory usage as a tab-separated console table."""
    print("Pod Name\t\tCPU Usage\tMemory Usage")
    print("---------------------------------------------")
    for name, cpu, mem in pod_metrics_info:
        print(f"{name}\t\t{cpu}\t\t{mem}")
def main():
    """Poll pod metrics every 10 seconds until interrupted."""
    try:
        while True:
            display_dashboard(get_pod_metrics())
            time.sleep(10)  # refresh interval
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")
if __name__ == "__main__":
    main()
# metrics-server Deployment (standalone variant, v0.3.7).
# NOTE(review): a newer v0.4.1 bundle also appears in this repo — confirm
# only one metrics-server Deployment is applied to the cluster.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metrics-server
  namespace: kube-system
  labels:
    k8s-app: metrics-server
spec:
  selector:
    matchLabels:
      k8s-app: metrics-server
  template:
    metadata:
      name: metrics-server
      labels:
        k8s-app: metrics-server
    spec:
      serviceAccountName: metrics-server
      volumes:
      # mount in tmp so we can safely use from-scratch images and/or read-only containers
      - name: tmp-dir
        emptyDir: {}
      containers:
      - name: metrics-server
        image: k8s.gcr.io/metrics-server/metrics-server:v0.3.7
        imagePullPolicy: IfNotPresent
        args:
          # Skips kubelet certificate verification — acceptable only in lab clusters.
          - --kubelet-insecure-tls
          - --cert-dir=/tmp
          - --secure-port=4443
        ports:
        - name: main-port
          containerPort: 4443
          protocol: TCP
        securityContext:
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
        volumeMounts:
        - name: tmp-dir
          mountPath: /tmp
      nodeSelector:
        kubernetes.io/os: linux
# metrics-server installation bundle (components.yaml style):
# ServiceAccount, RBAC, Service, Deployment, and APIService registration.
# Identity the metrics-server pods run as.
apiVersion: v1
kind: ServiceAccount
metadata:
  labels:
    k8s-app: metrics-server
  name: metrics-server
  namespace: kube-system
---
# Aggregated ClusterRole: grants admin/edit/view roles read access to metrics.k8s.io.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    k8s-app: metrics-server
    rbac.authorization.k8s.io/aggregate-to-admin: "true"
    rbac.authorization.k8s.io/aggregate-to-edit: "true"
    rbac.authorization.k8s.io/aggregate-to-view: "true"
  name: system:aggregated-metrics-reader
rules:
- apiGroups:
  - metrics.k8s.io
  resources:
  - pods
  - nodes
  verbs:
  - get
  - list
  - watch
---
# Core read permissions metrics-server needs to scrape node/pod data.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    k8s-app: metrics-server
  name: system:metrics-server
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - nodes
  - nodes/stats
  - namespaces
  - configmaps
  verbs:
  - get
  - list
  - watch
---
# Lets metrics-server read the extension-apiserver authentication config.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  labels:
    k8s-app: metrics-server
  name: metrics-server-auth-reader
  namespace: kube-system
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: extension-apiserver-authentication-reader
subjects:
- kind: ServiceAccount
  name: metrics-server
  namespace: kube-system
---
# Lets metrics-server delegate authn/authz decisions to the API server.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  labels:
    k8s-app: metrics-server
  name: metrics-server:system:auth-delegator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: system:auth-delegator
subjects:
- kind: ServiceAccount
  name: metrics-server
  namespace: kube-system
---
# Binds the scraping permissions to the metrics-server ServiceAccount.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  labels:
    k8s-app: metrics-server
  name: system:metrics-server
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: system:metrics-server
subjects:
- kind: ServiceAccount
  name: metrics-server
  namespace: kube-system
---
# In-cluster Service fronting the metrics-server pods over HTTPS.
apiVersion: v1
kind: Service
metadata:
  labels:
    k8s-app: metrics-server
  name: metrics-server
  namespace: kube-system
spec:
  ports:
  - name: https
    port: 443
    protocol: TCP
    targetPort: https
  selector:
    k8s-app: metrics-server
---
# The metrics-server Deployment itself (v0.4.1).
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    k8s-app: metrics-server
  name: metrics-server
  namespace: kube-system
spec:
  selector:
    matchLabels:
      k8s-app: metrics-server
  strategy:
    rollingUpdate:
      maxUnavailable: 0
  template:
    metadata:
      labels:
        k8s-app: metrics-server
    spec:
      containers:
      - args:
        - --cert-dir=/tmp
        - --secure-port=4443
        - --kubelet-preferred-address-types=InternalIP
        # Skips kubelet TLS verification — acceptable only in lab clusters.
        - --kubelet-insecure-tls
        image: k8s.gcr.io/metrics-server/metrics-server:v0.4.1
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /livez
            port: https
            scheme: HTTPS
          periodSeconds: 10
        name: metrics-server
        ports:
        - containerPort: 4443
          name: https
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /readyz
            port: https
            scheme: HTTPS
          periodSeconds: 10
        securityContext:
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
        volumeMounts:
        - mountPath: /tmp
          name: tmp-dir
      nodeSelector:
        kubernetes.io/os: linux
      priorityClassName: system-cluster-critical
      serviceAccountName: metrics-server
      volumes:
      - emptyDir: {}
        name: tmp-dir
---
# Registers metrics.k8s.io/v1beta1 with the aggregation layer, backed by
# the metrics-server Service above.
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  labels:
    k8s-app: metrics-server
  name: v1beta1.metrics.k8s.io
spec:
  group: metrics.k8s.io
  groupPriorityMinimum: 100
  insecureSkipTLSVerify: true
  service:
    name: metrics-server
    namespace: kube-system
  version: v1beta1
  versionPriority: 100
#!/bin/bash
# Simple parallel load generator: sends $requests GET requests to $url,
# up to 10 at a time, and prints the HTTP status code of each response.
# Number of requests to send
requests=100
# URL to test
#url="http://example.com"
url="http://10.108.160.104"
# Maximum transfer rate in bytes per second (1 GB/s, effectively unlimited)
rate_limit=1000000000
# Send requests in parallel using xargs with rate control.
# -n1: one argument per curl invocation; -P10: up to 10 concurrent curls.
# Variables are quoted to prevent word splitting / globbing.
seq "$requests" | xargs -n1 -P10 curl -s --limit-rate "$rate_limit" -o /dev/null -w "%{http_code}\n" "$url"
--2024-04-28 11:54:46-- http://10.108.160.104/invoke?param1=test
Connecting to 10.108.160.104:80... failed: Connection refused.
# hello.py
def handle(req):
    """Serverless handler: returns a constant greeting; `req` is ignored."""
    greeting = "Hello, World!"
    return greeting
# One-shot Job: runs a busybox pod that prints a greeting and then sleeps,
# pulling the image via the regcred pull secret.
apiVersion: batch/v1
kind: Job
metadata:
  name: hello-1
spec:
  template:
    # This is the pod template
    spec:
      containers:
      - name: hello
        image: busybox:1.28
        command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600']
      # Jobs only permit restartPolicy Never or OnFailure.
      restartPolicy: OnFailure
      imagePullSecrets:
      - name: regcred
    # The pod template ends here
---
# ServiceAccount the kubeless-ui pods run as (bound to the kubeless-ui ClusterRole).
apiVersion: v1
kind: ServiceAccount
metadata:
  name: ui-acct
  namespace: kubeless
---
# RBAC for the kubeless UI: read pods and their logs, manage kubeless
# functions, and proxy requests to services.
# rbac.authorization.k8s.io/v1beta1 was removed in Kubernetes 1.22; the
# stable v1 API (available since 1.8) is a drop-in replacement here.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: kubeless-ui
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - pods/log
  verbs:
  - get
  - list
- apiGroups:
  - kubeless.io
  resources:
  - functions
  verbs:
  - get
  - list
  - watch
  - update
  - create
  - delete
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - get
  - list
  - proxy
- apiGroups:
  - ""
  resources:
  - services/proxy
  verbs:
  - get
  # the 'create' verb is required because otherwise POST requests are blocked
  - create
  - proxy
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
---
# Grants the kubeless-ui ClusterRole to the ui-acct ServiceAccount.
# rbac.authorization.k8s.io/v1beta1 was removed in Kubernetes 1.22; the
# stable v1 API (available since 1.8) is a drop-in replacement here.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: kubeless-ui
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeless-ui
subjects:
- kind: ServiceAccount
  name: ui-acct
  namespace: kubeless
---
# kubeless UI Deployment: the UI container plus a kubectl-proxy sidecar
# that exposes the Kubernetes API to the UI on localhost:8080.
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    controller: ui
  namespace: kubeless
  name: ui
spec:
  replicas: 1
  selector:
    matchLabels:
      controller: ui
  template:
    metadata:
      labels:
        controller: ui
    spec:
      containers:
      - name: ui
        image: bitnami/kubeless-ui:latest
        imagePullPolicy: Always
        ports:
        - containerPort: 3000
          protocol: TCP
      # Sidecar: `kubectl proxy -p 8080` gives the UI API access.
      - name: proxy
        image: lachlanevenson/k8s-kubectl:v1.13.1
        imagePullPolicy: Always
        args:
        - proxy
        - "-p"
        - "8080"
      serviceAccountName: ui-acct
---
# NodePort Service exposing the kubeless UI (container port 3000).
apiVersion: v1
kind: Service
metadata:
  labels:
    controller: ui
  name: ui
  namespace: kubeless
spec:
  ports:
  - name: ui-port
    port: 3000
    protocol: TCP
    targetPort: 3000
  selector:
    controller: ui
  sessionAffinity: None
  type: NodePort
# Minimal standalone nginx pod listening on port 80.
apiVersion: v1
kind: Pod
metadata:
  name: nginx-2
spec:
  containers:
  - name: nginx
    image: nginx:1.14.2
    ports:
    - containerPort: 80
# Deployment demonstrating a log-shipping sidecar: the main container
# appends to /opt/logs.txt on a shared emptyDir; the sidecar tails it.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
  labels:
    app: myapp
spec:
  replicas: 1
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      containers:
      - name: myapp
        image: alpine:latest
        command: ['sh', '-c', 'while true; do echo "logging" >> /opt/logs.txt; sleep 1; done']
        volumeMounts:
        - name: data
          mountPath: /opt
      initContainers:
      - name: logshipper
        image: alpine:latest
        # restartPolicy: Always on an init container makes it a native
        # "sidecar" — NOTE(review): requires Kubernetes >= 1.28 with the
        # SidecarContainers feature; confirm the cluster version.
        restartPolicy: Always
        command: ['sh', '-c', 'tail -F /opt/logs.txt']
        volumeMounts:
        - name: data
          mountPath: /opt
      volumes:
      - name: data
        emptyDir: {}
from kubernetes import client, config
import time
# Docker Hub username (replace with your Docker Hub username)
docker_username = "msanth"
def build_and_push_image(function_name):
    """
    Build the Docker image and push it to Docker Hub.

    Expects a ./<function_name>/ directory containing a Dockerfile.
    Raises subprocess.CalledProcessError if the build or push fails.
    """
    # Local imports: the module header imports neither os nor subprocess,
    # so the original raised NameError at runtime.
    import os
    import subprocess
    image = f"{docker_username}/{function_name}:latest"
    original_dir = os.getcwd()
    # Change directory to function directory
    os.chdir(function_name)
    try:
        # check=True raises on a non-zero docker exit code.
        subprocess.run(["docker", "build", "-t", image, "."], check=True)
        subprocess.run(["docker", "push", image], check=True)
    finally:
        # Restore the working directory even if docker fails (the original
        # unconditionally did os.chdir("..") and could strand the process).
        os.chdir(original_dir)
def deploy_function(function_name, timeout_seconds=None):
    """
    Create a single-replica Deployment named `function_name` in 'default'
    running the image '<function_name>:latest', then block until ready.

    timeout_seconds: optional cap on the readiness wait; None (the default,
    matching the original behavior) waits forever.
    Raises TimeoutError if the deployment is not ready within the timeout.
    """
    image_name = function_name + ":latest"
    config.load_kube_config()  # Load kubeconfig from the default location
    api_instance = client.AppsV1Api()
    # Deployment: 1 replica, selector/labels keyed on app=<function_name>.
    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=function_name),
        spec=client.V1DeploymentSpec(
            replicas=1,
            selector=client.V1LabelSelector(
                match_labels={"app": function_name}
            ),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": function_name}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name=function_name,
                            image=image_name
                        )
                    ]
                )
            )
        )
    )
    # Create deployment
    print("Creating deployment...")
    api_instance.create_namespaced_deployment(namespace="default", body=deployment)
    # Poll every 5 seconds until the replica reports Ready.
    waited = 0
    while True:
        time.sleep(5)
        waited += 5
        deployment_status = api_instance.read_namespaced_deployment_status(name=function_name, namespace="default")
        # ready_replicas is None until the first pod becomes Ready; the
        # original compared it to 1 directly, which is False for None but
        # would also miss a scale-up beyond one replica.
        if (deployment_status.status.ready_replicas or 0) >= 1:
            print("Deployment succeeded.")
            break
        if timeout_seconds is not None and waited >= timeout_seconds:
            raise TimeoutError(f"Deployment {function_name} not ready after {timeout_seconds}s")
def create_service(function_name):
    """
    Create a Kubernetes Service resource for the function.

    Exposes service port 8080, forwarded to container port 5000 on pods
    labeled app=<function_name>.
    """
    service_manifest = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": f"{function_name}-service"},
        "spec": {
            "selector": {"app": function_name},
            "ports": [{"protocol": "TCP", "port": 8080, "targetPort": 5000}],  # assumes the function listens on 5000 inside the pod — TODO confirm
            "type": "LoadBalancer"  # exposes the service outside the cluster (needs a cloud/MetalLB load balancer)
        }
    }
    # Create Service in Kubernetes
    #k8s_client.create_namespaced_service(namespace="default", body=service_manifest)
    api_instance = client.CoreV1Api()
    api_instance.create_namespaced_service(namespace="default", body=service_manifest)
    print(f"Service created for {function_name}.")
def create_network_trigger(function_name):
    """
    Create a Knative Trigger named '<function_name>-trigger' that routes
    sh.keptn.event.project.create events from the default broker to the
    Knative Service `function_name`, then wait until the trigger exists.
    """
    # Fix: the original referenced the misspelled variable `fimctopm_name`,
    # which raised NameError before anything was created.
    trigger_name = function_name + "-trigger"
    api_instance = client.CustomObjectsApi()
    # Trigger spec: filter on event type, deliver to the function's Service.
    trigger = {
        "apiVersion": "networking.knative.dev/v1",
        "kind": "Trigger",
        "metadata": {
            "name": trigger_name
        },
        "spec": {
            "broker": "default",
            "filter": {
                "attributes": {
                    "type": "sh.keptn.event.project.create"
                }
            },
            "subscriber": {
                "ref": {
                    "apiVersion": "serving.knative.dev/v1",
                    "kind": "Service",
                    "name": function_name
                }
            }
        }
    }
    # Create trigger
    print("Creating network trigger...")
    api_instance.create_namespaced_custom_object(group="networking.knative.dev", version="v1", namespace="default", plural="triggers", body=trigger)
    # Poll every 5 s until the API server returns the trigger (404 = not yet).
    while True:
        time.sleep(5)
        try:
            api_instance.get_namespaced_custom_object(group="networking.knative.dev", version="v1", namespace="default", plural="triggers", name=trigger_name)
            print("Trigger created successfully.")
            break
        except client.rest.ApiException as e:
            if e.status != 404:
                raise
def get_trigger_url(function_name):
    """
    Look up the trigger created by create_network_trigger() and print the
    name of the Knative Service it delivers events to.
    """
    # Fix: create_network_trigger() names the object '<function>-trigger',
    # so the original lookup of '<function>-network' always 404'd.
    trigger_name = function_name + "-trigger"
    api_instance = client.CustomObjectsApi()
    # Get trigger details
    trigger = api_instance.get_namespaced_custom_object(group="networking.knative.dev", version="v1", namespace="default", plural="triggers", name=trigger_name)
    # NOTE(review): this is the subscriber Service *name*, not a URL; the
    # resolved URI lives in the trigger's status — confirm which is wanted.
    url = trigger["spec"]["subscriber"]["ref"]["name"]
    print("Trigger URL:", url)
if __name__ == "__main__":
    # End-to-end demo: deploy the function, expose it, wire up an event
    # trigger, then report the trigger's subscriber.
    function_name = "example2"
    deploy_function(function_name)
    # Create service
    create_service(function_name)
    # Create network trigger
    create_network_trigger(function_name)
    # Get trigger URL
    get_trigger_url(function_name)
# syntax=docker/dockerfile:1
FROM python:3.9-slim

# Set the working directory (WORKDIR creates /app; no separate mkdir needed)
WORKDIR /app

# Install dependencies first so this layer is cached until requirements.txt changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt \
    && pip install --no-cache-dir prometheus_client psutil

# Copy the application code (includes function.py)
COPY . /app/

# The Flask app listens on 5000 (documentation only; does not publish the port)
EXPOSE 5000

# Exec form: python runs as PID 1 and receives SIGTERM on container stop
CMD ["python", "/app/function.py"]
from flask import Flask, request, jsonify
import ctypes
from prometheus_client import Counter, Gauge, start_http_server
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, CollectorRegistry
import psutil
import time
import multiprocessing
app = Flask(__name__)
# Prometheus instruments, module-level and shared across requests.
function_requests_total = Counter('function_requests_total', 'Total number of requests to the function')
cpu_utilization_gauge = Gauge('cpu_utilization_percentage', 'CPU utilization percentage')
request_rate_gauge = Gauge('function_request_rate', 'Function request rate (requests per second)')
service_time_gauge = Gauge('function_service_time_seconds', 'Service time of the function in seconds')
network_bytes_sent = Counter('network_bytes_sent_total', 'Total number of bytes sent over the network')
network_bytes_recv = Counter('network_bytes_received_total', 'Total number of bytes received over the network')
# Start of the request-rate measurement window (reset by /clear_metrics).
start_time=time.time()
@app.route("/clear_metrics")
def clear_metrics():
    """Reset all metrics and restart the request-rate window.

    Returns an empty Prometheus exposition body (fresh empty registry).
    """
    # Fix: without `global`, the assignment below only bound a local
    # start_time, so the rate window used by /metrics was never reset.
    global start_time
    # Counters have no public reset API; poke the internal value holder.
    function_requests_total._value.set(0)
    cpu_utilization_gauge.set(0)
    service_time_gauge.set(0)
    network_bytes_sent._value.set(0)
    network_bytes_recv._value.set(0)
    start_time = time.time()  # restart the request-rate measurement window
    registry = CollectorRegistry()  # empty registry -> empty exposition body
    return generate_latest(registry), 200, {"Content-Type": CONTENT_TYPE_LATEST}
# Handle to the background CPU-burner process (None until first started).
cpu_process = None
def occupy_cpu(percentage):
    # Busy/sleep duty cycle: burn the CPU for `percentage`% of each second
    # and sleep for the remainder.  Runs until the process is terminated.
    while True:
        # Start time
        start_time = time.time()
        # Busy-wait (CPU-bound) for percentage/100 seconds
        while (time.time() - start_time) < (percentage / 100):
            pass
        # Sleep out the rest of the 1-second period to hit the target duty cycle
        time.sleep(1 - (percentage / 100))
@app.route('/occupy_cpu/<int:percentage>', methods=['GET'])
def start_cpu_occupier(percentage):
    """Start (or replace) a background process pinning the CPU at ~percentage%."""
    global cpu_process
    if 0 <= percentage <= 100:
        # Only one occupier at a time: kill any previous process first.
        if cpu_process and cpu_process.is_alive():
            cpu_process.terminate()
        # Run the busy loop in a separate process so Flask stays responsive.
        cpu_process = multiprocessing.Process(target=occupy_cpu, args=(percentage,))
        cpu_process.start()
        return jsonify({'message': f'CPU is being occupied at {percentage}%'}), 200
    else:
        return jsonify({'error': 'Invalid percentage, must be between 0 and 100'}), 400
# Handle to the background memory-holder process (None until first started).
memory_process = None
def allocate_memory(memory_size):
    """Child-process target: allocate `memory_size` MiB and hold it until terminated.

    Fixes two defects in the original: (1) it called jsonify() outside any
    Flask request context, which raises RuntimeError in the child process;
    (2) it returned immediately, freeing the allocation, so no memory was
    actually occupied.  Return values of a Process target are discarded,
    so plain returns replace the jsonify responses.
    """
    try:
        num_bytes = int(memory_size) * 1024 * 1024
    except (TypeError, ValueError):
        return  # invalid size: nothing to hold
    if num_bytes <= 0:
        return
    # Allocate the block and write to it so the pages are actually committed.
    buf = ctypes.create_string_buffer(num_bytes)
    ctypes.memset(buf, 0, num_bytes)
    # Keep the process (and hence the allocation) alive until terminate().
    while True:
        time.sleep(1)
@app.route('/allocate_memory/<int:memory_size>', methods=['GET'])
def start_memory_allocator(memory_size):
    """Start (or replace) a background process allocating `memory_size` MiB."""
    global memory_process
    # Only one allocator at a time: kill any previous process first.
    if memory_process and memory_process.is_alive():
        memory_process.terminate()
    # Allocate in a separate process so the Flask worker's own footprint
    # is unaffected.
    memory_process = multiprocessing.Process(target=allocate_memory, args=(memory_size,))
    memory_process.start()
    return jsonify({'message': f'Memory allocated with size {memory_size}'}), 200
@app.route("/")
def hello():
    """Main endpoint: counts the request, simulates ~0.1 s of work, and
    records the service-time and CPU-utilization gauges."""
    time_invoked=time.time()
    # Increment request counter
    function_requests_total.inc()
    time.sleep(0.1)  # simulated work so service time is non-trivial
    service_time = time.time() - time_invoked
    service_time_gauge.set(service_time)
    # Record system-wide CPU utilization at the time of the request
    cpu_utilization = psutil.cpu_percent()
    cpu_utilization_gauge.set(cpu_utilization)
    return "Hello from Python function"
@app.route("/metrics")
def metrics():
    """Prometheus scrape endpoint.

    Registers the module's collectors in a fresh per-request registry
    (avoids duplicate registration against the global default REGISTRY),
    re-parses the rendered exposition text to recover current sample
    values, derives the request rate, and returns the exposition body.
    """
    registry = CollectorRegistry()
    # Seconds since startup (or since /clear_metrics last reset start_time);
    # denominator for the request-rate computation below.
    elapsed_time = time.time() - start_time
    registry.register(function_requests_total)
    registry.register(cpu_utilization_gauge)
    registry.register(service_time_gauge)
    registry.register(network_bytes_sent)
    registry.register(network_bytes_recv)
    # Render once, then scan the text lines to pull out sample values.
    metric=generate_latest(registry)
    for line in metric.decode().split("\n"):
        if "function_requests_total" in line:
            try:
                value = float(line.split()[1])
                request_rate = value/elapsed_time
            except:
                # NOTE(review): if no sample line parses, request_rate is
                # never bound and the .set() below raises NameError — confirm.
                pass
        if "function_requests_total" in line:
            try:
                no_of_requests = float(line.split()[1])
            except:
                pass
        if "cpu_utilization_percentage" in line:
            try:
                cpu_utilization = float(line.split()[1])
            except:
                pass
        if "function_service_time_seconds" in line:
            try:
                service_time= float(line.split()[1])
            except:
                pass
        if "network_bytes_sent_total" in line:
            try:
                bytes_sent= float(line.split()[1])
            except:
                pass
        if "network_bytes_received_total" in line:
            try:
                bytes_received= float(line.split()[1])
            except:
                pass
    request_rate_gauge.set(request_rate)
    registry.register(request_rate_gauge)
    # NOTE(review): net_io_counters() returns cumulative totals since boot,
    # so inc() adds the whole total on every scrape (double counting) —
    # likely should increment by the delta since the previous scrape.
    network_stats = psutil.net_io_counters()
    network_bytes_sent.inc(network_stats.bytes_sent)
    network_bytes_recv.inc(network_stats.bytes_recv)
    # Server-side debug logging of the parsed values.
    print("Number of Requests: ",no_of_requests)
    print("Request_rate: ",request_rate)
    print("Service Time: ",service_time)
    print("CPU utilization: ",cpu_utilization)
    print("Bytes send: ",bytes_sent)
    print("Bytes received: ",bytes_received)
    return generate_latest(registry), 200, {"Content-Type": CONTENT_TYPE_LATEST}
if __name__ == "__main__":
    # Expose the default Prometheus registry on :8000 (the Flask /metrics
    # route serves a separately curated registry).
    start_http_server(8000)
    # Run Flask app on all interfaces (default port 5000)
    app.run(host='0.0.0.0')
# Pod pulling its image from a private Docker Hub repository using the
# regcred image-pull secret.
apiVersion: v1
kind: Pod
metadata:
  name: private-reg
spec:
  containers:
  - name: private-reg-container
    image: msanth/example1:latest
  imagePullSecrets:
  - name: regcred
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment