kubelet源码解析之PLEG
PLEG介绍
Pleg(Pod Lifecycle Event Generator) 是kubelet 的核心模块,PLEG周期性调用container runtime获取本节点containers/sandboxes的信息(像docker ps),并与自身维护的podRecords信息进行对比,生成对应的 PodLifecycleEvent并发送到plegCh中,在kubelet syncLoop中对plegCh进行消费处理,最终达到用户的期望状态
在之前的kubelet版本中由每个pod的podWorker的goroutine来查询runtime进行对比,当pod数量逐渐增大时,大量周期性的查询导致高cpu使用率、低性能、难以扩展等问题,为了改进以上问题,引入了PLEG
创建
在/pkg/kubelet/kubelet.go:660 的函数NewMainKubelet中伴随Kubelet结构体创建
// 传入runtime、chan缓冲大小、定时周期、podCache创建pleg
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
pkg/kubelet/pleg/generic.go:109 工厂函数NewGenericPLEG创建返回接口类型PodLifecycleEventGenerator
// NewGenericPLEG instantiates a new GenericPLEG object and return it.
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
return &GenericPLEG{
// Interval between periodic relists (1s by default, per the caller).
relistPeriod: relistPeriod,
// Container runtime client (the kubeGenericRuntimeManager in practice).
runtime: runtime,
// Buffered channel that delivers PodLifecycleEvents to the kubelet sync loop.
eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
podRecords: make(podRecords),
// Pod status cache shared with the kubelet (podCache).
cache: cache,
clock: clock,
}
}
// podRecords定义
// podRecord holds two snapshots of one pod's state as reported by the
// container runtime. PLEG derives lifecycle events by diffing the two.
type podRecord struct {
// old is the pod state observed during the previous relist.
old *kubecontainer.Pod
// current is the pod state observed during the latest relist.
current *kubecontainer.Pod
}

// podRecords indexes podRecord entries by pod UID.
// Fixed: element type was misspelled *podRecor, which does not compile.
type podRecords map[types.UID]*podRecord
启动
在/pkg/kubelet/kubelet.go:1364 Run中启动
kl.pleg.Start()
具体逻辑
- 消费端
在Kubelet的循环syncLoop中 plegCh := kl.pleg.Watch()获得pleg的chan由Kubelet进行消费处理
- 生产端
pkg/kubelet/pleg/generic.go:130 定期执行g.relist,默认1s
// Start kicks off the relist loop in a background goroutine.
func (g *GenericPLEG) Start() {
// Run g.relist every g.relistPeriod (1s by default); never stops.
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
pkg/kubelet/pleg/generic.go:190 relist
- 请求runtime获得pod,根据sandbox/container构建pod
- 更新metrics、podRecords的current
- 对比podRecords中的old、current生成event
- 更新cache,发送event
// relist queries the container runtime for list of pods/containers, compare // with the internal pods/containers, and generates events accordingly. func (g *GenericPLEG) relist() { klog.V(5).Infof("GenericPLEG: Relisting") if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() { metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime)) } timestamp := g.clock.Now() defer func() { metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp)) }() // 获得所有pods,根据runtime获得sandbox/containers并以此构建pod podList, err := g.runtime.GetPods(true) if err != nil { klog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err) return } g.updateRelistTime(timestamp) pods := kubecontainer.Pods(podList) // 更新运行的container、sandbox数量 updateRunningPodAndContainerMetrics(pods) // 更新podRecords中current g.podRecords.setCurrent(pods) // 对比并生成event eventsByPodID := map[types.UID][]*PodLifecycleEvent{} for pid := range g.podRecords { oldPod := g.podRecords.getOld(pid) pod := g.podRecords.getCurrent(pid) // 获取oldPod、pod中所有container、sandbox allContainers := getContainersFromPods(oldPod, pod) for _, container := range allContainers { // 获取container在oldPod、pod中的state对比生成event events := computeEvents(oldPod, pod, &container.ID) for _, e := range events { // 添加到eventsByPodID updateEvents(eventsByPodID, e) } } } var needsReinspection map[types.UID]*kubecontainer.Pod if g.cacheEnabled() { needsReinspection = make(map[types.UID]*kubecontainer.Pod) } // If there are events associated with a pod, we should update the // podCache. 
for pid, events := range eventsByPodID { pod := g.podRecords.getCurrent(pid) if g.cacheEnabled() { // 更新cache,如果失败下次重试 if err := g.updateCache(pod, pid); err != nil { klog.V(4).Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err) needsReinspection[pid] = pod continue } else { delete(g.podsToReinspect, pid) } } // 更新podRecords,old更新为current g.podRecords.update(pid) for i := range events { // Filter out events that are not reliable and no other components use yet. if events[i].Type == ContainerChanged { continue } select { // 发送event case g.eventChannel <- events[i]: default: metrics.PLEGDiscardEvents.Inc() klog.Error("event channel is full, discard this relist() cycle event") } } } if g.cacheEnabled() { // 重试更新cache if len(g.podsToReinspect) > 0 { klog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection") for pid, pod := range g.podsToReinspect { if err := g.updateCache(pod, pid); err != nil { // Rely on updateCache calling GetPodStatus to log the actual error. klog.V(5).Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err) needsReinspection[pid] = pod } } } // Update the cache timestamp. This needs to happen *after* // all pods have been properly updated in the cache. g.cache.UpdateTime(timestamp) } // 更新待重试 g.podsToReinspect = needsReinspection }
pkg/kubelet/pleg/generic.go:333 computeEvents对比生成event,参数为oldPod、current pod及pod内的container
// computeEvents compares the state of the container identified by cid in
// oldPod and newPod, and returns the lifecycle events implied by the change.
func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
	// Take the pod UID from whichever snapshot is available.
	var podID types.UID
	switch {
	case oldPod != nil:
		podID = oldPod.ID
	case newPod != nil:
		podID = newPod.ID
	}
	// Resolve the container's runtime state on both sides (converted to a
	// pleg state), then diff the pair into events.
	before := getContainerState(oldPod, cid)
	after := getContainerState(newPod, cid)
	return generateEvents(podID, cid.ID, before, after)
}
pkg/kubelet/pleg/generic.go:150 generateEvents 对比生成event
// generateEvents maps a container state transition (oldState -> newState)
// to the pod lifecycle events describing it. Returns nil when the state is
// unchanged.
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	// Identical states carry no event.
	if newState == oldState {
		return nil
	}

	klog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
	// The new state alone decides the event type, except for removal,
	// where the old state tells us whether a death was already reported.
	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		if oldState == plegContainerExited {
			// We already reported that the container died before.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		}
		return []*PodLifecycleEvent{
			{ID: podID, Type: ContainerDied, Data: cid},
			{ID: podID, Type: ContainerRemoved, Data: cid},
		}
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", newState))
	}
}