本期智汇华云,为大家带来“Kubelet Device Plugin 的工作机制”。
device plugin简介
从Kubernetes 1.8开始,官方推荐使用Device Plugins方式来使用GPU、FPGA、NIC、InfiniBand等高性能硬件。
```
Kubernetes provides adevice plugin frameworkthat you can use to advertise system hardware
resources to theKubelet.
Instead of customizing the code for Kubernetes itself, vendors can implement adevice plugin that
you deploy either manually or as aDaemonSet.The targeted devices include GPUs, high-performance
NICs, FPGAs, InfiniBand adapters,and other similar computing resources that may require vendor
specific initializationand setup.
```
整个 Device Plugin 的工作机制可以分成以下四个部分:
device server启动和注册;
device的分配;
device的使用;
device的状态管理。
注:为了方便理解,先注解关键词的含义:
1. device server指向kubelet注册device的服务,如NVIDIA的k8s-device-plugin;
2. deviceManager代表kubelet device plugin的服务ManagerImpl
device server的启动和注册
device server实现了DevicePluginClient接口的方法:
```
type DevicePluginClient interface {
// 获取device server的管理plugin的参数,如PreStartRequired用来判断container启动使用plugin之前
// 是否需要执行PreStartContainer()
GetDevicePluginOptions(ctx context.Context, in *Empty, opts ...grpc.CallOption)
(*DevicePluginOptions, error)
// 监听plugin的状态,当plugin的状态发生变化时及时反馈到deviceManager
ListAndWatch(ctx context.Context, in *Empty, opts ...grpc.CallOption)
(DevicePlugin_ListAndWatchClient, error)
// device server分配plugin,返回container使用该plugin的参数
Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption)
(*AllocateResponse, error)
// container启动使用plugin之前需要执行的操作
PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption)
(*PreStartContainerResponse, error)
}
```
device server在启动的时候会监测/var/lib/kubelet/device-plugins/kubelet.sock文件,当该文件被创建,意味着kubelet发生了重启或者刚启动,那么device server会自动重启重新注册。
```
if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
log.Printf("inotify: %s created, restarting.", pluginapi.KubeletSocket)
goto restart
}
```
device server**的启动和注册流程
1.调用Devices()获取自身管理的device详情列表
2.调用Start()方法基于UDS(如/var/lib/kubelet/device-plugins/nvidia.sock)启动一个Grpc服务,kubelet deviceManager就是通过这个UDS和device server通信。
3.调用Register()完成注册,device server通过/var/lib/kubelet/device-plugins/kubelet.sock向kubelet deviceManager注册自己的版本、endpoint(即1中的UDS),名字。
4.调用CheckHealth开启协程检查device列表中各device的健康状况
kubelet deviceManager的Register()负责接收并处理device server的注册请求,先来看看deviceManager的启动:
**deviceManager**的启动
Device Manager(ManagerImpl)的结构体
```
type ManagerImpl struct {
// kubelet对外暴露的socket名,即 kubelet.sock
socketname string
// device plugins' socket的存放的目录,/var/lib/kubelet/device-plugins/
socketdir string
// map对象,key为Resource Name,value为endpoint接口(包括run,stop,allocate,preStartContainer,
// getDevices,callback,isStoped,StopGracePeriodExpired),每个endpoint接口对应一个已注册的device
// plugin,负责与device plugin的gRPC通信及缓存device plugin反馈的device states。
endpoints map[string]endpointInfo
mutex sync.Mutex
// Register服务暴露的gRPC Server
server *grpc.Server
wg sync.WaitGroup
// 用来获取该节点上所有active pods,即non-Terminated状态的Pods, 即:
// allPods := kl.podManager.GetPods()
// activePods := kl.filterOutTerminatedPods(allPods)
activePods ActivePodsFunc
sourcesReady config.SourcesReady
// kubelet收到device plugin的ListAndWatch gRCP stream中有devices state变更时的回调函数,
// 包括有新设备增加、旧设备删除、设备状态变化,所以通过ListAndWatch接口的回调方式,可以实现设备的自动发现
// 和热插拔。
callback monitorCallback
// 缓存所有设备信息,map[设备类型名][设备id]设备详情
allDevices map[string]map[string]pluginapi.Device
// key为Resource Name,value为对应的健康的device IDs。
healthyDevices map[string]sets.String
// key为Resource Name,value为对应的不健康的device IDs。
unhealthyDevices map[string]sets.String
// key为Resource Name,value为已经分配出去的device IDs。
allocatedDevices map[string]sets.String
// 记录每个pod中每个容器的device分配情况
podDevices podDevices
// 在/var/lib/kubelet/device-plugins/目录下创建file store类型的key-value存储文件
// kubelet_internal_checkpoint,用来作为kubelet的device plugin的checkpoint。
checkpointManager checkpointmanager.CheckpointManager
numaNodes []int
// 本地存储设备和pod的对应信息(/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint),
// 具体存储了每个Pod分配的Devices信息PodDeviceEntries, 以及已经注册的Resource Name及对应的Devices IDs
topologyAffinityStore topologymanager.Store
}
```
1.Device Manager在kubelet启动时的NewContainerManager中创建,属于containerManager的子模块
```go
pkg/kubelet/cm/container_manager_linux.go
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface,
nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool,
recorder record.EventRecorder) (ContainerManager, error) {
...略
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
cm.topologyManager.AddHintProvider(cm.deviceManager)
}
```
2.初始化结构体 ManagerImpl{}的主要逻辑
```
//注册callback为genericDeviceUpdateCallback,用来处理对应devices的add,delete,update事件。
manager.callback = manager.genericDeviceUpdateCallback
```
genericDeviceUpdateCallback的实现:
- 将callback中收到的devices状态是Healthy,那么将device ID插入到ManagerImpl中healthDevices中,并从unhealthyDevices中删除。
- 将callback中收到的devices状态是Unhealthy,那么将device ID插入到ManagerImpl中unhealthDevices中,并从healthyDevices中删除。
- 将device plugin反馈的需要delete的devices从healthDevices和unhealthDevices中一并删除。
- 将ManagerImpl中的数据更新到/var/lib/kubelet/device-plugins/kubelet\_internal\_checkpoint文件中。
3.Device Manager的启动Start()ManagerIml.Start()负责启动Device Manager,对外提供gRPC服务。首先读取checkpoint file中数据,恢复ManagerImpl的相关数据,包括:
- podDevices;
- allocatedDevices;
- healthyDevices;
- unhealthyDevices;
- endpoints,
注意这里会将endpoint的stop time设置为当前时间,意味着kubelet restart后,需要等待device plugin进行re-register后,才认为这些resource是可用的。
```
// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := m.removeContents(m.socketdir); err != nil {
klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
}
```
然后将/var/lib/kubelet/device-plugins/下面的所有文件清空,当然checkpiont文件除外,也就是清空所有的socket文件,包括自己的kubelet.sock,以及其他所有之前的device plugin的socket文件。
最后创建kubelet.sock并启动gRPC Server对外提供gRPC服务,其中Register()用于Device plugin调用进行插件注册。
```
// Detect a kubelet restart by watching for a newly created
// 'pluginapi.KubeletSocket' file. When this occurs, restart this loop,
// restarting all of the plugins in the process.
case event := <-watcher.Events:
if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
log.Printf("inotify: %s created, restarting.", pluginapi.KubeletSocket)
goto restart
}
```
device server会监控kubelet.sock文件是否为新创建,如果是,则会触发自己的向kubelet重新注册自己。
deviceManager的注册逻辑
1.deviceManager接收到device server的 RegisterRequest请求,其结构体如下
```
type RegisterRequest struct {
Version string
Endpoint string
ResourceName string
Options *DevicePluginOptions
}
```
2.检查注册的device Name、version是否符合Extended Resource的规则,Name不能属于kubernetes.io,得有自己的domain,比如nvidia.com
3.根据endpoint信息创建EndpointImpl对象,即根据endpoint建立socket连接
4.将device name、 version、endpoint和socket连接存入deviceManager的缓存中
5.执行EndpointImpl对象的run(),在run方法中:
```
func (e *endpointImpl) run() {
//建立stream conn
stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
if err != nil {
klog.Errorf(errListAndWatch, e.resourceName, err)
return
}
for {
response, err := stream.Recv()
if err != nil {
klog.Errorf(errListAndWatch, e.resourceName, err)
return
}
devs := response.Devices
klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
var newDevs []pluginapi.Device
for _, d := range devs {
newDevs = append(newDevs, *d)
}
e.callback(e.resourceName, newDevs)
}
}
```
调用device server的ListAndWatch gRPC接口,通过长连接持续获取ListAndWatch gRPC stream,从stream流中获取的devices详情列表然后调用Endpoint的callback(也就是ManagerImpl注册的callback方法genericDeviceUpdateCallback)进行Device Manager的缓存更新并写到checkpoint文件中。
run()是通过协程启动的,持续获取device server的ListAndWatch结果,持续更新device状态;
当获取异常时,deviceManager断开连接,将device设置为不健康的状态。
回到device server,阅读下ListAndWatch()的工作
```
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty,
s pluginapi.DevicePlugin_ListAndWatchServer) error {
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.cachedDevices})
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.cachedDevices})
}
}
}
```
1.先是立马返回device详情列表
2.开启协程,一旦感知device的健康状态发生变化了,更细device详情列表再次返回给deviceManager
3.回想起健康检查,device server的CheckHealth就是2的生产者
注册流程基本走通,了解k8s plugin manager的同学可能有个疑问,plugin manager也能注册device server,那两者是一个什么关系?
简述plugin manager的工作机制
1.PluginManager会监听/var/lib/kubelet/plugins\_registry下的UDS文件,当有新创建的文件就会注册它,当文件被删同样也会删除该plugin;该部分注册流程,使用k8s常见的actual-desired模式,本文略过
2.当前支持两种类型的
- CSIPlugin-->csi.PluginHandler
- DevicePlugin-->PluginRegistrationHandler
3.plugin客户端注册时需要在GetInfo()指定类型,如csi-node-driver-registrar注册CSI插件时会指定CSIPlugin
4.继续分析下PluginRegistrationHandler:
PluginRegistrationHandler最终还是调用device manager来注册device server
```
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return cm.deviceManager.GetWatcherHandler()
}
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}
```
RegisterPlugin()和Register()最终调用的注册方法是一样的,也就说(以nvidia为例):
1.device server是通过/var/lib/kubelet/device-plugins/kubelet.sock主动向deviceManager注册自己/var/lib/kubelet/device-plugins/nvidia.sock
2.pluginManager是监听/var/lib/kubelet/plugins\_registry目录下的UDS,当/var/lib/kubelet/plugins\_registry/nvidia-xxx-reg.sock被创建,pluginManager主动通过该uds文件和注册服务通信,获取device server的注册信息包括name、version、endpoint(/var/lib/kubelet/plugins/nvidia-xxx/xxx.sock)
总结:deviceManager和pluginManager并没有什么关联,deviceManager更专注于注册device server,pluginManager可以注册device server、CSI server等plugin
至此,分析到好几个目录,避免混淆,先列举一下
```
# pwd
/var/lib/kubelet
# tree
.
├── config.yaml // kubelet的配置文件
├── cpu_manager_state // cpu manager的checkpoint文件
├── device-plugins // device manager的工作目录
│ ├── DEPRECATION
│ ├── kubelet_internal_checkpoint // device manager的checkpoint文件,
// 具体存储了每个Pod分配的Devices信息PodDeviceEntries, 以及已经注册的Resource Name及对应的Devices IDs。
│ └── kubelet.sock // device manager和device server通信的UDS文件
├── kubeadm-flags.env // kubeadm配置kubelet服务的配置参数
├── plugins // 存放kubelet plugin manager和plugin server交互的UDS文件的目录
│ ├── kubernetes.io
│ │ └── csi
│ │ └── pv
│ └── xxx.csi.xxx.io
│ └── csi.sock // csi server的UDS文件
├── plugins_registry
│ └── xxx.csi.xxx.io-reg.sock // 用户注册csi server的UDS文件
├── pod-resources
│ └── kubelet.sock // PodResources service的工作目录
```
看到pod-resources,可以多扯一下
```
func (kl *Kubelet) ListenAndServePodResources() {
socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err)
return
}
server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager)
}
```
kubelet启动时,会创建PodResources rpc服务,该服务用于发现pod正在使用的资源或设备,并提供资源或设备的元数据给第三方监控插件,常用于监控资源或设备。PodResources服务通过unix套接字/var/lib/kubelet/pod-resources/kubelet.sock提供。
```
// PodResources is a service provided by the kubelet that provides information about the
// node resources consumed by pods and containers on the node
service PodResources {
rpc List(ListPodResourcesRequest) returns (ListPodResourcesResponse) {}
}
```
当前 PodResources服务只有一个List()方法
```
// List returns information about the resources assigned to pods on the node
func (p *podResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodResourcesRequest)
(*v1alpha1.ListPodResourcesResponse, error) {
…
for j, container := range pod.Spec.Containers {
pRes.Containers[j] = &v1alpha1.ContainerResources{
Name: container.Name,
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
}
}
...
}
```
讨论参考:[https://github.com/kubernetes/enhancements/issues/606](https://github.com/kubernetes/enhancements/issues/606)
# device的分配
kubelet接收到被调度到本节点的pods后,lifecycle.PodAdmitHandler会对pods做一些判断,如
- evictionAdmitHandler:当节点有内存压力时,拒绝创建best effort的pod,还有其它条件先略过
- TopologyPodAdmitHandler:拒绝创建因为Topology locality冲突而无法分配资源的pod
总体是对pod的资源做一些准入判断。
```
func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType,
admissionFailureHandler AdmissionFailureHandler,
pluginResourceUpdateFunc pluginResourceUpdateFuncType) *predicateAdmitHandler {
return &predicateAdmitHandler{
getNodeAnyWayFunc,
pluginResourceUpdateFunc,
admissionFailureHandler,
}
}
```
```
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo,
attrs *lifecycle.PodAdmitAttributes) error {
return cm.deviceManager.Allocate(node, attrs)
}
```
pod的device资源的判断由predicateAdmitHandler调用deviceManager.Allocate来处理,接下来分下ManagerImpl.Allocate()的工作流程:
## ManagerImpl.Allocate()的工作流程
1.allocateContainerResources为Pod中的init container分配devices,并更新deviceManager中PodDevices缓存;
2.allocateContainerResources为Pod中的regular container分配devices,并更新deviceManager中PodDevices缓存;每次在为Pod分配devices之前,都去检查一下此时的active pods,并与podDevices缓存中的pods进行比对,将已经terminated的Pods的devices从podDevices中删除,即进行了devices的GC操作。从healthyDevices中随机分配对应数量的devices给该Pod,并注意更新allocatedDevices,否则会导致一个device被分配给多个Pod。拿到devices后,就通过Grpc调用device server的Allocate方法,device server返回ContainerAllocateResponse(包括注入的环境变量、挂载信息、Annotations),deviceManager根据pod uuid和container name将返回的信息存入podDevices缓存。更新podDevices缓存信息,并将deviceManager中缓存数据更新到checkpoint文件中。
3.sanitizeNodeAllocatable更新cache中Node对应Resource Name的Allocatable Resource,对于下一个pod,predicateAdmitHandler使用最新的数据来做准入判断
# device的使用
在kubelet的GetResource中,会调用DeviceManager的GetDeviceRunContainerOptions,并将这些options添加到kubecontainer.RunContainerOptions中。RunContainerOptions包括Envs、Mounts、Devices、PortMappings、Annotations等信息。
kubelet调用GetResources()为启动container获取启动参数runtimeapi.ContainerConfig{Args...}
```
devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
if err != nil {
return nil, err
} else if devOpts == nil {
return opts, nil
}
opts.Devices = append(opts.Devices, devOpts.Devices...)
opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
opts.Envs = append(opts.Envs, devOpts.Envs...)
opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
```
GetDeviceRunContainerOptions()根据pod uuid和container name从podDevices缓存(device的分配过程中会设置缓存数据)中取出Envs、Mounts、Devices、PortMappings、Annotations等信息,另外对于一些PreStartRequired为true的device server,deviceManager需要在启动container之前调用device server的PreStartContainer grpc接口,做一些device的初始化工作,超时时间限制为30秒。
```
ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second)
defer cancel()
return e.client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{
DevicesIDs: devs,
})
```
# device的状态管理
device的状态管理涉及到以下3个部分:
1.node上的device状态管理当kubelet更新node status时会调用GetCapacity更新device plugins对应的Resource信息。
kubelet\_node\_status.go调用deviceManager的GetCapacity()获取device的状态,将device状态添加到node info并通过kube-apiserver存入etcd,GetCapacity()返回device server含有的所有device、已经分配给pod使用的device、pod不能使用的device即no-active的device kubelet\_node\_status.go根据返回的数据更新node info
2.kubelet deviceManager服务的device状态管理其实在device的注册、device分配中都有讲解,即使用checkpoint机制默认是将podDevices以 PodDevicesEntry的格式存入_/var/lib/kubelet/device-plugins/kubelet\_internal\_checkpoint 文件_
```
type PodDevicesEntry struct {
PodUID string
ContainerName string
ResourceName string
DeviceIDs []string
AllocResp []byte //包含启动container时使用的Envs、Mounts、Devices、PortMappings、Annotations等信息
}
```
只要device的状态发生了变化(如注册新device、device被分配、device的健康状态发生变化、device被删除),就要将podDevices存入_kubelet\_internal\_checkpoint 文件。kubelet在启动或重启时,都需要读取kubelet\_internal\_checkpoint 文件里的数据,并以podDevices格式存入podDevices缓存_
_3.device server上报device状态_本小节的内容在device的注册部分已经讲解过,归纳为deviceManager注册完device server后,会跟device server建立长连接,持续获取device server的ListAndWatch结果,持续更新device状态;当获取异常时,deviceManager断开连接,将device设置为不健康的状态;device server默认会重启重新注册,重新上报device的状态。
参考文章:
[https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/](https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/)
[https://www.jianshu.com/p/cad4cd8bc237](https://www.jianshu.com/p/cad4cd8bc237)
[https://www.dazhuanlan.com/2019/12/18/5df9aeb084f93/](https://www.dazhuanlan.com/2019/12/18/5df9aeb084f93/)
[https://www.cnblogs.com/hezhiqiangTS/p/12011388.html](https://www.cnblogs.com/hezhiqiangTS/p/12011388.html)