kubelet — Data Flow

Node agent: drives the container runtime to match actual pod state to desired state

Startup

main() → RunKubelet() → Kubelet.Run()
The kubelet startup path is layered: the binary entry point → RunKubelet which creates the Kubelet struct → startKubelet which launches all background loops.
  1. main() — entry point
    // cmd/kubelet/kubelet.go L36
    func main() {
        command := app.NewKubeletCommand(ctx)
        code := cli.Run(command)
        os.Exit(code)
    }
    cmd/kubelet/kubelet.go
  2. RunKubelet() — create + start
    Creates the Kubelet instance via createAndInitKubelet() then hands it off to startKubelet().
    // cmd/kubelet/app/server.go L1234-1288 (condensed)
    func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, ...) error {
        k, err := createAndInitKubelet(ctx, kubeServer, ...)  // L1263
        startKubelet(ctx, k, podCfg, ...)                     // L1284
        return nil
    }
    server.go L1234–1288
  3. startKubelet() — launch all services
    // cmd/kubelet/app/server.go L1290-1304
    func startKubelet(ctx context.Context, k kubelet.Bootstrap, podCfg *config.PodConfig, ...) {
        go k.Run(ctx, podCfg.Updates())  // L1292 — main pod sync loop
        go k.ListenAndServe(...)         // L1296 — HTTPS kubelet API
        go k.ListenAndServeReadOnly(...) // L1299 — read-only HTTP
        go k.ListenAndServePodResources(...)  // L1302 — DRA resources
    }
    server.go L1290–1304
  4. Kubelet.Run() — module init + syncLoop
    Initializes all subsystems (imageManager, secretManager, probeManager, …), starts background managers, then enters the main syncLoop.
    // pkg/kubelet/kubelet.go L1858-~1977 (condensed)
    func (kl *Kubelet) Run(ctx context.Context, updates <-chan kubetypes.PodUpdate) {
        kl.initializeModules(ctx)       // L1899 — imageManager, secretManager, …
        kl.allocationManager.Run(ctx)   // L1911 — DRA allocation
        kl.volumeManager.Run(...)       // L1915 — volume attach/mount
    
        // Status update goroutines:
        go kl.syncNodeStatus(ctx)       // periodic node status
        go kl.fastStatusUpdateOnce(ctx)
        go kl.nodeLeaseController.Run(ctx)
    
        kl.statusManager.Start()        // L1958
        kl.pleg.Start(...)              // L1966 — Pod Lifecycle Event Generator
        kl.syncLoop(ctx, updates, kl)   // L1977 — never returns
    }
    kubelet.go L1858

Main Reconciliation Loop

syncLoop() — multiplex three pod sources
syncLoop never returns. It selects over multiple channels and dispatches pod events to the appropriate handler. The three config sources are merged into a single configCh by makePodSourceConfig.
API Server watch
primary source
Static pod files
/etc/kubernetes/manifests
HTTP endpoint
--manifest-url flag
↓ merged into configCh
// pkg/kubelet/kubelet.go L2630-2671
func (kl *Kubelet) syncLoop(ctx context.Context,
    updates <-chan kubetypes.PodUpdate, handler SyncHandler) {

    syncTicker      := time.NewTicker(time.Second)
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    plegCh          := kl.pleg.Watch()  // L2640 — PLEG events

    for {
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            time.Sleep(duration)  // exponential backoff on runtime errors
            continue
        }
        if !kl.syncLoopIteration(ctx, updates, handler,
            syncTicker.C, housekeepingTicker.C, plegCh) {  // L2666
            break
        }
    }
}
kubelet.go L2630
syncLoopIteration() — select on channels, dispatch handlers
Each channel type maps to a specific handler. The config channel carries pod ADD/UPDATE/REMOVE/RECONCILE/DELETE events. The PLEG channel carries container lifecycle events. The sync ticker triggers periodic re-sync of running pods.
// pkg/kubelet/kubelet.go L2705-L~2800 (condensed)
func (kl *Kubelet) syncLoopIteration(ctx context.Context,
    configCh <-chan kubetypes.PodUpdate,
    handler SyncHandler,
    syncCh, housekeepingCh <-chan time.Time,
    plegCh <-chan *pleg.PodLifecycleEvent) bool {

    select {
    case u, open := <-configCh:  // pod config change
        switch u.Op {
        case kubetypes.ADD:
            handler.HandlePodAdditions(ctx, u.Pods)     // L2724
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(ctx, u.Pods)       // L2727
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(ctx, u.Pods)       // L2730
        case kubetypes.RECONCILE:
            handler.HandlePodReconcile(ctx, u.Pods)     // L2733
        case kubetypes.DELETE:
            handler.HandlePodUpdates(ctx, u.Pods)
        }
    case e := <-plegCh:          // container lifecycle event
        if e.Type == pleg.ContainerDied { ... }
        handler.HandlePodSyncs(ctx, []*v1.Pod{pod})
    case <-syncCh:                // periodic re-sync ticker
        podsToSync := kl.getPodsToSync()
        handler.HandlePodSyncs(ctx, podsToSync)
    case <-housekeepingCh:        // garbage collect old containers/volumes
        kl.HandlePodCleanups(ctx)
    }
    return true
}
kubelet.go L2705

Pod Sync

SyncPod() — reconcile a single pod against the runtime
SyncPod is the core reconcile function. It is called by pod workers (one goroutine per pod) and ensures volumes are mounted, secrets pulled, and the container runtime matches the desired pod spec.
// pkg/kubelet/kubelet.go L2029 (signature)
func (kl *Kubelet) SyncPod(
    ctx context.Context,
    updateType kubetypes.SyncPodType,
    pod, mirrorPod *v1.Pod,
    podStatus *kubecontainer.PodStatus,
) (isTerminal bool, postSync func(), err error) {

    // 1. Compute pod status
    // 2. Update status manager (reflects to API server)
    // 3. Kill pod if it should not be running
    // 4. Ensure network namespace / sandbox exists (via CRI RunPodSandbox)
    // 5. Pull images for all containers
    // 6. Mount volumes (call volume manager)
    // 7. Create & start containers (via CRI CreateContainer / StartContainer)
}
kubelet.go L2029

Supporting Subsystems

PLEG — Pod Lifecycle Event Generator
PLEG polls the container runtime (via CRI ListPodSandbox / ListContainers) and diffs the result against its previous snapshot. Detected changes become PodLifecycleEvent values on plegCh.
// pkg/kubelet/pleg/generic.go (abbreviated)
type GenericPLEG struct {
    runtime kubecontainer.Runtime
    eventChannel chan *PodLifecycleEvent
    podRecords   podRecords   // previous state snapshot
}

func (g *GenericPLEG) relist(ctx context.Context) {
    pods, _ := g.runtime.GetPods(ctx, true)  // CRI list call
    for _, pod := range pods {
        for _, container := range pod.Containers {
            // diff old vs new state → emit event
            g.updateEvents(pod.ID, generateEvents(container, ...))
        }
    }
}
// Events: ContainerStarted, ContainerDied, ContainerRemoved, ContainerChanged
pkg/kubelet/pleg/generic.go
Volume Manager — attach, mount, and track volumes
The volume manager runs a reconcile loop that ensures volumes required by scheduled pods are attached (via the API server's node object) and mounted (locally on the node) before SyncPod is allowed to create containers.
// pkg/kubelet/volumemanager/volume_manager.go
// Two goroutines:
//   desiredStateOfWorldPopulator — watches pods, populates desired volumes
//   reconciler — diffs actual vs desired, issues attach/detach/mount/unmount

func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
    go vm.reconciler.Run(stopCh)
}
pkg/kubelet/volumemanager/volume_manager.go
Status Manager — reflect pod status to API server
The status manager owns the authoritative in-memory pod status on the node and periodically PATCHes it back to the API server so controllers and kubectl can observe actual container state.
// pkg/kubelet/status/status_manager.go
// Maintains a map: pod UID → versionedPodStatus
// Flushes changes to API server via:
//   client.CoreV1().Pods(ns).UpdateStatus(ctx, pod, ...)
type Manager interface {
    SetPodStatus(pod *v1.Pod, status v1.PodStatus)
    SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
    Start()
}
pkg/kubelet/status/status_manager.go