kubernetes pv-controller 解析

2021 年 12 月 24 日 阿里技术


基于 kubernetes 1.23

一  简介


pv controller是 kcm 的组件之一,它负责处理集群中的pvc/pv对象,对pvc/pv 对象进行状态转换。

二  pvController 初始化


初始化代码在 pkg/controller/volume/persistentvolume/pv_controller_base.go 文件中,NewController 主要做了如下几件事情

  • 初始化 eventRecorder

  • 初始化 PersistentVolumeController 对象,

  • 调用 VolumePluginMgr.InitPlugins()  方法 初始化存储插件,代码存在于 pkg/volume/plugins.go 文件中

  • 开始创建 informer 监听集群内的资源,初始化了如下 informer

  • PersistentVolumeInformer 
  • PersistentVolumeClaimInformer 
  • StorageClassInformer
  • PodInformer
  • NodeInformer


  • 将 PV & PVC 的 event 分别放入 volumeQueue & claimQueue

  • 为了不每次都迭代 pods ,自定义一个通过 pvc 键索引 pod 的索引器

  • 初始化 intree 存储 -> csi 迁移相关功能的 manager 

NewController代码在cmd/kube-controller-manager代码里面被调用,初始化成功之后紧接着调用go Run()方法运行 pvController


三  开始运行


   
   
     
// 开始运行 pvController func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {  defer utilruntime.HandleCrash()  defer ctrl.claimQueue.ShutDown()  defer ctrl.volumeQueue.ShutDown()
klog.Infof("Starting persistent volume controller") defer klog.Infof("Shutting down persistent volume controller")
if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) { return }
ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh) go wait.Until(ctrl.volumeWorker, time.Second, stopCh) go wait.Until(ctrl.claimWorker, time.Second, stopCh)
metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr)
<-stopCh}

同步缓存之后开始周期性执行 ctrl.resync,ctrl.volumeWorker , ctrl.claimWorker , 我们看下 initalizeCaches 方法

   
   
     
func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {  // 这里不访问 apiserver,是从本地缓存拿出的对象,这些对象不可以被外部函数修改  volumeList, err := volumeLister.List(labels.Everything())  if err != nil {    klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)    return  }  for _, volume := range volumeList {    // 我们不能改变 volume 对象,所以这里我们copy一份新对象,对新对象进行操作    volumeClone := volume.DeepCopy()    if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {      klog.Errorf("error updating volume cache: %v", err)    }  }
claimList, err := claimLister.List(labels.Everything()) if err != nil { klog.Errorf("PersistentVolumeController can't initialize caches: %v", err) return } for _, claim := range claimList { if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil { klog.Errorf("error updating claim cache: %v", err) } } klog.V(4).Infof("controller initialized")}
type persistentVolumeOrderedIndex struct { store cache.Indexer}

该方法将 cache.listener 里面的缓存转存在 persistentVolumeOrderedIndex 中,它是按 AccessModes 索引并按存储容量排序的 persistentVolume 的缓存。

1  resync


   
   
     
func (ctrl *PersistentVolumeController) resync() {  klog.V(4).Infof("resyncing PV controller")
pvcs, err := ctrl.claimLister.List(labels.NewSelector()) if err != nil { klog.Warningf("cannot list claims: %s", err) return } for _, pvc := range pvcs { ctrl.enqueueWork(ctrl.claimQueue, pvc) }
pvs, err := ctrl.volumeLister.List(labels.NewSelector()) if err != nil { klog.Warningf("cannot list persistent volumes: %s", err) return } for _, pv := range pvs { ctrl.enqueueWork(ctrl.volumeQueue, pv) }}

这里将集群内所有的 pvc/pv 统一都放到对应的 claimQueue & volumeQueue 里面重新处理。 这个resyncPeriod 等于一个random time.Duration * config.time(在 kcm 启动时设置)。

2  volumeWorker


