Commit efcc7cd9 authored by Anshul's avatar Anshul

Newer device plugin

parent 9555df12
docker build -t xzaviourr/mps-device-plugin:v$1 .
docker push xzaviourr/mps-device-plugin:v$1
\ No newline at end of file
......@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"log"
"net"
"strconv"
"sync"
......@@ -24,23 +25,20 @@ const (
var (
computeResourceName = "nvidia-mps.com/vcore"
memResourceName = "nvidia-mps.com/vmemory"
)
type mpsGPUManager struct {
grpcServer *grpc.Server
devices map[string]pluginapi.Device
computePartitionSize int // In utilization (eg. 1%)
memPartitionSize int // In MB (eg. 256MB)
socket string
stop chan bool
}
func NewMpsGPUManager(computePartitionSize, memPartitionSize int) *mpsGPUManager {
func NewMpsGPUManager(computePartitionSize int) *mpsGPUManager {
return &mpsGPUManager{
devices: make(map[string]pluginapi.Device),
computePartitionSize: computePartitionSize,
memPartitionSize: memPartitionSize,
stop: make(chan bool),
}
}
......@@ -80,10 +78,10 @@ type pluginService struct {
// }
func (mgm *mpsGPUManager) Serve() {
glog.Infof("Starting MPS GPU Manager")
log.Println("Serve() called ...")
lis, err := net.Listen("unix", pluginEndpoint)
if err != nil {
glog.Fatal("starting device plugin server failed : %v", err)
log.Fatalf("starting device plugin server failed : %v", err)
}
mgm.socket = pluginEndpoint
mgm.grpcServer = grpc.NewServer()
......@@ -106,14 +104,14 @@ func (mgm *mpsGPUManager) Serve() {
defer wg.Done()
// Blocking call to accept incoming connections.
err := mgm.grpcServer.Serve(lis)
glog.Errorf("device-plugin server stopped serving: %v", err)
log.Fatalf("device-plugin server stopped serving: %v", err)
}()
if !registeredWithKubelet {
for len(mgm.grpcServer.GetServiceInfo()) <= 0 {
time.Sleep(1 * time.Second)
}
glog.Infoln("device-plugin server started serving")
log.Println("device-plugin server started serving")
err = RegisterToKubelet()
if err != nil {
......@@ -121,7 +119,7 @@ func (mgm *mpsGPUManager) Serve() {
wg.Wait()
glog.Fatal(err)
}
glog.Infoln("device plugin registered with kubelet")
log.Println("device plugin registered with kubelet")
registeredWithKubelet = true
}
}
......@@ -130,6 +128,7 @@ func (mgm *mpsGPUManager) Serve() {
}
func RegisterToKubelet() error {
log.Println("RegisterToKubelet() called ...")
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
......@@ -153,23 +152,24 @@ func RegisterToKubelet() error {
}
func (ps *pluginService) RegisterService() {
log.Println("RegisterSerivce() called ...")
pluginapi.RegisterDevicePluginServer(ps.mgm.grpcServer, ps)
}
func (mgm *mpsGPUManager) Stop() {
log.Println("Stop() called ...")
if mgm.grpcServer != nil {
mgm.grpcServer.Stop()
}
mgm.stop <- true
<-mgm.stop
glog.Infof("MPS GPU Manager stopped")
log.Println("MPS GPU Manager stopped")
}
func (ps *pluginService) ListDevices() []*pluginapi.Device {
gpuMemoryAvailable := 16384 // Using static value for now
log.Println("ListDevices() called ...")
computeDevicesCount := 100 / ps.mgm.computePartitionSize
memoryDevicesCount := gpuMemoryAvailable / ps.mgm.memPartitionSize
virtualDevices := make([]*pluginapi.Device, computeDevicesCount+memoryDevicesCount)
virtualDevices := make([]*pluginapi.Device, computeDevicesCount)
for i := 0; i < computeDevicesCount; i++ {
virtualDeviceID := fmt.Sprintf("%s-%d", computeResourceName, i)
......@@ -178,29 +178,23 @@ func (ps *pluginService) ListDevices() []*pluginapi.Device {
Health: pluginapi.Healthy,
}
}
for i := 0; i < memoryDevicesCount; i++ {
virtualDeviceID := fmt.Sprintf("%s-%d", memResourceName, i)
virtualDevices[computeDevicesCount+i] = &pluginapi.Device{
ID: virtualDeviceID,
Health: pluginapi.Healthy,
}
}
return virtualDevices
}
func (ps *pluginService) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
log.Println("ListAndWatch() called ...")
resp := new(pluginapi.ListAndWatchResponse)
resp.Devices = ps.ListDevices()
if err := stream.Send(resp); err != nil {
glog.Infof("Error sending device list : %v", err)
log.Fatalf("error sending device list : %v", err)
return err
}
glog.Infof("Successfully sent the list of devices ...")
log.Println("successfully sent the list of devices ...")
select {}
}
func (ps *pluginService) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
log.Println("Allocate() called ...")
allocateResponse := &pluginapi.AllocateResponse{}
for _, req := range rqt.ContainerRequests {
......@@ -218,12 +212,13 @@ func (ps *pluginService) Allocate(ctx context.Context, rqt *pluginapi.AllocateRe
containerAllocateResponse.Envs = envs
containerAllocateResponse.Mounts = mounts
allocateResponse.ContainerResponses = append(allocateResponse.ContainerResponses, containerAllocateResponse)
glog.Infof("Successfully allocated the devices ...")
log.Println("successfully allocated the required devices ...")
}
return allocateResponse, nil
}
func (ps *pluginService) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
log.Println("GetDevicePluginOptions() called ...")
return &pluginapi.DevicePluginOptions{
PreStartRequired: false,
GetPreferredAllocationAvailable: false,
......@@ -231,6 +226,7 @@ func (ps *pluginService) GetDevicePluginOptions(context.Context, *pluginapi.Empt
}
func (ps *pluginService) GetPreferredAllocation(ctx context.Context, rqt *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
log.Println("GetPreferredAllocation() called ...")
preferredAllocateResponse := &pluginapi.PreferredAllocationResponse{}
for _, req := range rqt.ContainerRequests {
......@@ -246,15 +242,13 @@ func (ps *pluginService) GetPreferredAllocation(ctx context.Context, rqt *plugin
}
func (ps *pluginService) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
log.Println("PreStartContainer() called ...")
preStartContainerResponse := pluginapi.PreStartContainerResponse{}
return &preStartContainerResponse, nil
}
func main() {
mgm := NewMpsGPUManager(1, 256)
mgm := NewMpsGPUManager(1)
defer mgm.Stop()
mgm.Serve()
// if err := mgm.Serve(); err != nil {
// glog.Fatalf("Error starting the MPS GPU Manager : %v", err)
// }
}
......@@ -20,7 +20,7 @@ spec:
mps-gpu-enabled: "true"
containers:
- name: mps-device-plugin
image: xzaviourr/mps-device-plugin:v4
image: xzaviourr/mps-device-plugin:v6.1
securityContext:
privileged: true
volumeMounts:
......
apiVersion: v1
kind: Pod
metadata:
name: cuda-test
spec:
restartPolicy: OnFailure
containers:
- name: cuda-vector-add
image: "nvidia/samples:vectoradd-cuda10.2"
resources:
requests:
nvidia.com/gpu: 5
limits:
nvidia.com/gpu: 5
\ No newline at end of file
......@@ -13,12 +13,6 @@ kubectl apply -f https://raw.githubusercontent.com/flannel-io/flannel/master/Doc
kubectl taint nodes ub-10 node-role.kubernetes.io/master- # Allow device plugins and pods to run on master
kubectl label node ub-10 mps-gpu-enabled=true # Add device plugin label
# Delete daemonset
# kubectl delete daemonset gpu-device-plugin-daemonset
# kubectl delete clusterrolebinding gpu-device-plugin-manager-role
# Attach daemonset again
# kubectl create namespace gpu-device-plugin-namespace
kubectl create sa mps-device-plugin-manager -n kube-system
kubectl create clusterrolebinding mps-device-plugin-manager-role --clusterrole=cluster-admin --serviceaccount=kube-system:mps-device-plugin-manager
kubectl apply -f mps-manager.yaml
\ No newline at end of file
......@@ -2,6 +2,7 @@ FROM golang:1.20.4-alpine3.18
WORKDIR /app
ENV KUBECONFIG=/root/.kube/config
ENV CACERT=/root/.kube/ca.crt
COPY scheduler_ext.go .
RUN go mod init scheduler_ext
......
......@@ -2,6 +2,11 @@ apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: gpu-scheduler-extender-profile
pluginConfig:
- name: gpu-scheduler-extender
- args:
kubeConfigPath: "/etc/kubernetes/scheduler.conf"
pluginConfigPath: "/etc/kubernetes/gpu-scheduler-extender-plugin-config.conf"
plugins:
score:
enabled:
......@@ -15,8 +20,5 @@ profiles:
postFilter:
enabled:
- name: "gpu-scheduler-extender"
pluginConfig:
- name: DevicePlugin
- args:
kubeConfigPath: "/etc/kubernetes/scheduler.conf"
pluginConfigPath: "/etc/kubernetes/gpu-scheduler-extender-plugin-config.conf"
\ No newline at end of file
clientConnection:
kubeconfig: "/etc/kubernetes/scheduler.conf"
// package main
// import (
// "context"
// "fmt"
// "os"
// "time"
// "google.golang.org/grpc"
// corev1 "k8s.io/api/core/v1"
// v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "k8s.io/apimachinery/pkg/util/wait"
// "k8s.io/client-go/kubernetes"
// "k8s.io/client-go/tools/cache"
// "k8s.io/client-go/tools/clientcmd"
// "k8s.io/client-go/util/workqueue"
// "k8s.io/klog"
// pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
// )
// const (
// pluginName = "scheduler_ext"
// gpuMemoryKey = "mps.iitb/gpu-vmemory"
// gpuCoresKey = "mps.iitb/gpu-vcore"
// resyncPeriod = 5 * time.Minute
// defaultWeight = 1
// )
// type MPSDevicePluginScheduler struct {
// clientset kubernetes.Interface
// queue workqueue.RateLimitingInterface
// deviceConn *grpc.ClientConn
// deviceClient pluginapi.DevicePluginClient
// }
// func main() {
// kubeconfig := os.Getenv("KUBECONFIG")
// config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
// if err != nil {
// klog.Fatalf("Error building kubeconfig: %v", err)
// }
// clientset, err := kubernetes.NewForConfig(config)
// if err != nil {
// klog.Fatalf("Error creating clientset: %v", err)
// }
// deviceConn, err := grpc.Dial("unix:/var/lib/kubelet/device-plugins/gpu-device-plugin.sock", grpc.WithInsecure())
// if err != nil {
// klog.Fatalf("Error connecting to device plugin: %v", err)
// }
// deviceClient := pluginapi.NewDevicePluginClient(deviceConn)
// scheduler := &MPSDevicePluginScheduler{
// clientset: clientset,
// queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
// deviceConn: deviceConn,
// deviceClient: deviceClient,
// }
// stopCh := make(chan struct{})
// defer close(stopCh)
// go scheduler.run(stopCh)
// wait.Until(func() {
// if err := scheduler.syncPods(); err != nil {
// klog.Errorf("Error syncing pods: %v", err)
// }
// }, resyncPeriod, stopCh)
// <-stopCh
// }
// func (s *MPSDevicePluginScheduler) run(stopCh <-chan struct{}) {
// go wait.Until(s.worker, time.Second, stopCh)
// <-stopCh
// }
// func (s *MPSDevicePluginScheduler) worker() {
// for s.processNextItem() {
// }
// }
// func (s *MPSDevicePluginScheduler) processNextItem() bool {
// obj, shutdown := s.queue.Get()
// if shutdown {
// return false
// }
// err := func(obj interface{}) error {
// defer s.queue.Done(obj)
// var key string
// var ok bool
// if key, ok = obj.(string); !ok {
// s.queue.Done(obj)
// klog.Errorf("Expected string in workqueue, but got %#v", obj)
// return nil
// }
// if err := s.syncPod(key); err != nil {
// return fmt.Errorf("error syncing pod '%s': %v", key, err)
// }
// s.queue.Forget(obj)
// klog.Infof("Successfully synced pod '%s'", key)
// return nil
// }(obj)
// if err != nil {
// klog.Error(err)
// return true
// }
// return true
// }
// func (s *MPSDevicePluginScheduler) syncPod(key string) error {
// namespace, name, err := cache.SplitMetaNamespaceKey(key)
// if err != nil {
// return err
// }
// pod, err := s.clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, v1.GetOptions{})
// if err != nil {
// return err
// }
// if isPodScheduled(pod) {
// return nil
// }
// nodeName, err := s.findBestNode(pod)
// if err != nil {
// return err
// }
// boundPod := &corev1.Pod{
// ObjectMeta: v1.ObjectMeta{
// Namespace: pod.Namespace,
// Name: pod.Name,
// },
// Spec: pod.Spec,
// Status: pod.Status,
// }
// boundPod.Spec.NodeName = nodeName
// if _, err := s.clientset.CoreV1().Pods(pod.Namespace).Update(context.TODO(), boundPod, v1.UpdateOptions{}); err != nil {
// return err
// }
// return nil
// }
// func (s *MPSDevicePluginScheduler) findBestNode(pod *corev1.Pod) (string, error) {
// nodes, err := s.clientset.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{
// LabelSelector: "gpu=true",
// })
// if err != nil {
// return "", err
// }
// var bestNode string
// bestScore := -1
// for _, node := range nodes.Items {
// if !s.isMPSAvailable(node) {
// continue
// }
// score := s.calculateNodeScore(node, pod)
// if score > bestScore {
// bestNode = node.Name
// bestScore = score
// }
// }
// return bestNode, nil
// }
// func (s *MPSDevicePluginScheduler) isMPSAvailable(node corev1.Node) bool {
// if node.Labels == nil {
// return false
// }
// _, mpsEnabled := node.Labels["mps-enabled"]
// return mpsEnabled
// }
// func (s *MPSDevicePluginScheduler) calculateNodeScore(node corev1.Node, pod *corev1.Pod) int {
// score := 0
// for _, container := range pod.Spec.Containers {
// if container.Resources.Requests != nil {
// gpuMemoryReq, ok := container.Resources.Requests[corev1.ResourceName(gpuMemoryKey)]
// if !ok {
// continue
// }
// gpuMemoryNode, ok := node.Status.Allocatable[corev1.ResourceName(gpuMemoryKey)]
// if !ok {
// continue
// }
// if gpuMemoryReq.Value() <= gpuMemoryNode.Value() {
// score++
// } else {
// score--
// }
// }
// if container.Resources.Requests != nil {
// gpuCoresReq, ok := container.Resources.Requests[corev1.ResourceName(gpuCoresKey)]
// if !ok {
// continue
// }
// gpuCoresNode, ok := node.Status.Allocatable[corev1.ResourceName(gpuCoresKey)]
// if !ok {
// continue
// }
// if gpuCoresReq.Value() <= gpuCoresNode.Value() {
// score++
// } else {
// score--
// }
// }
// }
// return score
// }
// func (s *MPSDevicePluginScheduler) syncPods() error {
// pods, err := s.clientset.CoreV1().Pods("").List(context.TODO(), v1.ListOptions{
// FieldSelector: "status.phase!=Succeeded,status.phase!=Failed",
// })
// if err != nil {
// return err
// }
// for _, pod := range pods.Items {
// key, err := cache.MetaNamespaceKeyFunc(&pod)
// if err != nil {
// klog.Errorf("Error getting key for pod '%s/%s': %v", pod.Namespace, pod.Name, err)
// continue
// }
// s.queue.Add(key)
// }
// return nil
// }
// func isPodScheduled(pod *corev1.Pod) bool {
// return pod.Spec.NodeName != ""
// }
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
......@@ -45,7 +297,29 @@ func main() {
klog.Fatalf("Error creating clientset: %v", err)
}
deviceConn, err := grpc.Dial("unix:/var/lib/kubelet/device-plugins/gpu-device-plugin.sock", grpc.WithInsecure())
// Load the CA certificate
caCertFile := os.Getenv("CACERT")
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
klog.Fatalf("Error loading CA certificate: %v", err)
}
// Create a certificate pool and add the CA certificate
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
klog.Fatalf("Failed to append CA certificate to the certificate pool")
}
// Create TLS credentials with the certificate pool and skip certificate verification
creds := credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
RootCAs: certPool,
})
// Create a dial option with the TLS credentials
dialOption := grpc.WithTransportCredentials(creds)
deviceConn, err := grpc.Dial("unix:/var/lib/kubelet/device-plugins/mps-device-plugin.sock", dialOption) //grpc.WithInsecure()
if err != nil {
klog.Fatalf("Error connecting to device plugin: %v", err)
}
......
#!/bin/bash
# Setup kubernetes cluster
sudo kubeadm reset # Delete existing master
rm $HOME/.kube/config
sudo rm -rf /etc/cni/net.d
sudo swapoff -a # Swapoff
sudo kubeadm init --pod-network-cidr=10.244.0.0/16 # Initialize cluster
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
kubectl apply -f https://raw.githubusercontent.com/flannel-io/flannel/master/Documentation/kube-flannel.yml # Use flannel for networking
kubectl taint nodes ub-10 node-role.kubernetes.io/master- # Allow device plugins and pods to run on master
kubectl label node ub-10 mps-gpu-enabled=true # Add device plugin label
# Delete daemonset
# kubectl delete daemonset gpu-device-plugin-daemonset
# kubectl delete clusterrolebinding gpu-device-plugin-manager-role
# Attach daemonset again
# kubectl create namespace gpu-device-plugin-namespace
kubectl create sa mps-device-plugin-manager -n kube-system
kubectl create clusterrolebinding mps-device-plugin-manager-role --clusterrole=cluster-admin --serviceaccount=kube-system:mps-device-plugin-manager
kubectl apply -f mps-manager.yaml
\ No newline at end of file
module mpsmanager
go 1.20
require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
k8s.io/kubelet v0.27.2 // indirect
)
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
k8s.io/kubelet v0.27.2 h1:vpJnBkqQjxItEhehKG0toXoZ+G+tf4UXAOqtMJy6qgc=
k8s.io/kubelet v0.27.2/go.mod h1:1SVrHaLnuw53nQJx8036k9HjE0teDXZtbN51cYC0HSc=
package main
func main() {
mm := NewMPSManager()
mm.RunGRPCService()
mm.RegisterToKubelet()
select {}
}
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: mps-device-plugin
namespace: kube-system
spec:
selector:
matchLabels:
app: mps-device-plugin
template:
metadata:
labels:
app: mps-device-plugin
spec:
hostPID: true
hostIPC: true
hostNetwork: true
serviceAccount: mps-device-plugin-manager
nodeSelector:
mps-gpu-enabled: "true"
containers:
- name: mps-device-plugin
image: xzaviourr/mps-device-plugin:v5.1
securityContext:
privileged: true
volumeMounts:
- name: device-plugin
mountPath: /device-plugins
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
package main
import (
"context"
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
const (
kubeletEndpoint = "/var/lib/kubelet/device-plugins/kubelet.sock"
)
type deviceManager struct {
grpcServer *grpc.Server
devices []*pluginapi.Device
socket string
manager *mpsManager
}
type DeviceServer interface {
Run() error
Stop()
DeviceEndpoint() string
DeviceName() string
}
type mpsManager struct {
deviceServers map[string]DeviceServer
grpcServer *grpc.Server
}
func NewMPSManager() mpsManager {
return mpsManager{
deviceServers: make(map[string]DeviceServer),
grpcServer: grpc.NewServer(),
}
}
func (mm *mpsManager) RunGRPCService() {
vcoreServer := NewVcoreServer(mm)
mm.deviceServers[vcoreServer.DeviceName()] = vcoreServer
for name, srv := range mm.deviceServers {
log.Printf("server %s is running", name)
go srv.Run()
}
}
func (mm *mpsManager) RegisterToKubelet() error {
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
if err != nil {
return fmt.Errorf("device plugin cannot connect to the kubelet service: %v", err)
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
for _, srv := range mm.deviceServers {
request := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: srv.DeviceEndpoint(),
ResourceName: srv.DeviceName(),
}
if _, err = client.Register(context.Background(), request); err != nil {
return fmt.Errorf("device plugin cannot register to kubelet service: %v", err)
}
}
return nil
}
package main
import (
"context"
"fmt"
"log"
"net"
"strconv"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
const (
vcoreEndpoint = "/var/lib/kubelet/device-plugins/vcore.sock"
computeResourceName = "nvidia-mps.com/vcore"
computePartitionSize = 1
)
type vcoreManager struct {
deviceManager
}
func NewVcoreServer(mm *mpsManager) DeviceServer {
return vcoreManager{
deviceManager: deviceManager{
grpcServer: grpc.NewServer(),
manager: mm,
},
}
}
func (vm vcoreManager) DeviceName() string {
log.Println("vcore-DeviceName() called ...")
return computeResourceName
}
func (vm vcoreManager) DeviceEndpoint() string {
log.Println("vcore-DeviceEndpoint() called ...")
return vcoreEndpoint
}
func (vm vcoreManager) Run() error {
log.Println("vcore-Run() called ...")
log.Println("starting vcore-server ...")
lis, err := net.Listen("unix", vcoreEndpoint)
if err != nil {
log.Println("starting vcore-server failed : %v", err)
}
vm.socket = vcoreEndpoint
vm.grpcServer = grpc.NewServer()
pluginapi.RegisterDevicePluginServer(vm.grpcServer, vm)
return vm.grpcServer.Serve(lis)
}
func (vm vcoreManager) Stop() {
log.Println("vcore-Stop() called ...")
vm.grpcServer.Stop()
}
func (vm vcoreManager) ListDevices() []*pluginapi.Device {
log.Println("vcore-ListDevices() called ...")
computeDevicesCount := 100 / computePartitionSize
virtualDevices := make([]*pluginapi.Device, computeDevicesCount)
for i := 0; i < computeDevicesCount; i++ {
virtualDeviceID := fmt.Sprintf("%s-%d", computeResourceName, i)
virtualDevices[i] = &pluginapi.Device{
ID: virtualDeviceID,
Health: pluginapi.Healthy,
}
}
return virtualDevices
}
func (vm vcoreManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
log.Println("vcore-ListAndWatch() called ...")
resp := new(pluginapi.ListAndWatchResponse)
resp.Devices = vm.ListDevices()
vm.devices = resp.Devices
if err := stream.Send(resp); err != nil {
log.Println("error sending device list of vcores ... %v", err)
return err
}
log.Println("successfully sent the list of devices of vcores ...")
select {}
}
func (vm vcoreManager) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
log.Println("vcore-Allocate() called ...")
allocateResponse := &pluginapi.AllocateResponse{}
for _, req := range rqt.ContainerRequests {
containerAllocateResponse := &pluginapi.ContainerAllocateResponse{}
totalCompute := len(req.DevicesIDs)
envs := make(map[string]string)
envs["CUDA_MPS_ACTIVE_THREAD_PERCENTAGE"] = strconv.Itoa(totalCompute)
envs["CUDA_MPS_PINNED_DEVICE_MEM_LIMIT"] = "0=2G"
mounts := []*pluginapi.Mount{
{
ContainerPath: "/usr/local/nvidia",
HostPath: "/usr/local/nvidia",
},
}
containerAllocateResponse.Envs = envs
containerAllocateResponse.Mounts = mounts
allocateResponse.ContainerResponses = append(allocateResponse.ContainerResponses, containerAllocateResponse)
log.Println("successfully allocated the devices for vcore ...")
}
return allocateResponse, nil
}
func (vm vcoreManager) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
log.Println("vcore-GetDevicePluginOptions() called ...")
return &pluginapi.DevicePluginOptions{
PreStartRequired: false,
GetPreferredAllocationAvailable: false,
}, nil
}
func (vm vcoreManager) GetPreferredAllocation(ctx context.Context, rqt *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
log.Println("vcore-getPreferredAllocation() called ...")
preferredAllocateResponse := &pluginapi.PreferredAllocationResponse{}
return preferredAllocateResponse, nil
}
func (vm vcoreManager) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
log.Println("vcore-PreStartContainer() called ...")
preStartContainerResponse := pluginapi.PreStartContainerResponse{}
return &preStartContainerResponse, nil
}
apiVersion: v1
kind: Pod
metadata:
name: cuda-test
name: cuda-test1
spec:
restartPolicy: OnFailure
#automountServiceAccountToken: false
......@@ -12,6 +12,6 @@ spec:
image: "nvidia/samples:vectoradd-cuda10.2"
resources:
requests:
xzaviourr/gpu-vcore: 5
nvidia.com/gpu: 5
limits:
xzaviourr/gpu-vcore: 5
\ No newline at end of file
nvidia.com/gpu: 5
\ No newline at end of file
# Kubernetes with MPS
\ No newline at end of file
# Kubernetes with MPS
Kubelet logs -
```
sudo journalctl -u kubelet.service -f
```
\ No newline at end of file
......@@ -19,15 +19,15 @@ kubectl label node ub-10 mps-gpu-enabled=true # Add device plugin label
# Attach daemonset again
# kubectl create namespace gpu-device-plugin-namespace
kubectl create sa gpu-device-plugin-manager -n kube-system
kubectl create clusterrolebinding gpu-device-plugin-manager-role --clusterrole=cluster-admin --serviceaccount=kube-system:gpu-device-plugin-manager
kubectl apply -f gpu_device_plugin/mps-manager.yaml
kubectl create sa mps-device-plugin-manager -n kube-system
kubectl create clusterrolebinding mps-device-plugin-manager-role --clusterrole=cluster-admin --serviceaccount=kube-system:mps-device-plugin-manager
kubectl apply -f device_plugin/mps-manager.yaml
# Start scheduler extender
sudo cp $HOME/.kube/config /root/.kube/config
sudo cp gpu_scheduler_extender/gpu-scheduler-extender-plugin-config.conf /etc/kubernetes/
sudo cp gpu_scheduler_extender/scheduler_config.yaml /etc/kubernetes/
sudo cp gpu_scheduler_extender/kube-scheduler.yaml /etc/kubernetes/manifests/
kubectl apply -f gpu_scheduler_extender/scheduler_extender.yaml
export KUBERNETES_MASTER=https://kubernetes.default.svc.cluster.local
# sudo cp $HOME/.kube/config /root/.kube/config
# sudo cp /etc/kubernetes/pki/ca.crt /root/.kube/ca.crt
# sudo cp gpu_scheduler_extender/gpu-scheduler-extender-plugin-config.conf /etc/kubernetes/
# sudo cp gpu_scheduler_extender/scheduler_config.yaml /etc/kubernetes/
# sudo cp gpu_scheduler_extender/kube-scheduler.yaml /etc/kubernetes/manifests/
# kubectl apply -f gpu_scheduler_extender/scheduler_extender.yaml
# kubectl apply -f gpu_scheduler_extender/scheduler_config.yaml
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment