kubeGenericRuntimeManager

kubeGenericRuntimeManager为kubelet提供Runtime接口, 管理pods与containers生命周期, 调用remote runtime api完成操作;Kubelet与CRI交互如下图

cri

SyncPod

接上文Pod处理走到调用kubeGenericRuntimeManager.SyncPod

pkg/kubelet/kuberuntime/kuberuntime_manager.go:661

// SyncPod drives a pod towards its desired state. As shown in this excerpt it:
//  1. computes the required sandbox/container changes via computePodActions;
//  2. kills the whole pod, or only the containers listed in ContainersToKill;
//  3. creates a new sandbox when CreateSandbox is set;
//  4. generates the sandbox config and starts ephemeral containers, the next
//     init container and the regular containers, in that order.
//
// Every step is recorded in the returned PodSyncResult. Lines consisting of
// dots mark code elided from this excerpt.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Compute the sandbox and container changes required for this pod.
	podContainerChanges := m.computePodActions(pod, podStatus)
	......
	// Kill the whole pod when computePodActions requested it.
	if podContainerChanges.KillPod {
	    .......
	    // Stop all containers of the pod (running their preStop hooks), then stop the sandbox.
		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
			return
		}
		// A new sandbox is about to be created, so purge the old init containers.
		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(pod, podStatus)
		}
	} else {
		// The pod is kept: kill only the containers listed in ContainersToKill
		// (e.g. containers in unknown state or running containers that must be restarted).
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
				// Abort the sync on the first container that cannot be killed.
				return
			}
		}
	}
	......
	// Create the sandbox if required; otherwise the existing SandboxID is reused.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).Infof("Creating PodSandbox for pod %q", format.Pod(pod))
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
		// createPodSandbox builds the sandbox config, creates the pod log directory
		// and asks the runtime to run the sandbox.
		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		......
	}
	// NOTE(review): podIPs is assigned in a part of the function elided from this excerpt.
	// The first entry is used as the primary pod IP.
	podIP := ""
	if len(podIPs) != 0 {
		podIP = podIPs[0]
	}
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	// Generate the sandbox config that is passed along when starting containers
	// (per the original note: DNS, hostname, port mappings, etc.).
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}
	// start is the shared helper used to launch every kind of container below.
	start := func(typeName string, spec *startSpec) error {
		// startContainer pulls the image, creates and starts the container and runs the postStart hook.
		// The error-handling body is elided in this excerpt.
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {......}
		return nil
	}

	// Start ephemeral containers (only when the EphemeralContainers feature gate is enabled).
	// The error returned by start is discarded, so one failure does not abort the loop.
	if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
		for _, idx := range podContainerChanges.EphemeralContainersToStart {
			start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
		}
	}

	// Start the next init container, if any; a failure here ends the sync.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		if err := start("init container", containerStartSpec(container)); err != nil {
			return
		}
		// Successfully started the container; clear the entry in the failure
		klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
	}

	// Start the regular containers queued in ContainersToStart.
	// The error returned by start is discarded, so one failure does not abort the loop.
	for _, idx := range podContainerChanges.ContainersToStart {
		start("container", containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return
}

SyncPod

  1. 对比pod、podStatus获得pod、container改变的podContainerChanges
  2. 根据podContainerChanges处理已改变的Pod
  3. 如果有Pod需要kill,则调用m.killPodWithSyncResult,如果有container需要kill调用m.killContainer
  4. 如果有Pod需要创建,先创建sandbox、之后是临时container、init container 最后启动container

computePodActions

pkg/kubelet/kuberuntime/kuberuntime_manager.go:480

// computePodActions compares the pod spec with the observed podStatus and
// returns the podActions describing what must change: whether the pod must be
// killed, whether a new sandbox must be created, which init container to start
// next, and which regular containers to start or kill.
//
// Running containers that must be restarted are placed in both
// ContainersToKill and ContainersToStart.
func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
	// Ask whether the sandbox must be (re)created.
	// NOTE(review): per the original author's note, podSandboxChanged reports a
	// change when the number of ready sandboxes is not 1, the network namespace
	// changed, or the sandbox has no IP. Its body is not visible here; confirm.
	createPodSandbox, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
	changes := podActions{
		KillPod:           createPodSandbox,
		CreateSandbox:     createPodSandbox,
		SandboxID:         sandboxID,
		Attempt:           attempt,
		ContainersToStart: []int{},
		ContainersToKill:  make(map[kubecontainer.ContainerID]containerToKillInfo),
	}

	// Case 1: the sandbox has to be recreated.
	if createPodSandbox {
		if !shouldRestartOnFailure(pod) && attempt != 0 && len(podStatus.ContainerStatuses) != 0 {
			// The restart policy forbids restarts, this is not the first attempt
			// and container statuses already exist: do not create a new sandbox.
			// KillPod stays true.
			changes.CreateSandbox = false
			return changes
		}

		// Collect the containers to start in the new sandbox. With
		// RestartPolicyOnFailure, containers that already succeeded are skipped.
		var containersToStart []int
		for idx, c := range pod.Spec.Containers {
			if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) {
				continue
			}
			containersToStart = append(containersToStart, idx)
		}

		// Nothing to start and init containers are done: no sandbox is needed.
		if len(containersToStart) == 0 {
			_, _, done := findNextInitContainerToRun(pod, podStatus)
			if done {
				changes.CreateSandbox = false
				return changes
			}
		}

		// With a fresh sandbox, init containers run again starting from the first one.
		if len(pod.Spec.InitContainers) != 0 {
			// Pod has init containers, return the first one.
			changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
			return changes
		}
		changes.ContainersToStart = containersToStart
		return changes
	}

	// Case 2: the sandbox is kept; check init container progress first.
	initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
	if !done {
		if next != nil {
			initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus)
			if initFailed && !shouldRestartOnFailure(pod) {
				// The init container failed and the restart policy does not
				// allow a restart: kill the pod.
				changes.KillPod = true
			} else {
				// An init container in unknown state is added to the kill map
				// so it is killed before being started again.
				if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {
					changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{
						name:      next.Name,
						container: next,
						message: fmt.Sprintf("Init container is in %q state, try killing it before restart",
							initLastStatus.State),
					}
				}
				changes.NextInitContainerToStart = next
			}
		}
		// Regular containers are not examined until init containers are done.
		return changes
	}

	// Case 3: init containers are done; examine every regular container.
	// keepCount counts running containers that need no action.
	keepCount := 0
	for idx, container := range pod.Spec.Containers {
		containerStatus := podStatus.FindContainerStatusByName(container.Name)

		// Run the internal post-stop hook for a container that exists but is
		// not running (per the original note: so its resources can be released
		// sooner). A hook failure is only logged.
		if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning {
			if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil {
				klog.Errorf("internal container post-stop lifecycle hook failed for container %v in pod %v with error %v",
					container.Name, pod.Name, err)
			}
		}

		// The container does not exist or is not running: ShouldContainerBeRestarted
		// decides whether it is (re)started.
		if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
			if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
				message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
				// message is already formatted and may contain '%' taken from
				// the container spec, so it must not be used as a format string.
				klog.V(3).Info(message)
				changes.ContainersToStart = append(changes.ContainersToStart, idx)
				// A container in unknown state is killed before it is restarted.
				if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown {
					changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
						name:      containerStatus.Name,
						container: &pod.Spec.Containers[idx],
						message: fmt.Sprintf("Container is in %q state, try killing it before restart",
							containerStatus.State),
					}
				}
			}
			continue
		}

		// The container is running; it is killed (and possibly restarted) only
		// if one of the conditions below holds.
		var message string
		// By default a killed container is restarted only if the restart
		// policy allows it.
		restart := shouldRestartOnFailure(pod)
		if _, _, changed := containerChanged(&container, containerStatus); changed {
			// The container spec changed.
			message = fmt.Sprintf("Container %s definition changed", container.Name)
			// Restart regardless of the restart policy because the container
			// spec changed.
			restart = true
		} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
			// If the container failed the liveness probe, we should kill it.
			message = fmt.Sprintf("Container %s failed liveness probe", container.Name)
		} else if startup, found := m.startupManager.Get(containerStatus.ID); found && startup == proberesults.Failure {
			// If the container failed the startup probe, we should kill it.
			message = fmt.Sprintf("Container %s failed startup probe", container.Name)
		} else {
			// Nothing is wrong with this container: keep it.
			keepCount++
			continue
		}

		// The container must be killed. If it is also to be restarted, queue
		// it in ContainersToStart as well.
		if restart {
			message = fmt.Sprintf("%s, will be restarted", message)
			changes.ContainersToStart = append(changes.ContainersToStart, idx)
		}

		changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
			name:      containerStatus.Name,
			container: &pod.Spec.Containers[idx],
			message:   message,
		}
		klog.V(2).Infof("Container %q (%q) of pod %s: %s", container.Name, containerStatus.ID, format.Pod(pod), message)
	}

	// No container is kept and none is to be started: kill the pod.
	if keepCount == 0 && len(changes.ContainersToStart) == 0 {
		changes.KillPod = true
	}
	return changes
}