一个无限循环, 不断的处理从 volumeQueue 里面获取到的 PersistentVolume

   
   
     
workFunc := func() bool {    keyObj, quit := ctrl.volumeQueue.Get()    if quit {      return true    }    defer ctrl.volumeQueue.Done(keyObj)    key := keyObj.(string)    klog.V(5).Infof("volumeWorker[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err) return false } volume, err := ctrl.volumeLister.Get(name) if err == nil { // The volume still exists in informer cache, the event must have // been add/update/sync ctrl.updateVolume(volume) return false } if !errors.IsNotFound(err) { klog.V(2).Infof("error getting volume %q from informer: %v", key, err) return false }
// The volume is not in informer cache, the event must have been // "delete" volumeObj, found, err := ctrl.volumes.store.GetByKey(key) if err != nil { klog.V(2).Infof("error getting volume %q from cache: %v", key, err) return false } if !found { // The controller has already processed the delete event and // deleted the volume from its cache klog.V(2).Infof("deletion of volume %q was already processed", key) return false } volume, ok := volumeObj.(*v1.PersistentVolume) if !ok { klog.Errorf("expected volume, got %+v", volumeObj) return false } ctrl.deleteVolume(volume) return false }

我们主要关注 ctrl.updateVolume(volume) 方法

updateVolume


updateVolume 方法是对于集群内的 events 实际 handler 方法,它里面主要调用了 ctrl.syncVolume 方法来处理

   
   
     
func (ctrl *PersistentVolumeController) syncVolume(ctx context.Context, volume *v1.PersistentVolume) error {  klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
... // [Unit test set 4] if volume.Spec.ClaimRef == nil { // Volume is unused klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name) if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil { // Nothing was saved; we will fall back into the same // condition in the next call to this method return err } return nil } else /* pv.Spec.ClaimRef != nil */ { // Volume is bound to a claim. if volume.Spec.ClaimRef.UID == "" { // The PV is reserved for a PVC; that PVC has not yet been // bound to this PV; the PVC sync will handle it. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef)) if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil { // Nothing was saved; we will fall back into the same // condition in the next call to this method return err } return nil } klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef)) // Get the PVC by _name_ var claim *v1.PersistentVolumeClaim claimName := claimrefToClaimKey(volume.Spec.ClaimRef) obj, found, err := ctrl.claims.GetByKey(claimName) if err != nil { return err } if !found { if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed { obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name) if err != nil && !apierrors.IsNotFound(err) { return err } found = !apierrors.IsNotFound(err) if !found { obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err } found = !apierrors.IsNotFound(err) } } } if !found { klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef)) // Fall through with claim = nil } else { var ok bool claim, ok = obj.(*v1.PersistentVolumeClaim) if !ok { return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj) } klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim)) } if claim != nil && claim.UID != volume.Spec.ClaimRef.UID { klog.V(4).Infof("Maybe cached claim: %s is not the newest one, we should fetch it from apiserver", claimrefToClaimKey(volume.Spec.ClaimRef))
claim, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err } else if claim != nil { // Treat the volume as bound to a missing claim. if claim.UID != volume.Spec.ClaimRef.UID { klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a newer UID than pv.ClaimRef, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef)) claim = nil } else { klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has a same UID with pv.ClaimRef", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef)) } } }
if claim == nil { if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed { // Also, log this only once: klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy) if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil { // Nothing was saved; we will fall back into the same condition // in the next call to this method return err } } if err = ctrl.reclaimVolume(volume); err != nil { // Release failed, we will fall back into the same condition // in the next call to this method return err } if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain { // volume is being retained, it references a claim that does not exist now. klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID) } return nil } else if claim.Spec.VolumeName == "" { if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) { volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name) ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg) claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg) // Skipping syncClaim return nil }
if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) { // The binding is not completed; let PVC sync handle it klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name) } else { // Dangling PV; try to re-establish the link in the PVC sync klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name) } ctrl.claimQueue.Add(claimToClaimKey(claim)) return nil } else if claim.Spec.VolumeName == volume.Name { // Volume is bound to a claim properly, update status if necessary klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name) if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil { // Nothing was saved; we will fall back into the same // condition in the next call to this method return err } return nil } else { // Volume is bound to a claim, but the claim is bound elsewhere if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete { if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed { // Also, log this only once: klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name) if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil { // Nothing was saved; we will fall back into the same condition // in the next call to this method return err } } if err = ctrl.reclaimVolume(volume); err != nil { return err } return nil } else { if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) { klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name) if err = ctrl.unbindVolume(volume); err != nil { return err } return nil } else { // The PV must have been created with this ptr; leave it alone. klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name) if err = ctrl.unbindVolume(volume); err != nil { return err } return nil } } } }}

1、当 pv 的 Spec.ClaimRef 的值为空的时候,说明当前 pv 未被使用,调用 ctrl.updateVolumePhase 使得 pv 进入 Available 状态

2、当 pv 的 Spec.ClaimRef 的值不为空的时候, 说明当前 pv 已绑定一个pvc

  • 当Spec.ClaimRef.UID 为空的时候,说明 pvc 还未绑定 pv, 调用ctrl.updateVolumePhase 使得 pv 进入 Available 状态, 方法返回,等待 pvc syncClaim 方法处理


  • 使用 Spec.ClaimRef 相关的 pvc 信息获取 pv_controller缓存的pvc


  • 如果 pvc 没有找到


  • 有可能是集群压力过大缓存没有更新,则进一步从 informercache 中找,如果 informercache里面还是没有的话则进一步从apiserver中去找

  • 这里如果发现 非 Released & 非 Failed 的pv 经过上述步骤仍然找不到 pvc 的话,说明 pvc 被删除。在最新的kubernetes 版本中会检查reclaimPoilcy,对 pv的状态进行处理 

  • 找到 pvc 之后

1)如果 pvc 的 uid 和 Spec.ClaimRef.UID 不一致,这样一般是 pv 指向的 pvc 被删了,然后立即创建了一个同名的pvc, 而缓存还没有更新,这时我们需要doublecheck一下,若 double check 之后依旧不存在,则判断是pv绑定了一个不存在的pvc, 将pvc置为空,执行上述pvc 没有找到的逻辑

2)如果pvc 的 volumeName 为空

    • 检查 pvc的 volumeMode 和 pv 的 volumeMode是否一致,不一致报 event 出来


    • 如果发现有这个 pv 有 AnnBoundByController = "pv.kubernetes.io/bound-by-controller" 这个annotation 说明 pvc/pv 流程正在绑定中


    • 将 pvc 放到 claimQueue 里面, 让 claimWorker 进行处理


3)如果 pvc.Spec.volumeName == pv.volumeName 的时候,直接将 pv 设置为 bound 状态

4)如果 pvc.Spec.volumeName != pv.volumeName 的时候


  • 如果是 pv 是动态创建的情况下,并且 pv 的 ReclaimPolicy 是 delete 的情况下, 说明 pvc 已经绑定了其他pv, 将 pv 置为 released 的状态, 等待deleters 删除


  • 如果 pv 不是动态创建的情况下,将 pv 的 ClaimRef 字段置为空,将其 unbound 掉


3  claimWorker


一个无限循环,不断的处理从 claimQueue 里面获取到的 PersistentVolumeClaim

   
   
     
    workFunc := func() bool {    keyObj, quit := ctrl.claimQueue.Get()    if quit {      return true    }    defer ctrl.claimQueue.Done(keyObj)    key := keyObj.(string)    klog.V(5).Infof("claimWorker[%s]", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err) return false } claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name) if err == nil { // The claim still exists in informer cache, the event must have // been add/update/sync ctrl.updateClaim(claim) return false } if !errors.IsNotFound(err) { klog.V(2).Infof("error getting claim %q from informer: %v", key, err) return false }
// The claim is not in informer cache, the event must have been "delete" claimObj, found, err := ctrl.claims.GetByKey(key) if err != nil { klog.V(2).Infof("error getting claim %q from cache: %v", key, err) return false } if !found { // The controller has already processed the delete event and // deleted the claim from its cache klog.V(2).Infof("deletion of claim %q was already processed", key) return false } claim, ok := claimObj.(*v1.PersistentVolumeClaim) if !ok { klog.Errorf("expected claim, got %+v", claimObj) return false } ctrl.deleteClaim(claim) return false }

我们主要关注 ctrl.updateClaim(claim) 方法, 与上面同样,它里面主要调用了 ctrl.syncClaim 方法来处理, 在 syncClaim 里面根据 pvc 的状态分别调用了 ctrl.syncUnboundClaim & ctrl.syncBoundClaim 方法来处理

syncUnboundClaim


   
   
     
func (ctrl *PersistentVolumeController) syncUnboundClaim(ctx context.Context, claim *v1.PersistentVolumeClaim) error {  if claim.Spec.VolumeName == "" {    // User did not care which PV they get.    delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)    if err != nil {      return err    }
// [Unit test set 1] volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) if err != nil { klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err) return fmt.Errorf("error finding PV for claim %q: %w", claimToClaimKey(claim), err) } if volume == nil { klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim)) switch { case delayBinding && !pvutil.IsDelayBindingProvisioning(claim): if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil { return err } case storagehelpers.GetPersistentVolumeClaimClass(claim) != "": if err = ctrl.provisionClaim(ctx, claim); err != nil { return err } return nil default: ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set") }
// Mark the claim as Pending and try to find a match in the next // periodic syncClaim if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil { return err } return nil } else /* pv != nil */ { // Found a PV for this claim // OBSERVATION: pvc is "Pending", pv is "Available" claimKey := claimToClaimKey(claim) klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume)) if err = ctrl.bind(volume, claim); err != nil { metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err) return err } metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil) return nil } } else /* pvc.Spec.VolumeName != nil */ { klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName) obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName) if err != nil { return err } if !found { klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName) if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil { return err } return nil } else { volume, ok := obj.(*v1.PersistentVolume) if !ok { return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj) } klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume)) if volume.Spec.ClaimRef == nil { klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim)) if err = checkVolumeSatisfyClaim(volume, claim); err != nil { klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err) // send an event msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg) // volume does not satisfy the requirements of the claim if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil { return err } } else if err = ctrl.bind(volume, claim); err != nil { // On any error saving the volume or the claim, subsequent // syncClaim will finish the binding. return err } // OBSERVATION: pvc is "Bound", pv is "Bound" return nil } else if pvutil.IsVolumeBoundToClaim(volume, claim) { // User asked for a PV that is claimed by this PVC // OBSERVATION: pvc is "Pending", pv is "Bound" klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
// Finish the volume binding by adding claim UID. if err = ctrl.bind(volume, claim); err != nil { return err } // OBSERVATION: pvc is "Bound", pv is "Bound" return nil } else { // User asked for a PV that is claimed by someone else // OBSERVATION: pvc is "Pending", pv is "Bound" if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) { klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim)) claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg) // User asked for a specific PV, retry later if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil { return err } return nil } else { klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef)) claimMsg := fmt.Sprintf("volume %q already bound to a different claim.", volume.Name) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.FailedBinding, claimMsg)
return fmt.Errorf("invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef)) } } } }}

梳理下整体流程

  • 如果当前 pvc 的 volumeName 为空


  • 判断当前pvc 是否是延迟绑定的


  • 调用 volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding) 找出对应的 pv


  • 如果找到 volume 的话


  • 调用 ctrl.bind(volume, claim) 方法进行绑定

  • 如果没有找到 volume 的话


  • 如果是延迟绑定, 并且还未触发(pod 未引用)则 emit event 到 pvc 上


  • 如果 pvc 绑定了 sc, 调用 ctrl.provisionClaim(ctx, claim) 方法


  • 分析 pvc yaml, 找到 provisioner driver


  • 启动一个 goroutine 


  • 调用 ctrl.provisionClaimOperation(ctx, claim, plugin, storageClass) 进行创建工作


provisionClaimOperation


   
   
     
func (ctrl *PersistentVolumeController) provisionClaimOperation(  ctx context.Context,  claim *v1.PersistentVolumeClaim,  plugin vol.ProvisionableVolumePlugin,  storageClass *storage.StorageClass) (string, error) {  claimClass := storagehelpers.GetPersistentVolumeClaimClass(claim)  klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)
pluginName := plugin.GetPluginName() if pluginName != "kubernetes.io/csi" && claim.Spec.DataSource != nil { strerr := fmt.Sprintf("plugin %q is not a CSI plugin. Only CSI plugin can provision a claim with a datasource", pluginName) return pluginName, fmt.Errorf(strerr)
} provisionerName := storageClass.Provisioner // Add provisioner annotation to be consistent with external provisioner workflow newClaim, err := ctrl.setClaimProvisioner(ctx, claim, provisionerName) if err != nil { // Save failed, the controller will retry in the next sync klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err) return pluginName, err } claim = newClaim

pvName := ctrl.getProvisionedVolumeNameForClaim(claim) volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { klog.V(3).Infof("error reading persistent volume %q: %v", pvName, err) return pluginName, err } if err == nil && volume != nil { // Volume has been already provisioned, nothing to do. klog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim)) return pluginName, err }
// Prepare a claimRef to the claim early (to fail before a volume is // provisioned) claimRef, err := ref.GetReference(scheme.Scheme, claim) if err != nil { klog.V(3).Infof("unexpected error getting claim reference: %v", err) return pluginName, err }
// Gather provisioning options tags := make(map[string]string) tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace tags[CloudVolumeCreatedForClaimNameTag] = claim.Name tags[CloudVolumeCreatedForVolumeNameTag] = pvName
options := vol.VolumeOptions{ PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy, MountOptions: storageClass.MountOptions, CloudTags: &tags, ClusterName: ctrl.clusterName, PVName: pvName, PVC: claim, Parameters: storageClass.Parameters, }
// Refuse to provision if the plugin doesn't support mount options, creation // of PV would be rejected by validation anyway if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 { strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions) klog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr) return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName()) }
// Provision the volume provisioner, err := plugin.NewProvisioner(options) if err != nil { strerr := fmt.Sprintf("Failed to create provisioner: %v", err) klog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr) return pluginName, err }
var selectedNode *v1.Node = nil if nodeName, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok { selectedNode, err = ctrl.NodeLister.Get(nodeName) if err != nil { strerr := fmt.Sprintf("Failed to get target node: %v", err) klog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr) return pluginName, err } } allowedTopologies := storageClass.AllowedTopologies
opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision") volume, err = provisioner.Provision(selectedNode, allowedTopologies) opComplete(volumetypes.CompleteFuncParam{Err: &err}) if err != nil { ctrl.rescheduleProvisioning(claim)
strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err) klog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr) return pluginName, err }
klog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))
// Create Kubernetes PV object for the volume. if volume.Name == "" { volume.Name = pvName } // Bind it to the claim volume.Spec.ClaimRef = claimRef volume.Status.Phase = v1.VolumeBound volume.Spec.StorageClassName = claimClass
// Add AnnBoundByController (used in deleting the volume) metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnBoundByController, "yes") metav1.SetMetaDataAnnotation(&volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned, plugin.GetPluginName())
// Try to create the PV object several times for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ { klog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name) var newVol *v1.PersistentVolume if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), volume, metav1.CreateOptions{}); err == nil || apierrors.IsAlreadyExists(err) { // Save succeeded. if err != nil { klog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim)) err = nil } else { klog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
_, updateErr := ctrl.storeVolumeUpdate(newVol) if updateErr != nil { // We will get an "volume added" event soon, this is not a big error klog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr) } } break } // Save failed, try again after a while. klog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err) time.Sleep(ctrl.createProvisionedPVInterval) }
if err != nil { strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err) klog.V(3).Info(strerr) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
var deleteErr error var deleted bool for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ { _, deleted, deleteErr = ctrl.doDeleteVolume(volume) if deleteErr == nil && deleted { // Delete succeeded klog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name) break } if !deleted { klog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName()) break } // Delete failed, try again after a while. klog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr) time.Sleep(ctrl.createProvisionedPVInterval) }
if deleteErr != nil { strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr) klog.V(2).Info(strerr) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr) } } else { klog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim)) msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName()) ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg) } return pluginName, nil}

provisionClaimOperation 的基本逻辑如下

  • 检查driver,只有 csi 类型的 driver 才允许使用 dataSource 字段


  • 为 pvc 加 claim.Annotations["volume.kubernetes.io/storage-provisioner"] = class.Provisioner annotation


  • 根据规则拼出 pv Name = "pvc-" + pvc.UID


  • 如果找到了 pv, 则说明 pv已经存在,跳过 provision


  • 收集pvc/pv 基本信息封装到 options 中


  • 对 plugin 进行校验, 如果plugin不支持mount操作,则直接拒绝provision 请求


  • 调用plugin.NewProvisioner(options) 创建 provisioner, 接口实现了Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) 方法,注意,该方法为同步方法


  • Provision 方法返回了 PersistentVolume实例


  • 为创建出来的 pv 关联 pvc 对象(ClaimRef),尝试创建 pv 对象 (重复多次)


  • 如果创建 pv 失败,则尝试调用 Delete 方法删除创建的volume资源


syncBoundClaim


   
   
     
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
if claim.Spec.VolumeName == "" { // Claim was bound before but not any more. if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil { return err } return nil } obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName) if err != nil { return err } if !found { // Claim is bound to a non-existing volume. if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil { return err } return nil } else { volume, ok := obj.(*v1.PersistentVolume) if !ok { return fmt.Errorf("cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj) }
klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume)) if volume.Spec.ClaimRef == nil { // Claim is bound but volume has come unbound. // Or, a claim was bound and the controller has not received updated // volume yet. We can't distinguish these cases. // Bind the volume again and set all states to Bound. klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim)) if err = ctrl.bind(volume, claim); err != nil { // Objects not saved, next syncPV or syncClaim will try again return err } return nil } else if volume.Spec.ClaimRef.UID == claim.UID { // All is well // NOTE: syncPV can handle this so it can be left out. // NOTE: bind() call here will do nothing in most cases as // everything should be already set. klog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim)) if err = ctrl.bind(volume, claim); err != nil { // Objects not saved, next syncPV or syncClaim will try again return err } return nil } else { // Claim is bound but volume has a different claimant. // Set the claim phase to 'Lost', which is a terminal // phase. if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil { return err } return nil } }}


1)如果 pvc.Spec.VolumeName 为空, 说明这个 pvc 之前被 bound 过,但是已经不存在指向的pv, 报出event并返回


