8. Error handling and edge cases
Error handling patterns
| Layer | Pattern | Examples |
|---|---|---|
| Core | Typed error objects | SparkCoreErrors, SparkException |
| SQL | AnalysisException, QueryExecutionErrors | Unresolved column, type mismatch |
| Tasks | Serialize failure reason back to driver | ExceptionFailure, FetchFailed |
| Scheduler | Retry with limits | maxTaskFailures, stage abort |
| Logging | Structured logging (Log4j2) | spark.log.structuredLogging.enabled |
Task failure flow
Executor: task throws
→ TaskRunner catches → statusUpdate(FAILED, reason)
→ TaskSchedulerImpl.statusUpdate
→ if FetchFailed → DAGScheduler.handleTaskCompletion (resubmit stage)
→ else if attempts < max → retry on another executor
→ else → abort stage → fail job
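
The flow above boils down to a three-way decision. The following is a minimal Python sketch of that routing, not Spark's implementation: TaskFailure, route_failure, and the string outcomes are hypothetical names, and the default limit of 4 is borrowed from spark.task.maxFailures.

```python
from dataclasses import dataclass


@dataclass
class TaskFailure:
    """Hypothetical stand-in for the failure reason sent back to the driver."""
    reason: str    # e.g. "FetchFailed" or "ExceptionFailure"
    attempts: int  # failed attempts so far for this task, including this one


def route_failure(failure: TaskFailure, max_failures: int = 4) -> str:
    """Return the action the flow above takes for one failed task."""
    if failure.reason == "FetchFailed":
        # Missing shuffle input: retrying the same task cannot help,
        # so the stage that produced the input is resubmitted instead.
        return "resubmit_stage"
    if failure.attempts < max_failures:
        return "retry_task"
    return "abort_stage"
```

For example, route_failure(TaskFailure("ExceptionFailure", 4)) yields "abort_stage", while any FetchFailed is routed to "resubmit_stage" regardless of the attempt count.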
Shuffle fetch failure
When a map output file is missing (for example, because its executor was lost), FetchFailed propagates to DAGScheduler, which invalidates the lost map outputs and resubmits the map stage. This is distinct from generic task failure, where only the failed task is retried; see the DAGScheduler.scala header comments (lines 106–114).
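
To make the invalidate-and-resubmit step concrete, here is a toy model of map-output bookkeeping. It is a sketch under assumptions: MapOutputRegistry and its methods are hypothetical names and do not mirror Spark's own tracking classes.

```python
from typing import Dict, List, Optional


class MapOutputRegistry:
    """Toy model of map-output tracking for one shuffle."""

    def __init__(self, num_partitions: int) -> None:
        # partition index -> executor id holding its map output (None = missing)
        self.locations: Dict[int, Optional[str]] = {
            p: None for p in range(num_partitions)
        }

    def register(self, partition: int, executor: str) -> None:
        self.locations[partition] = executor

    def executor_lost(self, executor: str) -> None:
        """Invalidate every map output that lived on the lost executor."""
        for partition, holder in self.locations.items():
            if holder == executor:
                self.locations[partition] = None

    def missing_partitions(self) -> List[int]:
        """Partitions a resubmitted map stage would have to recompute."""
        return sorted(p for p, holder in self.locations.items() if holder is None)
```

Only the partitions reported by missing_partitions() need to be recomputed, which is why a fetch failure leads to a stage resubmission rather than a plain task retry.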
SQL analysis vs execution errors
- Analysis: Thrown during analyzer.executeAndCheck before any job runs; fails fast in the driver.
- Execution: Data issues (divide by zero, corrupt files) surface as task failures or SparkException at action time.
- Lazy analysis: Some errors are deferred until an action if isLazyAnalysis is true.
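
The difference between the two phases can be shown with a toy two-phase query in plain Python. This is an analogy only: LazyQuery and AnalysisError are hypothetical names, and nothing here calls Spark.

```python
from typing import Callable, Dict, Iterable, List


class AnalysisError(Exception):
    """Raised while building the plan, before any row is processed."""


class LazyQuery:
    """Toy query: names are checked eagerly, rows are evaluated lazily."""

    def __init__(self, schema: List[str], column: str,
                 fn: Callable[[int], float]) -> None:
        if column not in schema:
            # Analysis phase: fail fast, no data has been touched yet.
            raise AnalysisError(f"unresolved column: {column}")
        self.column = column
        self.fn = fn

    def collect(self, rows: Iterable[Dict[str, int]]) -> List[float]:
        # Execution phase (the "action"): data-dependent errors surface here.
        return [self.fn(row[self.column]) for row in rows]
```

Constructing LazyQuery(["a"], "b", ...) fails immediately, whereas a query that divides by the column value is built without complaint and only fails in collect() when a zero actually appears in the data.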
Security-sensitive paths
- SecurityManager: RPC auth, servlet filters for UI
- spark.authenticate, network crypto settings
- Delegation tokens for YARN/HDFS (HadoopDelegationTokenManager)
- User-provided code in UDFs: runs with full executor privileges (sandboxing not provided by default)
- Spark Connect: session isolation via SparkConnectSessionManager
9. Concurrency and lifecycle
Concurrency model
| Component | Model |
|---|---|
| DAGScheduler | Single-threaded event loop (DAGSchedulerEventProcessLoop) |
| TaskSchedulerImpl | Thread-safe; synchronized task set managers |
| Executor | Thread pool: one thread per task slot (spark.executor.cores) |
| BlockManager | Fine-grained locks per block; master RPC serialized |
| SparkContext | Documented as not thread-safe for all ops; SQL uses withActive session guard |
| Structured Streaming | Micro-batch driver thread + concurrent state store maintenance |
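
The single-threaded event loop pattern in the first row can be sketched in a few lines of Python. This is a generic illustration, not a port of DAGSchedulerEventProcessLoop: the EventLoop class and the event strings used with it are assumptions.

```python
import queue
import threading
from typing import Callable


class EventLoop:
    """Minimal single-consumer event loop."""

    _STOP = object()  # sentinel that tells the loop thread to exit

    def __init__(self, handler: Callable[[str], None]) -> None:
        self._events: "queue.Queue[object]" = queue.Queue()
        self._handler = handler
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def post(self, event: str) -> None:
        """Safe to call from any thread; it only enqueues the event."""
        self._events.put(event)

    def stop(self) -> None:
        """Process everything already queued, then shut the loop down."""
        self._events.put(self._STOP)
        self._thread.join()

    def _run(self) -> None:
        while True:
            event = self._events.get()
            if event is self._STOP:
                return
            self._handler(event)  # always runs on the single loop thread
```

Because every handler call happens on one thread, the state the handler mutates needs no locking; producers pay only the cost of a queue insert.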
Retries and timeouts
- spark.task.maxFailures (default 4): failures allowed for any one task before the job is given up, i.e. up to 3 retries
- spark.network.timeout: RPC and shuffle timeout
- spark.speculation: duplicate slow tasks on other nodes
- RpcTimeout on endpoint asks: raises RpcTimeoutException
- Tests: SparkFunSuite wraps tests in a 20-minute timeout (spark.test.timeout)
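
As a small illustration of how these settings are typically passed, the helper below renders a mapping as spark-submit --conf arguments. The keys come from the list above; the values and the helper name to_submit_args are illustrative assumptions, not recommendations.

```python
from typing import Dict, List

# Illustrative values only; tune them for the actual workload.
RETRY_AND_TIMEOUT_CONF: Dict[str, str] = {
    "spark.task.maxFailures": "4",
    "spark.network.timeout": "120s",
    "spark.speculation": "true",
}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a conf mapping as `--conf key=value` arguments, sorted by key."""
    args: List[str] = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args
```

The resulting list can be appended to a spark-submit command line built elsewhere.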
Cancellation
- sc.cancelJob(jobId), sc.cancelAllJobs()
- SQL: spark.sql.execution.interruptOnCancel
- Task kill via TaskScheduler.cancelTasks → executor KillTask message
- Streaming: query.stop() gracefully commits or rolls back batch
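
A kill request only takes effect where the running task notices it. The sketch below shows the cooperative half of that contract: a task body that polls a kill flag between units of work. The function run_task and the flag are hypothetical; as the interruptOnCancel setting suggests, Spark may additionally interrupt the task thread.

```python
import threading
import time


def run_task(kill_requested: threading.Event, steps: int = 1000) -> str:
    """Toy task body that checks a kill flag between units of work."""
    for _ in range(steps):
        if kill_requested.is_set():
            return "killed"    # stop early and let the caller clean up
        time.sleep(0.001)      # stand-in for processing one batch of records
    return "finished"
```

A task that never checks the flag (for instance, one blocked in a long native call) would keep running until it completes, which is the case thread interruption is meant to cover.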
Resource cleanup
- SparkContext.stop(): stops DAGScheduler, TaskScheduler, RpcEnv, BlockManager
- ContextCleaner: weak references to RDDs/shuffles for GC of unused lineage
- ShutdownHookManager: JVM shutdown hook registered in SparkContext
- Stage/job data structures cleared on completion (DAGScheduler invariant)
- SQLExecution clears execution ID thread locals after actions
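
The weak-reference approach attributed to ContextCleaner can be illustrated with Python's weakref module. This is an analogy rather than Spark code: Dataset, track, and the cleaned list are hypothetical.

```python
import gc
import weakref
from typing import List


class Dataset:
    """Hypothetical driver-side handle whose remote state needs cleanup."""

    def __init__(self, dataset_id: int) -> None:
        self.dataset_id = dataset_id


cleaned: List[int] = []  # ids whose cleanup callback has fired


def track(dataset: Dataset) -> None:
    """Register a callback that fires once the handle is garbage collected."""
    # The callback receives only the id; holding the dataset itself
    # would keep it alive and cleanup would never run.
    weakref.finalize(dataset, cleaned.append, dataset.dataset_id)
```

Once the last strong reference to a tracked Dataset goes away and the collector runs, its id appears in cleaned; in a real system the callback would release remote blocks or shuffle files instead.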
Dynamic allocation
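ExecutorAllocationManager (core/.../scheduler/dynalloc/) requests and kills executors based on load when spark.dynamicAllocation.enabled=true. Shrinking safely requires that shuffle data stay reachable after an executor is removed, which typically means running the external shuffle service.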
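
"Based on load" can be pictured as sizing the cluster to the outstanding work. The function below is a simplified sketch under that assumption; target_executors and its parameters are hypothetical, and the real policy also involves timeouts and other settings not modeled here.

```python
import math


def target_executors(pending_tasks: int, running_tasks: int,
                     tasks_per_executor: int,
                     min_executors: int, max_executors: int) -> int:
    """Executors needed to run all outstanding tasks, clamped to the bounds."""
    needed = math.ceil((pending_tasks + running_tasks) / tasks_per_executor)
    return max(min_executors, min(needed, max_executors))
```

With 10 pending and 6 running tasks at 4 task slots per executor, the target is 4 executors; an idle application falls back to the configured minimum.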
Consistency assumptions
- Not transactional across stages — output committers provide file-level exactly-once for writes
- Streaming: At-least-once by default; exactly-once with transactional sinks + checkpoint
- Cache: Best-effort replication (StorageLevel); not durable
- Idempotency: Task retries must be deterministic or use commit protocols
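
The idea behind a commit protocol is that a task attempt writes somewhere private and publishes its result in one atomic step, so retries never expose partial output. Here is a minimal file-based sketch of that idea; commit_output is a hypothetical helper and does not reproduce any Hadoop or Spark committer API.

```python
import os
import tempfile


def commit_output(final_path: str, data: bytes) -> None:
    """Write task output so retries and crashes never expose partial files."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Each attempt writes to its own temporary file in the target directory...
    fd, attempt_path = tempfile.mkstemp(dir=directory, suffix=".attempt")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        # ...and publishes it with one atomic rename; a retry just overwrites.
        os.replace(attempt_path, final_path)
    except BaseException:
        if os.path.exists(attempt_path):
            os.remove(attempt_path)
        raise
```

Running the same task twice leaves exactly one complete output file, which is the property the retry mechanism relies on.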
11. Non-obvious insights
Hidden coupling
- SparkEnv.get used throughout executors: hard to unit test without full env
- SQL QueryExecution nested instances must share transaction-aware Analyzer
- Python UDF performance depends on batch size and Arrow enablement; crosses the Python/JVM boundary
- Shuffle registration happens at ShuffleDependency construction, not at run time
Implicit conventions
- Physical operators end with Exec suffix
- Config keys defined as typed ConfigBuilder entries in internal/config
- Tests named *Suite.scala extending SparkFunSuite
- Module names in dev/sparktestsupport/modules.py drive CI test selection
Magic constants / globals
- SparkContext.activeContext: one context per JVM
- Default parallelism from spark.default.parallelism or executor cores × instances
- UI port 4040 increments if busy
- PYTHONHASHSEED=0 set in bin/spark-submit for deterministic Python hashing
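
The last item matters because Python randomizes string hashes per process unless a seed is fixed, so separate worker processes could otherwise disagree on the hash of the same key. The helper below, a hypothetical name used only for illustration, shows the effect of pinning the seed by asking a fresh interpreter for a hash.

```python
import os
import subprocess
import sys


def string_hash_in_fresh_interpreter(value: str, seed: str) -> int:
    """Return hash(value) as computed by a new Python process run with
    PYTHONHASHSEED set to `seed`."""
    env = dict(os.environ, PYTHONHASHSEED=seed)
    result = subprocess.run(
        [sys.executable, "-c", f"print(hash({value!r}))"],
        env=env, capture_output=True, text=True, check=True,
    )
    return int(result.stdout.strip())
```

Two calls with seed "0" return the same number, whereas with hash randomization left on, two separate interpreter runs will usually differ.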
Generated code
- Catalyst expression codegen produces Java source strings, compiled at runtime via Janino
- Connect protobufs generated at build time
- ANTLR SQL parser generated from grammar files
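
The generate-source-then-compile approach in the first item can be mimicked in Python with compile and exec. This is an analogy to illustrate the technique, not Catalyst's code generator: the expression encoding and the functions gen_source and compile_expr are assumptions.

```python
from typing import Callable, Tuple, Union

# Tiny expression tree: a column name, a numeric literal, or (op, left, right).
Expr = Union[str, float, int, Tuple[str, "Expr", "Expr"]]


def gen_source(expr: Expr) -> str:
    """Emit Python source text for an expression over a dict named `row`."""
    if isinstance(expr, str):
        return f"row[{expr!r}]"
    if isinstance(expr, (int, float)):
        return repr(expr)
    op, left, right = expr
    if op not in {"+", "-", "*"}:
        raise ValueError(f"unsupported operator: {op}")
    return f"({gen_source(left)} {op} {gen_source(right)})"


def compile_expr(expr: Expr) -> Callable[[dict], float]:
    """Generate source, compile it once, and return the resulting function."""
    source = f"def evaluate(row):\n    return {gen_source(expr)}\n"
    namespace: dict = {}
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["evaluate"]
```

The tree is walked once at compile time; after that, each row is evaluated by straight-line generated code instead of re-interpreting the tree, which is the motivation for generating code on hot query paths.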
Performance-sensitive code
- WholeStageCodegenExec: hot query path
- UnsafeRow / common/unsafe: binary row format over raw (on- or off-heap) memory
- SortShuffleWriter: disk I/O patterns for shuffle
- TungstenAggregation: hash aggregation in SQL
- Broadcast join threshold (spark.sql.autoBroadcastJoinThreshold)
Backward compatibility
- Config entries often have legacy fallbacks (LEGACY_* configs)
- MiMa (binary compatibility) checks on public artifacts via SBT plugin
- Spark Connect protocol versioned separately from core
- R API marked deprecated in README
Architecture diagram: failure domains
┌──────────── Driver failure ───────────────┐
│ Lose entire app unless checkpoint/        │
│ streaming recovery from durable log       │
└───────────────────────────────────────────┘
┌──────────── Executor failure ─────────────┐
│ Tasks retried; shuffle blocks rebuilt │
│ if not using external shuffle service │
└───────────────────────────────────────────┘
┌──────────── Task failure ─────────────────┐
│ Retry on another executor (bounded) │
└───────────────────────────────────────────┘