computePodActions 依次检查sandbox、init container、container的状态,返回变更列表;在SyncPod中是先处理kill再执行创建,所以正在运行且需要重启的container在加入重启列表的同时也会被加入kill队列

创建Pod(createPodSandbox与startContainer)

Kubelet                  KubeletGenericRuntimeManager       RemoteRuntime
   +                              +                               +
   |                              |                               |
   +---------SyncPod------------->+                               |
   |                              |                               |
   |                              +---- Create PodSandbox ------->+
   |                              +<------------------------------+
   |                              |                               |
   |                              XXXXXXXXXXXX                    |
   |                              |          X                    |
   |                              |    NetworkPlugin.             |
   |                              |       SetupPod                |
   |                              |          X                    |
   |                              XXXXXXXXXXXX                    |
   |                              |                               |
   |                              +<------------------------------+
   |                              +----    Pull image1   -------->+
   |                              +<------------------------------+
   |                              +---- Create container1 ------->+
   |                              +<------------------------------+
   |                              +---- Start container1 -------->+
   |                              +<------------------------------+
   |                              |                               |
   |                              +<------------------------------+
   |                              +----    Pull image2   -------->+
   |                              +<------------------------------+
   |                              +---- Create container2 ------->+
   |                              +<------------------------------+
   |                              +---- Start container2 -------->+
   |                              +<------------------------------+
   |                              |                               |
   | <-------Success--------------+                               |
   |                              |                               |
   +                              +                               +
  • createPodSandbox