2)从 cache 里面找 pvc 绑定的 pv

  • 如果没找到, 说明 pvc 绑定了一个不存在的pv,报 event 并返回


  • 如果找到了pv


  • 检查 pv.Spec.ClaimRef 字段, 如果 为空,说明 pv 还没有绑定 pvc, 调用 ctrl.bind(volume, claim); 方法进行绑定


  • pv.ClaimRef.UID == pvc.UID, 调用 bind 方法,但是大多数情况会直接返回(因为所有的操作都已经做完了)


  • 其他情况说明 volume 绑定了其他的 pvc, 更新pvc 的状态 为 lost 并报出 event

四  总结


最后用一张 pvc/pv 的状态流转图来总结一下




《三步赢大礼,跟着冰河学Java》

由乘风者专家博主——冰河倾力打造的《Java8从入门到精通》电子书已在全网受到数十万追捧,热度空前。这本最全Java8新特性知识全解可谓是程序员快速进阶的标配手册。即日起,下载电子书,发文并分享,三步完成即可领取活动奖励,更有机会获得冰河亲笔签名的新书《深入理解分布式事务:原理与实战》。福利多多,赶快拉动你的小伙伴一起参与进来给自己一个年终奖吧!


活动详情如下:步骤一:点击下载电子书;步骤二:在开发者社区发表一篇Java相关的文章并将链接填在问卷中;步骤三:邀请三位好友下载电子书,即可完成打卡领取小米无线鼠标一个(限量100个)活动结束由冰河挑选十篇优质文章,作者获冰河亲笔签名图书一本以及阿里云定制蓝牙音箱一个。(活动时间:12月23日-1月14日,优质文章将于1月17日公布,奖品将在活动结束后7个工作日内发放)

