Commit 5d29e9d6 authored by Anshul's avatar Anshul

Added scheduler configurations

parent f4ffa5f7
# Deployment that runs the GPU device plugin on MPS-enabled nodes.
# NOTE(review): later commands in this repo reference a DaemonSet named
# gpu-device-plugin-daemonset — confirm whether this should be a DaemonSet
# so every MPS node gets a plugin pod, rather than a single replica.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: device-plugin
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gpu-device-plugin
  template:
    metadata:
      labels:
        app: gpu-device-plugin
    spec:
      # Schedule only onto nodes explicitly labelled for MPS GPUs.
      nodeSelector:
        mps-gpu-enabled: "true"
      containers:
      - name: gpu-device-plugin
        image: xzaviourr/gpu-device-plugin:v2
        # Privileged access is required to talk to the kubelet
        # device-plugin socket on the host.
        securityContext:
          privileged: true
        volumeMounts:
        - name: device-plugin
          mountPath: /var/lib/kubelet/device-plugins/
      volumes:
      - name: device-plugin
        hostPath:
          path: /var/lib/kubelet/device-plugins/
\ No newline at end of file
# Custom resource consumed by the scheduler extender: names the device
# plugin whose resources the extender should account for.
apiVersion: deviceplugin.scheduler.example.com/v1
kind: PluginConfig
devicePluginName: gpu-device-plugin
\ No newline at end of file
# Static-pod manifest for kube-scheduler (kubeadm layout), extended to load
# the GPU scheduler-extender configuration via --config.
apiVersion: v1
kind: Pod
metadata:
  creationTimestamp: null
  labels:
    component: kube-scheduler
    tier: control-plane
  name: kube-scheduler
  namespace: kube-system
spec:
  containers:
  - command:
    - kube-scheduler
    - --authentication-kubeconfig=/etc/kubernetes/scheduler.conf
    - --authorization-kubeconfig=/etc/kubernetes/scheduler.conf
    - --bind-address=127.0.0.1
    - --kubeconfig=/etc/kubernetes/scheduler.conf
    - --leader-elect=true
    # NOTE(review): --port is deprecated in v1.22 and removed in v1.24;
    # drop it before upgrading the control plane.
    - --port=0
    # Loads the KubeSchedulerConfiguration mounted from the host below.
    - --config=/etc/kubernetes/scheduler_config.yaml
    image: k8s.gcr.io/kube-scheduler:v1.22.17
    imagePullPolicy: IfNotPresent
    livenessProbe:
      failureThreshold: 8
      httpGet:
        host: 127.0.0.1
        path: /healthz
        port: 10259
        scheme: HTTPS
      initialDelaySeconds: 10
      periodSeconds: 10
      timeoutSeconds: 15
    name: kube-scheduler
    resources:
      requests:
        cpu: 100m
    startupProbe:
      failureThreshold: 24
      httpGet:
        host: 127.0.0.1
        path: /healthz
        port: 10259
        scheme: HTTPS
      initialDelaySeconds: 10
      periodSeconds: 10
      timeoutSeconds: 15
    volumeMounts:
    - mountPath: /etc/kubernetes/scheduler.conf
      name: kubeconfig
      readOnly: true
    - mountPath: /etc/kubernetes/scheduler_config.yaml
      name: extenderconfig
      readOnly: true
    - mountPath: /etc/kubernetes/gpu-scheduler-extender-plugin-config.conf
      name: extenderpluginconfig
      readOnly: true
  hostNetwork: true
  priorityClassName: system-node-critical
  securityContext:
    seccompProfile:
      type: RuntimeDefault
  volumes:
  - hostPath:
      path: /etc/kubernetes/scheduler.conf
      type: FileOrCreate
    name: kubeconfig
  - hostPath:
      path: /etc/kubernetes/scheduler_config.yaml
      type: FileOrCreate
    name: extenderconfig
  - hostPath:
      path: /etc/kubernetes/gpu-scheduler-extender-plugin-config.conf
      type: FileOrCreate
    name: extenderpluginconfig