sandbox(沙箱)是一种程序隔离运行机制,为了安全在限定权限下的运行环境;在k8s中可以理解为pause容器,pause容器是pod共享cgroup、namespace的基础

pkg/kubelet/kuberuntime/kuberuntime_sandbox.go:38

// createPodSandbox creates and runs a sandbox for pod.
//
// It returns the sandbox ID on success. On failure it returns an empty ID, a
// human-readable message describing the failing step, and the underlying error.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	// Build the sandbox configuration (per the original note: DNS, hostname,
	// pod log directory, port mappings).
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		return "", message, err
	}

	// Create the pod log directory named in the sandbox configuration.
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	if err != nil {
		message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
		// message is already formatted and may contain '%' from the wrapped
		// error, so log it verbatim rather than as a format string.
		klog.Error(message)
		return "", message, err
	}

	// Resolve the runtime handler from the pod's RuntimeClassName when the
	// RuntimeClass feature is enabled and a manager is configured; otherwise
	// the empty handler is passed to the runtime.
	runtimeHandler := ""
	if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
		runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
		if err != nil {
			message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
			return "", message, err
		}
		if runtimeHandler != "" {
			klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
		}
	}

	// Ask the runtime service to run the sandbox (per the original note, the
	// pod network is set up during this call).
	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
	if err != nil {
		message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
		klog.Error(message)
		return "", message, err
	}

	return podSandBoxID, "", nil
}

createPodSandbox 函数

1 生成pod sandbox配置,包括DNS、hostname、log dir、端口映射,以及generatePodSandboxLinuxConfig中的cgroup、sysctls、namespace、linux权限;

2 根据配置创建日志目录/var/log/pods/…

3 调用runtimeService接口启动sandbox

  • startContainer

pkg/kubelet/kuberuntime/kuberuntime_container.go:134