点击阅读原文查看详情页!
登录查看更多
0

相关内容

医学人工智能AIM(Artificial Intelligence in Medicine)杂志发表了多学科领域的原创文章,涉及医学中的人工智能理论和实践,以医学为导向的人类生物学和卫生保健。医学中的人工智能可以被描述为与研究、项目和应用相关的科学学科,旨在通过基于知识或数据密集型的计算机解决方案支持基于决策的医疗任务,最终支持和改善人类护理提供者的性能。 官网地址:http://dblp.uni-trier.de/db/journals/artmed/
【2021新书】ApachePulsar 实战,402页pdf
专知会员服务
70+阅读 · 2021年12月29日
专知会员服务
52+阅读 · 2021年6月30日
专知会员服务
19+阅读 · 2021年6月10日
【经典书】精通Linux,394页pdf
专知会员服务
93+阅读 · 2021年2月19日
专知会员服务
40+阅读 · 2020年9月6日
报告 |事理图谱的构建及应用,附61页pdf
专知会员服务
191+阅读 · 2020年1月17日
【干货】大数据入门指南:Hadoop、Hive、Spark、 Storm等
专知会员服务
96+阅读 · 2019年12月4日
【电子书】Flutter实战305页PDF免费下载
专知会员服务
23+阅读 · 2019年11月7日
PyTorch 之 Checkpoint 机制解析
极市平台
0+阅读 · 2022年1月8日
如何在golang代码里面解析容器镜像
阿里技术
0+阅读 · 2022年1月5日
一文理解 K8s 容器网络虚拟化
阿里技术
0+阅读 · 2021年11月29日
Kubernetes 入门教程
阿里技术
0+阅读 · 2021年11月16日
Spring Cloud Gateway一次请求调用源码解析
阿里技术
0+阅读 · 2021年11月15日
浅谈 Kubernetes 在生产环境中的架构
DevOps时代
11+阅读 · 2019年5月8日
使用tinc构建full mesh结构的VPN
运维帮
68+阅读 · 2018年12月1日
国家自然科学基金
0+阅读 · 2015年12月31日
国家自然科学基金
0+阅读 · 2013年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
1+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2011年12月31日
Arxiv
0+阅读 · 2022年4月18日
Arxiv
0+阅读 · 2022年4月17日
Arxiv
0+阅读 · 2022年4月14日
Arxiv
31+阅读 · 2018年11月13日
VIP会员
相关VIP内容
【2021新书】ApachePulsar 实战,402页pdf
专知会员服务
70+阅读 · 2021年12月29日
专知会员服务
52+阅读 · 2021年6月30日
专知会员服务
19+阅读 · 2021年6月10日
【经典书】精通Linux,394页pdf
专知会员服务
93+阅读 · 2021年2月19日
专知会员服务
40+阅读 · 2020年9月6日
报告 |事理图谱的构建及应用,附61页pdf
专知会员服务
191+阅读 · 2020年1月17日
【干货】大数据入门指南:Hadoop、Hive、Spark、 Storm等
专知会员服务
96+阅读 · 2019年12月4日
【电子书】Flutter实战305页PDF免费下载
专知会员服务
23+阅读 · 2019年11月7日
相关资讯
PyTorch 之 Checkpoint 机制解析
极市平台
0+阅读 · 2022年1月8日
如何在golang代码里面解析容器镜像
阿里技术
0+阅读 · 2022年1月5日
一文理解 K8s 容器网络虚拟化
阿里技术
0+阅读 · 2021年11月29日
Kubernetes 入门教程
阿里技术
0+阅读 · 2021年11月16日
Spring Cloud Gateway一次请求调用源码解析
阿里技术
0+阅读 · 2021年11月15日
浅谈 Kubernetes 在生产环境中的架构
DevOps时代
11+阅读 · 2019年5月8日
使用tinc构建full mesh结构的VPN
运维帮
68+阅读 · 2018年12月1日
相关基金
国家自然科学基金
0+阅读 · 2015年12月31日
国家自然科学基金
0+阅读 · 2013年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2012年12月31日
国家自然科学基金
1+阅读 · 2012年12月31日
国家自然科学基金
0+阅读 · 2011年12月31日
Top
微信扫码咨询专知VIP会员