Apache Spark — Execution Flows

4. Key execution flows

Flow A: spark-submit → user application

Step | Component | Detail
Trigger | Shell | bin/spark-submit --class com.example.App app.jar
1 | spark-class | Runs org.apache.spark.launcher.Main with a minimal classpath (launcher/target/.../classes + jars)
2 | SparkSubmitCommandBuilder | Builds the full java command: classpath, memory, main class SparkSubmit
3 | SparkSubmit.doSubmit | Parses args via SparkSubmitArguments, builds SparkConf
4 | prepareSubmitEnvironment | Resolves the cluster manager from the master URL, the deploy mode (client/cluster), and the child main class used in cluster mode
5 | runMain | Loads the user JAR; invokes the user's main method via reflection (client) or submits to YARN/K8s/Standalone (cluster)

Validation: master URL format, deploy-mode compatibility, and primary resource existence are checked in SparkSubmitArguments and SparkSubmitUtils.

Failure modes: Missing JAR, invalid master, classpath conflicts, cluster manager rejection.
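The routing decision in steps 4 and 5 can be pictured with a small Python model. This is a sketch under simplifying assumptions: `cluster_manager` and `child_main_class` are hypothetical helpers, only a subset of master URL forms is recognized, and the standalone cluster-mode client is left as a placeholder. The YARN and Kubernetes class names are the cluster-mode submit clients SparkSubmit uses.

```python
import re

# Cluster-mode submit clients named in SparkSubmit; the standalone one is omitted.
YARN_CLUSTER_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
K8S_CLUSTER_CLASS = "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"


def cluster_manager(master: str) -> str:
    """Classify a master URL (hypothetical helper; subset of the real forms)."""
    if master == "yarn":
        return "YARN"
    if master.startswith("k8s://"):
        return "KUBERNETES"
    if master.startswith("spark://"):
        return "STANDALONE"
    if re.fullmatch(r"local(\[(\*|\d+)\])?", master):
        return "LOCAL"
    raise ValueError(f"invalid master URL: {master}")


def child_main_class(master: str, deploy_mode: str, user_main: str) -> str:
    """Return the class runMain would start (hypothetical, simplified)."""
    manager = cluster_manager(master)
    if deploy_mode == "client":
        return user_main  # the user's main runs inside the spark-submit JVM
    if deploy_mode != "cluster":
        raise ValueError(f"invalid deploy mode: {deploy_mode}")
    if manager == "LOCAL":
        raise ValueError("cluster deploy mode is not compatible with a local master")
    if manager == "YARN":
        return YARN_CLUSTER_CLASS
    if manager == "KUBERNETES":
        return K8S_CLUSTER_CLASS
    return "<standalone cluster submit client>"  # placeholder, not a real class name
```

In client mode the user's class is returned unchanged; in cluster mode a cluster-manager client is launched instead, which is why a local master is rejected there.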

Flow B: RDD action (e.g. rdd.count())

Trigger: rdd.count()
│
├─ RDD.count()                                [RDD.scala ~1320]
│   └─ sc.runJob(rdd, getIteratorSize)
│
├─ SparkContext.runJob                        [SparkContext.scala ~2481]
│   └─ dagScheduler.runJob
│
├─ DAGScheduler.submitJob                     [DAGScheduler.scala ~984]
│   └─ eventProcessLoop.post(JobSubmitted)
│
├─ handleJobSubmitted                         [~1400]
│   └─ createResultStage from final RDD
│       └─ submitStage (recursive for missing parents)
│
├─ submitMissingTasks                         [~1635]
│   └─ new TaskSet(ShuffleMapTask | ResultTask)
│
├─ TaskSchedulerImpl.submitTasks              [TaskSchedulerImpl.scala ~243]
│   └─ schedulableBuilder.addTaskSetManager
│       └─ backend.reviveOffers()
│
├─ CoarseGrainedSchedulerBackend.launchTasks
│   └─ executorEndpoint.send(LaunchTask)
│
└─ Executor.TaskRunner.run                    [Executor.scala ~806]
    ├─ deserialize Task
    ├─ task.run(taskAttemptId, attemptNumber, ...)
    │   └─ rdd.compute(partition, context)
    └─ statusUpdate(FINISHED) → DAGScheduler.handleTaskCompletion

Data structures: ActiveJob, Stage, TaskSet, TaskDescription (serialized to executor).

Side effects: Shuffle files written (map stages), blocks cached if persisted, metrics posted to LiveListenerBus.

Output: Aggregated count returned to driver via JobWaiter.

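Failures: a task is retried until it has failed spark.task.maxFailures times (default 4), which aborts the job; a fetch failure triggers a stage retry (lost map outputs are recomputed), and the job fails once a stage exceeds spark.stage.maxConsecutiveAttempts (default 4) consecutive attempts.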
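The Output and Failures notes above can be illustrated with a sequential toy model of `runJob`: one result slot per partition (the role JobWaiter plays), per-partition sizes summed for `count()`, and a task retried until it has failed `max_failures` times. `run_job`, `count`, `JobAborted`, and the `injected_failures` knob are hypothetical; the real scheduler runs tasks in parallel on executors and does not retry in a loop on the driver.

```python
from typing import Callable, Dict, Iterable, List, Optional, Sequence


class JobAborted(Exception):
    """Stand-in for the job failing after a task exhausts its attempts."""


def get_iterator_size(it: Iterable) -> int:
    # Count elements by iterating once, as RDD.count does per partition.
    return sum(1 for _ in it)


def run_job(partitions: Sequence[list],
            func: Callable[[Iterable], int],
            max_failures: int = 4,
            injected_failures: Optional[Dict[int, int]] = None) -> List[int]:
    """Run func on every partition, retrying a failing task up to max_failures attempts."""
    failures_left = dict(injected_failures or {})  # partition -> failures to simulate
    results: List[int] = [0] * len(partitions)     # one slot per partition, like JobWaiter
    for pid, part in enumerate(partitions):        # real tasks run in parallel
        attempt = 0
        while True:
            attempt += 1
            if failures_left.get(pid, 0) > 0:
                failures_left[pid] -= 1
                if attempt >= max_failures:
                    raise JobAborted(f"task for partition {pid} failed {attempt} times")
                continue                           # retry the task
            results[pid] = func(iter(part))
            break
    return results


def count(partitions: Sequence[list], **kwargs) -> int:
    # Mirrors RDD.count(): per-partition sizes, summed on the driver.
    return sum(run_job(partitions, get_iterator_size, **kwargs))
```

With the default of 4, three injected failures on one partition still let the job succeed, while a fourth aborts it.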

Flow C: SQL query (spark.sql("SELECT ...").collect())

Trigger: spark.sql(...).collect()
│
├─ SparkSession.sql                           [classic/SparkSession.scala ~528]
│   └─ sqlParser.parsePlanWithParameters → LogicalPlan
│       └─ Dataset.ofRows(session, plan)
│
├─ SessionState.executePlan
│   └─ new QueryExecution(session, logical)
│
├─ QueryExecution phases (lazy):
│   analyzed      ← analyzer.executeAndCheck
│   optimizedPlan ← SparkOptimizer.executeAndTrack
│   sparkPlan     ← SparkPlanner.plan (strategies)
│   executedPlan  ← prepareForExecution (AQE, exchanges, codegen)
│
├─ Dataset.collect                            [classic/Dataset.scala]
│   └─ SQLExecution.withNewExecutionId(qe)
│       └─ executedPlan.executeCollect()
│
├─ SparkPlan.execute → RDD[InternalRow]
│   └─ (may include WholeStageCodegenExec, FileSourceScanExec, ...)
│
└─ Same DAGScheduler path as Flow B

Validation: Catalyst's CheckAnalysis runs inside analyzer.executeAndCheck, performing type checking and rejecting plans with unresolved attributes.

Business logic: Optimizer rules (predicate pushdown, join reorder) in Catalyst; physical strategy selection in SparkStrategies.
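As an illustration of the kind of rewrite the optimizer performs, here is a toy predicate-pushdown rule over a three-node plan. The `Scan`/`Project`/`Filter` classes and `push_down` are hypothetical and far simpler than Catalyst's `LogicalPlan` and rule executor: predicates are plain strings tagged with the single column they reference.

```python
from dataclasses import dataclass, replace
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    table: str
    pushed: Tuple[str, ...] = ()  # predicates handed to the data source


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: "Plan"


@dataclass(frozen=True)
class Filter:
    column: str  # the single column the predicate references
    predicate: str
    child: "Plan"


Plan = Union[Scan, Project, Filter]


def push_down(plan: Plan) -> Plan:
    """Toy rule: move filters below projections and into scans, bottom-up."""
    if isinstance(plan, Scan):
        return plan
    child = push_down(plan.child)
    if isinstance(plan, Project):
        return Project(plan.columns, child)
    if isinstance(child, Project) and plan.column in child.columns:
        # Filter(Project(x)) -> Project(Filter(x)), then keep pushing.
        return Project(child.columns, push_down(Filter(plan.column, plan.predicate, child.child)))
    if isinstance(child, Scan):
        return replace(child, pushed=child.pushed + (plan.predicate,))
    return Filter(plan.column, plan.predicate, child)
```

The filter ends up inside the scan, so a source that supports it can skip rows before they ever reach Spark.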

Flow D: PySpark action

Trigger: df.count() in Python
│
├─ pyspark/sql/dataframe.py calls the JVM via Py4J
│
├─ classic Dataset.count() on the JVM (same as Flow C)
│
└─ Python UDF partitions (if any):
    ├─ Executor launches python/worker.py
    └─ Socket communication + serialized rows

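Gateway startup: python/pyspark/java_gateway.py:launch_gateway spawns bin/spark-submit with the arguments in PYSPARK_SUBMIT_ARGS (default pyspark-shell) and connects Py4J to the resulting JVM; if PYSPARK_GATEWAY_PORT is already set, it connects to that existing gateway instead.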

Failure modes: Py4J connection loss, Python worker OOM, serializer mismatch.
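The "socket communication + serialized rows" step can be sketched with length-prefixed pickled frames over a socket pair. This is an assumption-laden model, not PySpark's worker protocol: `write_frame`, `read_frame`, and `worker_round_trip` are hypothetical, and the real protocol exchanges more message types than a single data frame in each direction.

```python
import pickle
import socket
import struct


def write_frame(sock: socket.socket, obj) -> None:
    payload = pickle.dumps(obj)
    sock.sendall(struct.pack("!i", len(payload)) + payload)  # 4-byte big-endian length prefix


def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("socket closed mid-frame")  # e.g. the worker died
        buf += chunk
    return buf


def read_frame(sock: socket.socket):
    (length,) = struct.unpack("!i", _recv_exact(sock, 4))
    return pickle.loads(_recv_exact(sock, length))


def worker_round_trip(rows, udf):
    jvm_side, worker_side = socket.socketpair()
    try:
        write_frame(jvm_side, rows)                        # "executor" sends a batch of rows
        batch = read_frame(worker_side)                    # "python worker" receives it
        write_frame(worker_side, [udf(r) for r in batch])  # applies the UDF, replies
        return read_frame(jvm_side)
    finally:
        jvm_side.close()
        worker_side.close()
```

The explicit length prefix is what lets the reader detect a truncated frame, one of the ways a worker crash or serializer mismatch surfaces.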

Flow E: Structured Streaming micro-batch

Trigger: StreamingQueryManager.start() + ProcessingTime trigger
│
├─ MicroBatchExecution                        [MicroBatchExecution.scala]
│   └─ StreamExecution.runBatch loop
│
├─ Read offsets from checkpoint (offsets/ log)
│   └─ IncrementalExecution extends QueryExecution
│
├─ Plan batch: logical + offset metadata
│   └─ execute write plan (WriteToDataSourceV2Exec)
│
├─ Commit offsets + state store checkpoint
│
└─ Standard Spark job scheduling per batch

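State: StateStore instances on executors; the checkpoint directory lives on a durable file system (set per query with the checkpointLocation option, or via spark.sql.streaming.checkpointLocation as a default root).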

Failures: Batch retry from checkpoint; state schema evolution errors; sink commit failures.
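A minimal model of why a crashed batch can be replayed: the offsets for a batch are recorded before it runs, and the batch is marked committed only after the sink write. On restart, an offsets entry without a matching commit is re-run with the same range. `Checkpoint` and `run_micro_batches` are hypothetical in-memory stand-ins for the `offsets/` and `commits/` logs, and the sink write is assumed idempotent per batch id.

```python
from typing import Dict, List, Set


class Checkpoint:
    """Hypothetical in-memory stand-in for the offsets/ and commits/ logs."""

    def __init__(self) -> None:
        self.offsets: Dict[int, int] = {}  # batchId -> end offset, written before the batch runs
        self.commits: Set[int] = set()     # batchIds whose sink write completed


def run_micro_batches(source: List[str], cp: Checkpoint, sink: Dict[int, List[str]],
                      max_per_batch: int = 2, crash_before_commit: int = -1) -> None:
    last = max(cp.offsets, default=-1)
    if last >= 0 and last not in cp.commits:
        batch_id = last                     # replay the uncommitted batch with its logged range
        end = cp.offsets[last]
    else:
        batch_id = last + 1                 # plan a new batch
        end = min(cp.offsets.get(last, 0) + max_per_batch, len(source))
    start = cp.offsets.get(batch_id - 1, 0)
    while start < end:
        cp.offsets[batch_id] = end          # 1. write-ahead: log the batch's offsets
        sink[batch_id] = source[start:end]  # 2. sink write, idempotent per batchId
        if batch_id == crash_before_commit:
            return                          # simulated crash before the commit is logged
        cp.commits.add(batch_id)            # 3. mark the batch as committed
        batch_id, start = batch_id + 1, end
        end = min(start + max_per_batch, len(source))
```

Because the replayed batch reuses the logged range and the same batch id, the restarted query writes exactly the data the crashed attempt would have written.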

Flow F: Spark Connect client query

Client: connect.SparkSession.sql(...).collect()
│
├─ Builds proto.Plan locally (connect/Dataset.scala)
│
└─ SparkConnectClient.execute → gRPC ExecutePlanRequest

Server:
├─ SparkConnectService.executePlan
├─ SparkConnectExecutePlanHandler → ExecuteHolder
├─ SparkConnectPlanner.transformRelation → LogicalPlan
├─ Dataset.ofRows → QueryExecution (classic path)
└─ Results streamed as Arrow batches in ExecutePlanResponse
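The client/server split above can be modeled in a few lines: the client only builds a nested plan description, and nothing reaches the server until `collect()`, which consumes a stream of result batches. `ConnectDataFrame`, `ToyServer`, and the dict-based plan shape are hypothetical stand-ins for the protobuf `Plan`/`Relation` messages and the gRPC response stream; Arrow encoding is not modeled.

```python
from typing import Dict, Iterator, List


class ToyServer:
    """Hypothetical server: turns a plan into rows and streams them in batches."""

    def __init__(self, batch_size: int = 2) -> None:
        self.batch_size = batch_size
        self.requests: List[Dict] = []  # plans received, one per execute call

    def _evaluate(self, plan: Dict) -> List[int]:
        if "range" in plan:
            return list(range(plan["range"]["end"]))
        if "filter" in plan:
            f = plan["filter"]
            return [x for x in self._evaluate(f["input"]) if x >= f["min"]]
        raise ValueError(f"unsupported relation: {plan}")

    def execute_plan(self, plan: Dict) -> Iterator[List[int]]:
        self.requests.append(plan)
        rows = self._evaluate(plan)
        for i in range(0, len(rows), self.batch_size):
            yield rows[i:i + self.batch_size]  # one "response" per batch


class ConnectDataFrame:
    """Hypothetical client DataFrame: transformations only build a new plan."""

    def __init__(self, plan: Dict, server: ToyServer) -> None:
        self.plan = plan
        self._server = server

    def filter_ge(self, minimum: int) -> "ConnectDataFrame":
        return ConnectDataFrame({"filter": {"input": self.plan, "min": minimum}}, self._server)

    def collect(self) -> List[int]:
        rows: List[int] = []
        for batch in self._server.execute_plan(self.plan):  # consume the response stream
            rows.extend(batch)
        return rows
```

Building `ConnectDataFrame({"range": {"end": 5}}, server).filter_ge(2)` sends nothing; calling `collect()` on it issues a single request and reassembles `[2, 3, 4]` from two streamed batches.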

10. Important code walkthroughs

Walkthrough 1: RDD five properties

File: core/src/main/scala/org/apache/spark/rdd/RDD.scala

The class docstring (lines 69–76) defines the contract every scheduler relies on:

// Partitions, compute(split, context), dependencies,
// optional Partitioner, optional preferred locations

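map creates a MapPartitionsRDD with a OneToOneDependency: a narrow dependency, so it stays in the same stage. reduceByKey introduces a ShuffleDependency (unless the parent is already partitioned by the same partitioner), and the ShuffleDependency registers the shuffle with the ShuffleManager at construction time (Dependency.scala).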
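The five properties and the narrow/wide distinction can be seen in a toy, single-process RDD. `ToyRDD`, `parallelize`, `map`, and `reduce_by_key` are hypothetical and only mimic Spark's semantics: `map` depends one-to-one on its parent, while `reduce_by_key` hash-partitions keys and reads every parent partition, which is what a shuffle does.

```python
class ToyRDD:
    """Hypothetical single-process RDD carrying the five properties."""

    def __init__(self, num_partitions, compute, deps=(), partitioner=None, preferred=None):
        self.num_partitions = num_partitions              # 1. partitions
        self.compute = compute                            # 2. compute(split) -> list
        self.deps = list(deps)                            # 3. (kind, parent RDD) pairs
        self.partitioner = partitioner                    # 4. optional partitioner
        self.preferred = preferred or (lambda split: [])  # 5. optional preferred locations

    @staticmethod
    def parallelize(data, n):
        chunks = [data[i::n] for i in range(n)]
        return ToyRDD(n, lambda split: chunks[split])

    def map(self, f):
        # Narrow: output partition i reads only parent partition i.
        return ToyRDD(self.num_partitions,
                      lambda split: [f(x) for x in self.compute(split)],
                      deps=[("one_to_one", self)])

    def reduce_by_key(self, f, n):
        def target(key):
            return hash(key) % n                          # hash partitioning

        def compute(split):
            acc = {}
            for parent_split in range(self.num_partitions):  # wide: read every parent partition
                for key, value in self.compute(parent_split):
                    if target(key) == split:
                        acc[key] = f(acc[key], value) if key in acc else value
            return list(acc.items())

        return ToyRDD(n, compute, deps=[("shuffle", self)], partitioner=("hash", n))

    def collect(self):
        return [x for split in range(self.num_partitions) for x in self.compute(split)]
```

Counting words with `ToyRDD.parallelize(words, 3).map(lambda w: (w, 1)).reduce_by_key(lambda a, b: a + b, 2)` yields one partition-local total per key; only the shuffle step changes the partition count and sets a partitioner.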

If changed incorrectly: Wrong partitions → incorrect results; missing shuffle registration → fetch failures; broken preferred locations → slow scans.

Walkthrough 2: DAGScheduler stage submission

File: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

The header comment (lines 58–123) is essential reading: it defines Jobs, Stages, Tasks, cache tracking, and the failure-recovery invariants.

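submitStage walks backward through the lineage: if any parent shuffle map stages are missing, it submits them recursively and parks the current stage in waitingStages; once no parents are missing, it calls submitMissingTasks. Shuffle map stages produce output tracked by MapOutputTrackerMaster.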
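The recursion in submitStage, together with the waiting set, can be modeled with a toy scheduler. The sketch assumes a simplified lineage (`Node` with `"narrow"`/`"shuffle"` edges), keys shuffle map stages by their map-side node rather than by shuffle id, and lets stages finish one at a time in submission order. `Node`, `Stage`, `ToyDAGScheduler`, and `run_job` are hypothetical; `stage_finished` plays roughly the role of resubmitting waiting child stages.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass(eq=False)  # eq=False keeps identity hashing, so nodes can be dict keys
class Node:
    name: str
    deps: List[Tuple[str, "Node"]] = field(default_factory=list)  # ("narrow" | "shuffle", parent)


@dataclass(eq=False)
class Stage:
    name: str
    parents: List["Stage"]


def parent_stages(rdd: Node, cache: Dict[Node, Stage]) -> List[Stage]:
    """Follow narrow deps within the stage; each shuffle dep yields a parent stage."""
    parents: List[Stage] = []
    seen: Set[Node] = set()
    stack = [rdd]
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        for kind, parent in node.deps:
            if kind == "shuffle":
                parents.append(shuffle_map_stage(parent, cache))
            else:
                stack.append(parent)
    return parents


def shuffle_map_stage(rdd: Node, cache: Dict[Node, Stage]) -> Stage:
    # Reuse the stage if this map side was already seen.
    if rdd not in cache:
        cache[rdd] = Stage(rdd.name, parent_stages(rdd, cache))
    return cache[rdd]


class ToyDAGScheduler:
    def __init__(self) -> None:
        self.available: Set[Stage] = set()  # stages whose map output is registered
        self.waiting: List[Stage] = []
        self.running: List[Stage] = []

    def submit_stage(self, stage: Stage) -> None:
        if stage in self.available or stage in self.waiting or stage in self.running:
            return
        missing = [p for p in stage.parents if p not in self.available]
        if not missing:
            self.running.append(stage)  # stands in for submitMissingTasks
            return
        for parent in missing:
            self.submit_stage(parent)   # recurse into missing parents first
        self.waiting.append(stage)

    def stage_finished(self, stage: Stage) -> None:
        self.running.remove(stage)
        self.available.add(stage)
        for child in list(self.waiting):  # resubmit children whose parents are all done
            if all(p in self.available for p in child.parents):
                self.waiting.remove(child)
                self.submit_stage(child)


def run_job(final_rdd: Node) -> List[str]:
    sched = ToyDAGScheduler()
    sched.submit_stage(Stage(final_rdd.name, parent_stages(final_rdd, {})))
    order: List[str] = []
    while sched.running:
        stage = sched.running[0]  # toy: finish stages in submission order
        order.append(stage.name)
        sched.stage_finished(stage)
    return order
```

For a join of two shuffled inputs, both map stages run first while the result stage sits in `waiting`; it is submitted only after the second parent finishes.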

If changed incorrectly: Memory leaks in long sessions (failure to clear jobIdToStageIds); duplicate task submission; incorrect stage retry logic.

Walkthrough 3: QueryExecution lazy phases

File: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Each phase is a separate lazy val instrumented by QueryPlanningTracker. The transaction-aware analysis code (lines 103–120) shows that nested queries must inherit the analyzer for connector transactions, a subtle coupling that is easy to miss.

executedPlan.execute() produces RDD[InternalRow], bridging SQL to core.
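Python's `functools.cached_property` is a close analogue of a Scala `lazy val`, which makes the phase chain easy to demonstrate: touching `executed_plan` forces every earlier phase exactly once, in order. `ToyQueryExecution`, its phase names, and the list used as a tracker are hypothetical, not QueryPlanningTracker's API.

```python
from functools import cached_property
from typing import List


class ToyQueryExecution:
    """Hypothetical model of QueryExecution's lazy phase chain."""

    def __init__(self, logical: str) -> None:
        self.logical = logical
        self.tracker: List[str] = []  # stands in for QueryPlanningTracker

    def _phase(self, name: str, result: str) -> str:
        self.tracker.append(name)     # record that the phase actually ran
        return result

    @cached_property
    def analyzed(self) -> str:
        return self._phase("analysis", f"Analyzed({self.logical})")

    @cached_property
    def optimized_plan(self) -> str:
        # The f-string forces self.analyzed before this phase is recorded.
        return self._phase("optimization", f"Optimized({self.analyzed})")

    @cached_property
    def spark_plan(self) -> str:
        return self._phase("planning", f"Physical({self.optimized_plan})")

    @cached_property
    def executed_plan(self) -> str:
        return self._phase("preparation", f"Prepared({self.spark_plan})")
```

Constructing the object runs nothing; the first access to `executed_plan` records all four phases, and later accesses reuse the cached values.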

Walkthrough 4: Executor TaskRunner

File: core/src/main/scala/org/apache/spark/executor/Executor.scala (~806+)

Per task, TaskRunner sets up classloader isolation for REPL/artifacts, deserializes the task, updates the epoch for the shuffle map-output cache, calls task.run, and reports metrics. Kill requests are checked before and during execution and surface as TaskKilledException.
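The "check before and during execution" pattern can be sketched with a cancellation flag that the task polls. `ToyTaskRunner` and `TaskKilled` are hypothetical; here the flag is checked once before the task starts and once per record, similar in spirit to how Spark's `InterruptibleIterator` checks for kills between records.

```python
import threading
from typing import Callable, Iterable, List


class TaskKilled(Exception):
    """Stand-in for Spark's TaskKilledException."""


class ToyTaskRunner:
    def __init__(self, rows: Iterable, fn: Callable) -> None:
        self.rows = rows
        self.fn = fn
        self.kill_event = threading.Event()  # set by kill(), polled by run()
        self.status = "LAUNCHING"
        self.output: List = []

    def kill(self) -> None:
        self.kill_event.set()

    def _check_killed(self) -> None:
        if self.kill_event.is_set():
            raise TaskKilled()

    def run(self) -> str:
        try:
            self._check_killed()      # before execution
            self.status = "RUNNING"
            for row in self.rows:
                self._check_killed()  # during execution, once per record
                self.output.append(self.fn(row))
            self.status = "FINISHED"
        except TaskKilled:
            self.status = "KILLED"
        return self.status
```

A kill that arrives mid-task takes effect at the next record boundary, so work already done for earlier records is kept but never reported as a finished task.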