status: {}
# Scheduler profile that enables the GPU extender plugin at every relevant
# extension point and passes it its kubeconfig and plugin-config paths.
apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: gpu-scheduler-extender-profile
  plugins:
    score:
      enabled:
      - name: "gpu-scheduler-extender"
    preFilter:
      enabled:
      - name: "gpu-scheduler-extender"
    filter:
      enabled:
      - name: "gpu-scheduler-extender"
    postFilter:
      enabled:
      - name: "gpu-scheduler-extender"
  pluginConfig:
  # FIX: "name" and "args" were previously two separate list items, so the
  # args were attached to an anonymous entry and never reached the plugin.
  # They must be fields of the SAME pluginConfig entry.
  # NOTE(review): the entry name "DevicePlugin" does not match the enabled
  # plugin name "gpu-scheduler-extender" above — confirm which name the
  # plugin registers under, otherwise these args are silently ignored.
  - name: DevicePlugin
    args:
      kubeConfigPath: "/etc/kubernetes/scheduler.conf"
      pluginConfigPath: "/etc/kubernetes/gpu-scheduler-extender-plugin-config.conf"
\ No newline at end of file
package main
import (
	"fmt"
	"log"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
	devicepluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
	// pluginSocketPath is the kubelet device-plugin socket this binary's
	// device plugin registers through (see Run).
	pluginSocketPath = "/var/lib/kubelet/device-plugins/mydevice.sock"
)
// DevicePluginSchedulerExtension implements the scheduler extension interface.
type DevicePluginSchedulerExtension struct {
	// kubeClient is the Kubernetes client; it is initialized lazily in Run,
	// so it is nil before Run is called.
	kubeClient kubernetes.Interface
	// devicePlugins is a map of device plugins indexed by their names.
	// NOTE(review): nothing in this file writes to this map — confirm
	// whether it is populated elsewhere or is dead state.
	devicePlugins map[string]*devicepluginapi.DevicePlugin
	// podQueue is a rate-limited work queue of "namespace/name" pod keys
	// (see NewPod/UpdatePod and processNextPod).
	podQueue workqueue.RateLimitingInterface
}
// NewDevicePluginSchedulerExtension returns a DevicePluginSchedulerExtension
// with an empty plugin registry and a default rate-limited work queue.
// The Kubernetes client is populated later, inside Run.
func NewDevicePluginSchedulerExtension() *DevicePluginSchedulerExtension {
	ext := &DevicePluginSchedulerExtension{}
	ext.devicePlugins = make(map[string]*devicepluginapi.DevicePlugin)
	ext.podQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	return ext
}
// Name returns the identifier this extension registers under.
func (d *DevicePluginSchedulerExtension) Name() string {
	const extensionName = "DevicePluginSchedulerExtension"
	return extensionName
}
// EventsToRegister declares the cluster events that should re-trigger this
// extension: pod and node additions and updates.
func (d *DevicePluginSchedulerExtension) EventsToRegister() []framework.ClusterEvent {
	podEvents := []framework.ClusterEvent{
		{Resource: framework.Pod, ActionType: framework.Add},
		{Resource: framework.Pod, ActionType: framework.Update},
	}
	nodeEvents := []framework.ClusterEvent{
		{Resource: framework.Node, ActionType: framework.Add},
		{Resource: framework.Node, ActionType: framework.Update},
	}
	return append(podEvents, nodeEvents...)
}
// SetBinder is not used by this extension (binding is left to the default
// scheduler path), so it is a no-op.
func (d *DevicePluginSchedulerExtension) SetBinder(_ framework.Binder) {}
// SetPreemptor is not used by this extension (no preemption logic), so it
// is a no-op.
func (d *DevicePluginSchedulerExtension) SetPreemptor(_ framework.Preemptor) {}
// Filter performs no node filtering in this extension: every node is
// admitted with a Success status.
func (d *DevicePluginSchedulerExtension) Filter(_ *framework.CycleState, _ *corev1.Pod, _ *framework.NodeInfo) *framework.Status {
	ok := framework.NewStatus(framework.Success, "")
	return ok
}
// NormalizeScore performs no score normalization; every node receives a
// neutral value of zero.
func (d *DevicePluginSchedulerExtension) NormalizeScore(_ *framework.CycleState, _ *framework.NodeInfo, _ *corev1.Pod, _ int64) float64 {
	var neutral float64
	return neutral
}
// Score is not used in this example; it returns a zero score and a nil
// status for every node.
func (d *DevicePluginSchedulerExtension) Score(_ *framework.CycleState, _ *framework.NodeInfo, _ *corev1.Pod, _ framework.NodeScoreContainer, _ int64) (float64, *framework.Status) {
	return 0, nil
}
// ScoreExtensions returns nil: this extension provides no score
// normalization hooks.
func (d *DevicePluginSchedulerExtension) ScoreExtensions() framework.ScoreExtensions {
	return nil
}
// Permit never delays or rejects a pod; it always returns Success.
func (d *DevicePluginSchedulerExtension) Permit(_ *framework.CycleState, _ *corev1.Pod, _ *framework.NodeInfo) *framework.Status {
	allow := framework.NewStatus(framework.Success, "")
	return allow
}
// PreFilterExtensions returns nil: this extension has no pre-filter
// add/remove-pod hooks.
func (d *DevicePluginSchedulerExtension) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}
// PostFilterExtensions returns nil: this extension has no post-filter hooks.
func (d *DevicePluginSchedulerExtension) PostFilterExtensions() framework.PostFilterExtensions {
	return nil
}
// NewPod enqueues a newly-added pod under its "namespace/name" key so the
// worker loop will try to schedule it.
func (d *DevicePluginSchedulerExtension) NewPod(pod *corev1.Pod) {
	key := pod.Namespace + "/" + pod.Name
	d.podQueue.Add(key)
}
// UpdatePod re-enqueues an updated pod under its "namespace/name" key; the
// previous object state is not needed.
func (d *DevicePluginSchedulerExtension) UpdatePod(oldPod, newPod *corev1.Pod) {
	key := fmt.Sprintf("%s/%s", newPod.Namespace, newPod.Name)
	d.podQueue.Add(key)
}
// NewNode is not used in this example, so it is a no-op.
func (d *DevicePluginSchedulerExtension) NewNode(nodeInfo *framework.NodeInfo) {}
// UpdateNode adds the node to the pod work queue.
// NOTE(review): this enqueues a bare node NAME into a queue whose consumer
// (processPod) expects "namespace/name" POD keys — the lookup will fail and
// the key will be retried forever by the rate limiter. Confirm the intent;
// re-enqueueing the pods affected by the node change seems more likely.
func (d *DevicePluginSchedulerExtension) UpdateNode(oldNode, newNode *corev1.Node) {
	d.podQueue.Add(nodeName(newNode))
}
// nodeName returns the object name of the given node.
func nodeName(node *corev1.Node) string {
	name := node.Name
	return name
}
// Run starts the Device Plugin Scheduler Extension: it builds the
// Kubernetes client, registers the device plugin with the kubelet, starts
// the queue worker, and blocks until stopCh is closed.
func (d *DevicePluginSchedulerExtension) Run(stopCh <-chan struct{}) {
	defer runtime.HandleCrash()
	// FIX: shut the work queue down on exit so the worker's blocking Get
	// returns instead of leaking the goroutine after stopCh closes.
	defer d.podQueue.ShutDown()
	// Initialize the Kubernetes client.
	// NOTE(review): empty master URL and kubeconfig path rely on fallback
	// (in-cluster) configuration — confirm this binary always runs in-cluster.
	config, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		log.Fatalf("Failed to build Kubernetes config: %v", err)
	}
	d.kubeClient, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("Failed to create Kubernetes client: %v", err)
	}
	// Connect to the kubelet device-plugin socket.
	devicePluginClient, err := devicepluginapi.NewDevicePluginClient(pluginSocketPath)
	if err != nil {
		log.Fatalf("Failed to create device plugin client: %v", err)
	}
	// Register the device plugin with the kubelet.
	if err := devicePluginClient.Register(); err != nil {
		log.Fatalf("Failed to register device plugin: %v", err)
	}
	// Start the single queue worker. processNextPod loops internally, so
	// wait.Until merely restarts it if it ever returns early.
	go wait.Until(d.processNextPod, time.Second, stopCh)
	<-stopCh
	log.Println("Stopping Device Plugin Scheduler Extension...")
}
// processNextPod is the worker loop: it drains pod keys from the queue one
// at a time until the queue is shut down. Failed keys are re-enqueued with
// rate limiting; successful keys have their failure history cleared.
func (d *DevicePluginSchedulerExtension) processNextPod() {
	for {
		item, shutdown := d.podQueue.Get()
		if shutdown {
			return
		}
		key := item.(string)
		if err := d.processPod(key); err != nil {
			log.Printf("Error processing pod %q: %v", key, err)
			d.podQueue.AddRateLimited(item)
		} else {
			d.podQueue.Forget(item)
		}
		d.podQueue.Done(item)
	}
}
// processPod handles one queued "namespace/name" key: it fetches the pod,
// skips pods with no device request, finds a node exposing the required
// device plugin with sufficient resources, and pins the pod to that node
// via a node-affinity update. A non-nil return causes a rate-limited retry.
func (d *DevicePluginSchedulerExtension) processPod(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid pod key format: %s", key)
	}
	// NOTE(review): this Get signature (no context) matches pre-1.18
	// client-go; newer releases require Get(ctx, name, opts).
	pod, err := d.kubeClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get pod %q: %v", key, err)
	}
	// Check if the pod requires a device.
	if !requiresDevice(pod) {
		log.Printf("Pod %q does not require a device. Skipping scheduling.", key)
		return nil
	}
	// Find a suitable node for the pod.
	node, err := d.findSuitableNode(pod)
	if err != nil {
		return fmt.Errorf("failed to find suitable node for pod %q: %v", key, err)
	}
	// Update the pod's node affinity to the selected node.
	err = d.updatePodNodeAffinity(pod, node)
	if err != nil {
		return fmt.Errorf("failed to update pod %q node affinity: %v", key, err)
	}
	log.Printf("Scheduled pod %q on node %q", key, node)
	return nil
}
// requiresDevice reports whether any container in the pod requests a
// resource whose name starts with "device".
func requiresDevice(pod *corev1.Pod) bool {
	for _, container := range pod.Spec.Containers {
		// FIX: Resources.Requests is a map[ResourceName]Quantity. The
		// original ranged over the VALUES (quantities, which have no Name
		// field); the resource names are the map KEYS.
		for resourceName := range container.Resources.Requests {
			if strings.HasPrefix(string(resourceName), "device") {
				return true
			}
		}
	}
	return false
}
// findSuitableNode returns the name of the first node that both exposes the
// device plugin named by the pod's "device-plugin" annotation and has
// enough allocatable resources for the pod's requests.
func (d *DevicePluginSchedulerExtension) findSuitableNode(pod *corev1.Pod) (string, error) {
	nodes, err := d.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to list nodes: %v", err)
	}
	// The required plugin name is a property of the pod, not the node:
	// look it up once instead of on every iteration.
	devicePluginName := pod.Annotations["device-plugin"]
	for i := range nodes.Items {
		// Index into the slice rather than taking the address of a range
		// variable (safe regardless of the module's Go version).
		node := &nodes.Items[i]
		// Skip nodes that do not expose the required device plugin.
		if !d.nodeHasDevicePlugin(node, devicePluginName) {
			continue
		}
		// First fit: take the first node with sufficient resources.
		if d.hasEnoughResources(node, pod) {
			return node.Name, nil
		}
	}
	return "", fmt.Errorf("no suitable node found for pod")
}
// nodeHasDevicePlugin reports whether the node advertises a device plugin
// with the given name.
// NOTE(review): in upstream k8s.io/api, NodeStatus.Allocatable is a
// ResourceList (map) with no DevicePlugins field — confirm this compiles
// against the vendored/forked API actually in use.
func (d *DevicePluginSchedulerExtension) nodeHasDevicePlugin(node *corev1.Node, devicePluginName string) bool {
	devicePlugins := node.Status.Allocatable.DevicePlugins
	for _, plugin := range devicePlugins {
		// Match on the plugin's advertised name.
		if plugin.Name == devicePluginName {
			return true
		}
	}
	return false
}
// hasEnoughResources reports whether the node's allocatable capacity covers
// each resource request of each container in the pod.
// NOTE(review): requests are checked per container, not summed across
// containers, and resources already consumed by other pods are ignored —
// confirm whether that approximation is intended.
func (d *DevicePluginSchedulerExtension) hasEnoughResources(node *corev1.Node, pod *corev1.Pod) bool {
	for _, container := range pod.Spec.Containers {
		for resourceName, quantity := range container.Resources.Requests {
			nodeQuantity := node.Status.Allocatable[corev1.ResourceName(resourceName)]
			// FIX: resource.Quantity.Sub mutates its receiver and returns
			// nothing, so Sub(...).IsNonNegative() does not exist; Cmp is
			// the supported comparison (negative when node < request).
			if nodeQuantity.Cmp(quantity) < 0 {
				return false
			}
		}
	}
	return true
}
// updatePodNodeAffinity pins the pod to the chosen node by setting a
// required node affinity on metadata.name and persisting the pod.
// NOTE(review): Kubernetes treats pod affinity as immutable after creation,
// so this Update is expected to be rejected by the API server for an
// existing pod; a scheduler normally posts a Binding (pods/bind
// subresource) instead — confirm the intended mechanism.
func (d *DevicePluginSchedulerExtension) updatePodNodeAffinity(pod *corev1.Pod, node string) error {
	// Require scheduling onto exactly the selected node, matched by the
	// built-in metadata.name field.
	affinity := &corev1.Affinity{
		NodeAffinity: &corev1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
				NodeSelectorTerms: []corev1.NodeSelectorTerm{
					{
						MatchFields: []corev1.NodeSelectorRequirement{
							{
								Key:      "metadata.name",
								Operator: corev1.NodeSelectorOpIn,
								Values:   []string{node},
							},
						},
					},
				},
			},
		},
	}
	pod.Spec.Affinity = affinity
	// NOTE(review): this Update signature (no context) matches pre-1.18
	// client-go; newer releases require Update(ctx, pod, opts).
	_, err := d.kubeClient.CoreV1().Pods(pod.Namespace).Update(pod)
	if err != nil {
		return fmt.Errorf("failed to update pod node affinity: %v", err)
	}
	return nil
}
// main wires up the scheduler extension and runs it; the stop channel is
// closed only when Run returns, on process teardown.
func main() {
	stop := make(chan struct{})
	defer close(stop)
	NewDevicePluginSchedulerExtension().Run(stop)
}
sudo su
cd /var/lib/kubelet/device-plugins
ls
kubectl get daemonsets --all-namespaces
kubectl describe node ub-10
sudo lsof -n -i -P | grep kubelet.sock
kubectl get pods --all-namespaces
kubectl logs gpu-device-plugin-daemonset-s684c --namespace gpu-device-plugin-namespace
# Kubernetes with MPS
\ No newline at end of file
......@@ -24,6 +24,10 @@ kubectl create clusterrolebinding gpu-device-plugin-manager-role --clusterrole=c
kubectl apply -f gpu_device_plugin/mps-manager.yaml
# Start scheduler extender
cp $HOME/.kube/config /root/.kube/config
sudo cp $HOME/.kube/config /root/.kube/config
sudo cp gpu_scheduler_extender/gpu-scheduler-extender-plugin-config.conf /etc/kubernetes/
sudo cp gpu_scheduler_extender/scheduler_config.yaml /etc/kubernetes/
sudo cp gpu_scheduler_extender/kube-scheduler.yaml /etc/kubernetes/manifests/
kubectl apply -f gpu_scheduler_extender/scheduler_extender.yaml
export KUBERNETES_MASTER=https://kubernetes.default.svc.cluster.local
# kubectl apply -f gpu_scheduler_extender/scheduler_config.yaml
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment