What happens when a task lands
(RPC) LaunchTask
│
▼
CoarseGrainedExecutorBackend.receive → decode TaskDescription
│
▼
Executor.launchTask → new TaskRunner → threadPool.execute
│
▼
TaskRunner.run
├─ report TaskState.RUNNING
├─ updateDependencies (fetch jars/files)
├─ deserialize the Task with closureSerializer
├─ task.run(...) ◄── calls RDD.iterator / shuffle writer
├─ serialize the result
└─ statusUpdate(FINISHED, result) (inline, or via BlockManager if large)
CoarseGrainedExecutorBackend — the RPC endpoint
This is the object the driver talks to. On RegisteredExecutor it creates the Executor; on LaunchTask it decodes the serialized TaskDescription and calls executor.launchTask; and it forwards every task statusUpdate back to the driver. It is the thin networking shell around the real worker.
CoarseGrainedExecutorBackend.scala L167-L188 — receiving LaunchTask
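The dispatch described above can be sketched with a toy endpoint. This is a minimal illustration and not Spark's code: `ToyBackend`, `ToyExecutor`, and the simplified message types are hypothetical stand-ins, and the "serialized task" is just a UTF-8 string.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

object BackendSketch {
  // Hypothetical, simplified versions of the driver-to-executor messages.
  sealed trait Message
  case object RegisteredExecutor extends Message
  final case class LaunchTask(serializedTask: Array[Byte]) extends Message

  // Stand-in for the real Executor: it only records what it was asked to run.
  final class ToyExecutor {
    val launched: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def launchTask(description: String): Unit = launched += description
  }

  final class ToyBackend {
    private var executor: Option[ToyExecutor] = None
    def current: Option[ToyExecutor] = executor

    def receive(msg: Message): Unit = msg match {
      // The executor is created only once the driver confirms registration.
      case RegisteredExecutor =>
        executor = Some(new ToyExecutor)
      // Decode the payload, then hand it to the executor.
      case LaunchTask(bytes) =>
        executor match {
          case Some(e) => e.launchTask(new String(bytes, UTF_8))
          case None    => throw new IllegalStateException("LaunchTask before registration")
        }
    }
  }
}
```

Sending `RegisteredExecutor` followed by `LaunchTask` leaves one entry in the toy executor's `launched` buffer. Sending `LaunchTask` first throws, which in this sketch stands in for the backend refusing to run a task without an executor.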
Executor and TaskRunner — the work loop
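The Executor owns a cached thread pool, the BlockManager, and a heartbeat loop. Each task becomes a TaskRunner (a Runnable) submitted to the pool. The pool itself is unbounded; concurrency is capped by the driver's scheduler, which only assigns as many tasks as the executor has free cores (one core per task unless spark.task.cpus says otherwise). launchTask registers the runner in runningTasks and submits it.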
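A minimal sketch of the register-then-submit pattern, using only `java.util.concurrent`. `ToyExecutor` and `ToyTaskRunner` are hypothetical names, the task body is an arbitrary closure, and removing the runner from the map when it finishes is an assumption of this sketch about where the cleanup happens.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object ExecutorSketch {
  // A runner wraps one task; it deregisters itself when the task ends.
  final class ToyTaskRunner(
      val taskId: Long,
      body: () => Unit,
      running: ConcurrentHashMap[Long, ToyTaskRunner]) extends Runnable {
    override def run(): Unit = {
      try {
        body()
      } finally {
        running.remove(taskId) // runs whether the body succeeded or threw
      }
    }
  }

  final class ToyExecutor {
    // A cached pool creates threads on demand and places no cap on their number.
    private val threadPool = Executors.newCachedThreadPool()
    val runningTasks = new ConcurrentHashMap[Long, ToyTaskRunner]()

    def launchTask(taskId: Long)(body: => Unit): Unit = {
      val runner = new ToyTaskRunner(taskId, () => body, runningTasks)
      runningTasks.put(taskId, runner) // register first, then submit
      threadPool.execute(runner)
    }

    // Stops accepting tasks and waits for the submitted ones to finish.
    def shutdown(): Boolean = {
      threadPool.shutdown()
      threadPool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

Because nothing in the pool limits the number of threads, any cap on concurrent tasks in this sketch would have to come from whoever calls `launchTask`.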
Inside TaskRunner.run, the task binary is deserialized, the actual computation is invoked, and block locks plus task memory are released in a finally block no matter what.
val value = Utils.tryWithSafeFinally {
  task.run(taskAttemptId = taskId, attemptNumber = ...,
    metricsSystem = env.metricsSystem, ...)
} {
  env.blockManager.releaseAllLocksForTask(taskId) // always runs
}
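To show why the cleanup block can be trusted to run, here is a simplified sketch of a "safe finally" helper. It is written for illustration and is not Spark's `Utils.tryWithSafeFinally`; the behaviour assumed here is that a failure in the cleanup block must not hide the original task failure.

```scala
object SafeFinallySketch {
  // Runs `block`, then always runs `finallyBlock`. If both throw, the
  // cleanup exception is attached to the original one as a suppressed
  // exception, so the task's own failure is the one that propagates.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        // Only swallow the cleanup failure when there is an original to carry it.
        case t: Throwable if original != null && (original ne t) =>
          original.addSuppressed(t)
      }
    }
  }
}
```

If only the cleanup block throws, its exception propagates unchanged, since there is no earlier failure to protect.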
Returning results and heartbeats
Small results travel inline in the status update. If a result exceeds spark.task.maxDirectResultSize it is stored in the BlockManager and only an IndirectTaskResult handle is sent; if it exceeds spark.driver.maxResultSize it is dropped with an error. Meanwhile a periodic heartbeat reports liveness and per-task metrics to the driver's HeartbeatReceiver; repeated heartbeat failures cause the executor to exit.
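The size-based routing above reduces to two threshold checks. The sketch below assumes the larger limit is checked first and that a non-positive `maxResultSize` disables it; `Route` and `route` are illustrative names, and the thresholds are plain parameters rather than values read from a Spark configuration.

```scala
object ResultRouting {
  sealed trait Route
  case object Inline extends Route          // result bytes ride in the status update
  case object ViaBlockManager extends Route // only a handle is sent; the driver fetches the block
  case object Dropped extends Route         // result is too large to return at all

  def route(resultSize: Long, maxDirectResultSize: Long, maxResultSize: Long): Route =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else Inline
}
```

With example thresholds of 1 MiB for direct results and 1 GiB for the overall limit, a 10 KiB result is `Inline`, a 5 MiB result goes `ViaBlockManager`, and a 2 GiB result is `Dropped`.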
BlockManager — the per-node data service
A BlockManager runs on every node and is the universal data layer: it stores cached RDD partitions, broadcast variables, and shuffle blocks, serving them locally or fetching them from peers. get tries the local memory store, then the local disk store, then remote nodes. put writes to memory first and spills to disk when configured. A driver-side BlockManagerMaster tracks where every block lives cluster-wide.
get(blockId) = getLocalValues orElse getRemoteValues
putIterator → doPutIterator → memory first, then disk if allowed
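The lookup order can be illustrated with three plain maps standing in for the memory store, the disk store, and remote peers. `ToyBlockStore` is a hypothetical model: real block IDs, locking, and network fetches are all left out, and each hit is tagged with the tier that served it.

```scala
object TieredLookup {
  final class ToyBlockStore(
      memory: Map[String, String],
      disk: Map[String, String],
      remote: Map[String, String]) {

    // Local lookup: memory first, then disk.
    def getLocal(id: String): Option[(String, String)] =
      memory.get(id).map(v => ("memory", v))
        .orElse(disk.get(id).map(v => ("disk", v)))

    // Remote lookup: only consulted when the block is not held locally.
    def getRemote(id: String): Option[(String, String)] =
      remote.get(id).map(v => ("remote", v))

    def get(id: String): Option[(String, String)] =
      getLocal(id).orElse(getRemote(id))
  }
}
```

Because `orElse` takes its argument by name, the later tiers are not consulted once an earlier one returns a hit.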
Unified memory: execution and storage share one pool
The UnifiedMemoryManager draws a soft boundary between two kinds of memory. Execution memory is used transiently for shuffles, joins, sorts, and aggregations. Storage memory holds cached blocks. Either side can borrow from the other when it has spare capacity — with one asymmetry: storage that has borrowed execution memory can be evicted when execution needs it back, but execution memory is never evicted by storage.
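The asymmetry is easier to see in a toy model. The sketch below is a deliberate simplification and not the UnifiedMemoryManager's algorithm: `ToyPool` is a hypothetical class that tracks only two byte counters, storage never evicts its own blocks to make room, and execution may reclaim only what storage holds beyond its own region.

```scala
object UnifiedPoolSketch {
  final class ToyPool(total: Long, storageRegion: Long) {
    private var storageUsed = 0L
    private var executionUsed = 0L
    def storage: Long = storageUsed
    def execution: Long = executionUsed
    private def free: Long = total - storageUsed - executionUsed

    // Storage may take any free memory, but it can never evict execution.
    def acquireStorage(bytes: Long): Boolean =
      if (bytes <= free) { storageUsed += bytes; true } else false

    // Execution takes free memory first, then evicts whatever storage has
    // borrowed beyond its own region. Returns the number of bytes granted.
    def acquireExecution(bytes: Long): Long = {
      if (bytes > free) {
        val reclaimable = math.max(0L, storageUsed - storageRegion)
        storageUsed -= math.min(reclaimable, bytes - free)
      }
      val granted = math.min(bytes, free)
      executionUsed += granted
      granted
    }
  }
}
```

With a pool of 100 and a storage region of 50, caching 80 and then requesting 40 for execution shrinks storage to 60. A further execution request of 30 is granted only 10, because storage is never pushed below its 50-unit region.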
The source comment states the defaults exactly:
shared region = (heap - 300MB) * spark.memory.fraction // default 0.6
storage region within it * spark.memory.storageFraction // default 0.5
⇒ storage ≈ 0.6 * 0.5 = 0.3 of (heap - 300MB) by default
300MB is RESERVED_SYSTEM_MEMORY_BYTES
UnifiedMemoryManager.scala L34-L57 — the model and fractions
UnifiedMemoryManager.scala L134-L248 — acquireExecution / acquireStorage
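As a worked example of the formulas above, the following sketch computes the regions for a given heap size. `MemoryBudget` and `budget` are illustrative names; the 300 MB reserve and the 0.6 and 0.5 defaults are the values quoted above, and sizes are kept in megabytes for readability.

```scala
object MemoryBudget {
  val ReservedMb = 300L // the reserved system memory quoted above

  final case class Budget(
      sharedMb: Double,    // execution + storage together
      storageMb: Double,   // storage region inside the shared pool
      executionMb: Double, // remainder of the shared pool
      userMb: Double)      // what is left of (heap - 300MB)

  def budget(
      heapMb: Long,
      memoryFraction: Double = 0.6,
      storageFraction: Double = 0.5): Budget = {
    val usable = (heapMb - ReservedMb).toDouble
    val shared = usable * memoryFraction
    val storage = shared * storageFraction
    Budget(shared, storage, shared - storage, usable - shared)
  }
}
```

For a 4096 MB heap, `MemoryBudget.budget(4096)` gives a shared region of about 2277.6 MB, split into roughly 1138.8 MB each for storage and execution, with about 1518.4 MB left as user memory.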
MemoryStore and DiskStore — the two tiers
The MemoryStore keeps blocks in an access-ordered LinkedHashMap (so eviction is LRU) as either deserialized object arrays or serialized byte buffers. It "unrolls" an iterator gradually, requesting more memory as it goes, so a single huge partition cannot blow the heap. When memory is tight, the least recently used blocks are evicted: a block whose storage level allows disk is written to the DiskStore, which stores it in local files via the DiskBlockManager, while a memory-only block is simply dropped (a cached RDD partition is then recomputed when next needed). This memory-then-disk tiering is exactly what storage levels like MEMORY_AND_DISK select.
MemoryStore.scala L82-L88 — class
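The LRU behaviour comes from `java.util.LinkedHashMap` in access-order mode, where iteration starts at the least recently used entry. The sketch below is a toy and not the MemoryStore's eviction code: `ToyMemoryStore` and `Block` are hypothetical, blocks carry only a size, and evicted IDs are recorded in a buffer instead of being written to disk.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

object LruSketch {
  final case class Block(sizeBytes: Long)

  final class ToyMemoryStore(maxBytes: Long) {
    // accessOrder = true: every get() moves the entry to the most-recent end.
    private val entries = new JLinkedHashMap[String, Block](32, 0.75f, true)
    private var usedBytes = 0L
    val evicted: ArrayBuffer[String] = ArrayBuffer.empty[String]

    def get(id: String): Option[Block] = Option(entries.get(id))

    // Evicts from the least-recently-used end until the new block fits.
    def put(id: String, sizeBytes: Long): Boolean =
      if (sizeBytes > maxBytes || entries.containsKey(id)) false
      else {
        val it = entries.entrySet().iterator()
        while (usedBytes + sizeBytes > maxBytes && it.hasNext) {
          val victim = it.next()
          usedBytes -= victim.getValue.sizeBytes
          evicted += victim.getKey
          it.remove()
        }
        entries.put(id, Block(sizeBytes))
        usedBytes += sizeBytes
        true
      }

    // Block IDs from least to most recently used.
    def cached: List[String] = {
      val buf = ArrayBuffer.empty[String]
      val it = entries.keySet().iterator()
      while (it.hasNext) buf += it.next()
      buf.toList
    }
  }
}
```

With a 30-byte store holding blocks a, b, and c of 10 bytes each, reading a and then inserting d evicts b, since b is the least recently used block at that point.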
The memory budget, visualized
┌──────────────────────────── executor JVM heap ───────────────────────────┐
│ reserved 300MB │ spark.memory.fraction = 0.6 of (heap-300MB) │
│ (system) │ ┌───────────────────────────┬─────────────────────────┐ │
│ │ │ storage (cache blocks) │ execution (shuffle, │ │
│ │ │ storageFraction = 0.5 │ sort, join, agg) │ │
│ │ └───────────────────────────┴─────────────────────────┘ │
│ │ ◄── soft, borrowable boundary ──► │
│ user memory (objects you allocate) = remaining ~0.4 of (heap-300MB) │
└───────────────────────────────────────────────────────────────────────────┘
Spilling is normal and healthy: when execution memory is exhausted, sorts and aggregations spill sorted runs to disk rather than failing. Frequent spilling is the signal to add memory or partitions.
Key takeaways
- An executor runs one TaskRunner per core on a shared thread pool.
- The BlockManager is the single store for cache, broadcast, and shuffle blocks.
- Execution and storage share one budget; storage is evictable, execution is not.
- Caching tiers through memory then disk; spilling to disk prevents out-of-memory failures.