// startContainer brings up a single container inside the sandbox podSandboxID.
//
// The steps are: ensure the image exists, compute the restart count, resolve
// the target ID, generate the container config, create the container, run the
// internal pre-start hook, start the container and, if configured, run the
// postStart lifecycle hook. A warning event is recorded for each failing step.
//
// It returns ("", nil) on success, otherwise a message plus an error
// identifying the step that failed.
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	ctr := spec.container

	// grpcMessage returns the message carried by the status derived from e.
	grpcMessage := func(e error) string {
		st, _ := grpcstatus.FromError(e)
		return st.Message()
	}

	// Step 1: make sure the image is available.
	imageRef, pullMsg, err := m.imagePuller.EnsureImageExists(pod, ctr, pullSecrets, podSandboxConfig)
	if err != nil {
		m.recordContainerEvent(pod, ctr, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpcMessage(err))
		return pullMsg, err
	}

	// Step 2: a container that already has a status is being restarted, so
	// its restart count is bumped; a brand-new container starts at zero.
	var restartCount int
	if previous := podStatus.FindContainerStatusByName(ctr.Name); previous != nil {
		restartCount = previous.RestartCount + 1
	}

	target, err := spec.getTargetID(podStatus)
	if err != nil {
		reason := grpcMessage(err)
		m.recordContainerEvent(pod, ctr, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", reason)
		return reason, ErrCreateContainerConfig
	}

	// Step 3: generate the container configuration. The cleanup action, when
	// present, is deferred before the error check so it runs on failure too.
	containerConfig, cleanupAction, err := m.generateContainerConfig(ctr, pod, restartCount, podIP, imageRef, podIPs, target)
	if cleanupAction != nil {
		defer cleanupAction()
	}
	if err != nil {
		reason := grpcMessage(err)
		m.recordContainerEvent(pod, ctr, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", reason)
		return reason, ErrCreateContainerConfig
	}

	// Step 4: create the container through the runtime service.
	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		reason := grpcMessage(err)
		m.recordContainerEvent(pod, ctr, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", reason)
		return reason, ErrCreateContainer
	}

	// Step 5: internal pre-start initialisation (per the original note,
	// mainly CPU manager affinity setup).
	if err := m.internalLifecycle.PreStartContainer(pod, ctr, containerID); err != nil {
		reason := grpcMessage(err)
		m.recordContainerEvent(pod, ctr, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", reason)
		return reason, ErrPreStartHook
	}
	m.recordContainerEvent(pod, ctr, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", ctr.Name))

	// Step 6: start the container.
	if err := m.runtimeService.StartContainer(containerID); err != nil {
		reason := grpcMessage(err)
		m.recordContainerEvent(pod, ctr, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", reason)
		return reason, kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, ctr, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", ctr.Name))

	// Step 7: run the postStart lifecycle hook, if one is configured.
	lifecycle := ctr.Lifecycle
	if lifecycle == nil || lifecycle.PostStart == nil {
		return "", nil
	}
	hookTarget := kubecontainer.ContainerID{
		Type: m.runtimeName,
		ID:   containerID,
	}
	hookMsg, hookErr := m.runner.Run(hookTarget, pod, ctr, lifecycle.PostStart)
	if hookErr == nil {
		return "", nil
	}

	// The hook failed: record it and kill the container that was just started.
	// A failure to kill is only logged.
	m.recordContainerEvent(pod, ctr, hookTarget.ID, v1.EventTypeWarning, events.FailedPostStartHook, hookMsg)
	if killErr := m.killContainer(pod, hookTarget, ctr.Name, "FailedPostStartHook", nil); killErr != nil {
		klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
			ctr.Name, hookTarget.String(), format.Pod(pod), ErrPostStartHook, killErr)
	}
	return hookMsg, fmt.Errorf("%s: %v", ErrPostStartHook, hookErr)
}

startContainer 负责拉取镜像、根据生成的容器配置创建容器,调用启动前hook,然后启动容器,执行postStart hook

删除Pod(killPodWithSyncResult与killContainer)

Kubelet                  KubeletGenericRuntimeManager      RemoteRuntime
   +                              +                               +
   |                              |                               |
   +---------SyncPod------------->+                               |
   |                              |                               |
   |                              +----   Stop container1   ----->+
   |                              +<------------------------------+
   |                              +----  Delete container1  ----->+
   |                              +<------------------------------+
   |                              |                               |
   |                              +----   Stop container2   ------>+
   |                              +<------------------------------+
   |                              +----  Delete container2  ------>+
   |                              +<------------------------------+
   |                              |                               |
   |                              XXXXXXXXXXXX                    |
   |                              |          X                    |
   |                              |    NetworkPlugin.             |
   |                              |       TeardownPod             |
   |                              |          X                    |
   |                              XXXXXXXXXXXX                    |
   |                              |                               |
   |                              |                               |
   |                              +---- Delete PodSandbox  ------>+
   |                              +<------------------------------+
   |                              |                               |
   | <-------Success--------------+                               |
   |                              |                               |
   +                              +                               +

分两种情况,一种是sandbox改变走killPodWithSyncResult,一种是kill container走killContainer而sandbox不变

  • killPodWithSyncResult

pkg/kubelet/kuberuntime/kuberuntime_manager.go:896

// killPodWithSyncResult kills every container of runningPod and then stops
// each of its sandboxes, collecting the outcome of every step in the returned
// PodSyncResult.
//
// A sandbox that fails to stop marks the KillPodSandbox result as failed and
// is logged, but the remaining sandboxes are still processed.
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
	// Kill the containers first and record one result per container.
	for _, containerResult := range m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride) {
		result.AddSyncResult(containerResult)
	}

	// One shared result covers all sandboxes of the pod.
	sandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
	result.AddSyncResult(sandboxResult)

	// Stop every sandbox (per the original note, the pod network is torn
	// down as part of this call).
	for _, sandbox := range runningPod.Sandboxes {
		stopErr := m.runtimeService.StopPodSandbox(sandbox.ID.ID)
		if stopErr == nil {
			continue
		}
		sandboxResult.Fail(kubecontainer.ErrKillPodSandbox, stopErr.Error())
		klog.Errorf("Failed to stop sandbox %q", sandbox.ID)
	}

	return result
}

killPodWithSyncResult 会先遍历pod container 等待完成kill containers,再调用runtimeService停止sandbox

  • killContainer

pkg/kubelet/kuberuntime/kuberuntime_container.go:587

// killContainer stops the container identified by containerID, running the
// internal pre-stop hook and the container's preStop lifecycle hook first.
//
// pod may be nil, in which case the pod and container specs are restored from
// the container's labels. gracePeriodOverride, when non-nil, replaces the
// computed grace period. The error from the runtime's StopContainer call (or
// from an earlier failing step) is returned. A line consisting of dots marks
// code elided from this excerpt.
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodOverride *int64) error {
	// Resolve the container spec, either from the given pod or from labels.
	var containerSpec *v1.Container
	if pod != nil {
		if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
			return fmt.Errorf("failed to get containerSpec %q(id=%q) in pod %q when killing container for reason %q",
				containerName, containerID.String(), format.Pod(pod), message)
		}
	} else {
		// Restore necessary information if one of the specs is nil.
		restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID)
		if err != nil {
			return err
		}
		pod, containerSpec = restoredPod, restoredContainer
	}
	// Graceful termination: start from the minimum grace period, then prefer
	// the pod's DeletionGracePeriodSeconds, then its TerminationGracePeriodSeconds.
	gracePeriod := int64(minimumGracePeriodInSeconds)
	switch {
	case pod.DeletionGracePeriodSeconds != nil:
		gracePeriod = *pod.DeletionGracePeriodSeconds
	case pod.Spec.TerminationGracePeriodSeconds != nil:
		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
	}
	......
	// Run the internal pre-stop hook; a failure aborts the kill.
	if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
		return err
	}
	// Run the user-defined preStop hook and subtract the value returned by
	// executePreStopHook from the remaining grace period.
	if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
		gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
	}
	// Never go below the minimum grace period.
	if gracePeriod < minimumGracePeriodInSeconds {
		gracePeriod = minimumGracePeriodInSeconds
	}
	// An explicit override wins over everything computed above, including the minimum.
	if gracePeriodOverride != nil {
		gracePeriod = *gracePeriodOverride
		klog.V(3).Infof("Killing container %q, but using a %d second grace period override", containerID, gracePeriod)
	}
	klog.V(2).Infof("Killing container %q with a %d second grace period", containerID.String(), gracePeriod)
	// Ask the runtime service to stop the container.
	err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
	if err != nil {
		klog.Errorf("Container %q termination failed with gracePeriod %d: %v", containerID.String(), gracePeriod, err)
	} else {
		klog.V(3).Infof("Container %q exited normally", containerID.String())
	}
	return err
}

killContainer 先执行preStop hook再通过runtimeService停止容器

以上是Kubelet创建删除POD的主流程,之后是通过CRI RuntimeService完成容器的创建与删除,具体说明可以参考《Introducing Container Runtime Interface (CRI) in Kubernetes》与 CRI SPEC;其中比较特殊的是dockershim,其相当于实现了一个container runtime server,有机会可以深入看下dockershim的实现。