Shuffle: the all-to-all data exchange

A shuffle is how data moves between stages when a child partition depends on many parent partitions; it is the work behind reduceByKey, groupBy, join, and repartitioning. It is typically among the most expensive operations in a Spark job, so its design (sort-based, file-backed, location-tracked) is worth understanding in detail.

Map side writes, reduce side fetches

MAP STAGE (M tasks)                      REDUCE STAGE (R tasks)
each map task:                           each reduce task r:
  sort records by target partition         ask MapOutputTracker:
  write ONE data file + ONE index file       "where are blocks for partition r?"
  data: [part0][part1]...[partR-1]         fetch contiguous byte ranges from
  index: byte offsets of each part           every map output (local + remote)
  report MapStatus to driver               merge / aggregate the fetched records

        map output file (per map task)
        ┌──────┬──────┬──────┬──────┐      reduce task r reads the r-th slice
        │ p0   │ p1   │ p2   │ p3   │ ───► from each map file using the index
        └──────┴──────┴──────┴──────┘

ShuffleManager — the pluggable interface

The ShuffleManager trait is the contract between the engine and a shuffle implementation. The implementation is selected by spark.shuffle.manager and instantiated on both the driver and the executors. The driver calls registerShuffle; map tasks call getWriter; reduce tasks call getReader for a range of map indices and partitions; and shuffleBlockResolver maps block ids to physical locations.

ShuffleManager.scala L38-L96 — the trait

SortShuffleManager — why one file per map task

Sort-based shuffle sorts incoming records by their target partition id and writes them to a single map output file, with reducers fetching contiguous regions. This keeps the number of files at O(map tasks) instead of O(map × reduce), which is what made shuffles scale to large clusters. If the data is too large to fit in memory, sorted runs spill to disk and are merged.
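
The single-file layout can be illustrated with a toy model. The sketch below is not Spark code: write_map_output, its text record encoding, and the partition_of callback are hypothetical names chosen for illustration. It only shows the idea of sorting by target partition id and recording cumulative byte offsets.

```python
from typing import Callable, Iterable, List, Tuple


def write_map_output(
    records: Iterable[Tuple[str, int]],
    num_partitions: int,
    partition_of: Callable[[str], int],
) -> Tuple[bytes, List[int]]:
    """Toy map-side write: one data buffer plus R + 1 cumulative offsets."""
    # Sort by target partition id only; the sort is stable, so records
    # inside a partition stay in arrival order.
    ordered = sorted(records, key=lambda kv: partition_of(kv[0]))

    data = bytearray()
    offsets = [0] * (num_partitions + 1)
    for key, value in ordered:
        data += f"{key}={value}\n".encode("utf-8")
        # Records arrive grouped by partition, so this ends up holding the
        # end offset of partition p once its last record has been written.
        offsets[partition_of(key) + 1] = len(data)

    # An empty partition starts and ends where the previous one ended.
    for p in range(1, num_partitions + 1):
        offsets[p] = max(offsets[p], offsets[p - 1])
    return bytes(data), offsets


if __name__ == "__main__":
    recs = [("bb", 1), ("a", 2), ("ccc", 3), ("dd", 4)]
    data, offsets = write_map_output(recs, 3, lambda k: len(k) % 3)
    print(offsets)                        # [0, 6, 10, 20]
    print(data[offsets[1]:offsets[2]])    # b'a=2\n'
```

As a rough sense of scale: with 1,000 map tasks and 1,000 reduce partitions, one file per (map, reduce) pair would mean 1,000,000 files, while one data file plus one index file per map task means 2,000.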

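Sort-based shuffle has two sorting write paths. The serialized path operates on serialized bytes (less GC, cache-efficient pointer sorting) and is used when there is no map-side combine, the serializer supports relocation (e.g. Kryo), and there are at most 16,777,216 (2^24) output partitions. Other sorted shuffles use the deserialized path. Separately, registerShuffle can choose a bypass-merge-sort writer for shuffles with no map-side combine and few partitions (at most spark.shuffle.sort.bypassMergeThreshold, default 200), which skips sorting altogether.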
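
The three conditions for the serialized path can be written as a single predicate. This is a minimal sketch that restates the rule from the paragraph above; choose_write_path and its parameter names are hypothetical and do not mirror Spark's actual method signatures.

```python
# Partition limit quoted in the text above (2**24).
MAX_SERIALIZED_PARTITIONS = 16_777_216


def choose_write_path(
    map_side_combine: bool,
    serializer_relocatable: bool,
    num_partitions: int,
) -> str:
    """Return which sorting write path a shuffle would take in this toy model."""
    if (
        not map_side_combine
        and serializer_relocatable
        and num_partitions <= MAX_SERIALIZED_PARTITIONS
    ):
        return "serialized"
    # Any failed condition falls back to sorting deserialized objects.
    return "deserialized"
```

For example, a shuffle with map-side combine (as reduceByKey typically has) falls back to the deserialized path in this model even when the serializer supports relocation.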

SortShuffleManager.scala L30-L73 — design & write paths

SortShuffleManager.scala L90-L176 — registerShuffle, getReader, getWriter

The writer: ExternalSorter to a partitioned file

SortShuffleWriter.write feeds all records into an ExternalSorter (which sorts and spills as needed), opens a map output writer, writes each partition's bytes in order, and produces a MapStatus describing the block manager that holds the file and the byte length of each partition. That MapStatus is what gets registered with the driver and later read by reducers.
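
The sort-and-spill behaviour can be sketched as a classic external sort. The code below is a toy model, not ExternalSorter itself: in-memory lists stand in for spill files, max_in_memory is a hypothetical record-count threshold (Spark decides when to spill based on memory use), and partition_lengths is an illustrative stand-in for the per-partition sizes carried by a MapStatus.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

Record = Tuple[int, str]  # (target partition id, payload)


def external_sort(records: Iterable[Record], max_in_memory: int) -> Iterator[Record]:
    """Sort by partition id, spilling a sorted run whenever the buffer fills."""
    runs: List[List[Record]] = []  # each list stands in for one spill file
    buffer: List[Record] = []
    for rec in records:
        buffer.append(rec)
        if len(buffer) >= max_in_memory:
            runs.append(sorted(buffer, key=lambda r: r[0]))
            buffer = []
    # The last, possibly partial, buffer becomes the final run.
    runs.append(sorted(buffer, key=lambda r: r[0]))
    # Merge the sorted runs lazily into one stream ordered by partition id.
    return heapq.merge(*runs, key=lambda r: r[0])


def partition_lengths(sorted_records: Iterable[Record], num_partitions: int) -> List[int]:
    """Byte length of each partition, as a map task would report it."""
    lengths = [0] * num_partitions
    for partition_id, payload in sorted_records:
        lengths[partition_id] += len(payload.encode("utf-8"))
    return lengths
```

Calling partition_lengths(external_sort(records, 2), 3) on five records forces two spills plus a final run, and the merged stream still comes out grouped by partition id.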

SortShuffleWriter.scala L65-L89 — write

IndexShuffleBlockResolver.scala L46-L63 — data + index layout

The reader: fetch, then optionally re-sort

BlockStoreShuffleReader.read opens a ShuffleBlockFetcherIterator over the (BlockManagerId, blocks) list that the manager obtained from the tracker. It streams remote blocks (with limits on in-flight bytes), deserializes each into key/value pairs, applies the aggregator if one is defined (merging already-combined values when map-side combine ran, combining raw values otherwise), and re-sorts on the reduce side only when a key ordering is required (e.g. sortByKey).
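
The reduce-side flow (iterate over fetched blocks, aggregate per key, sort only on request) can be sketched as follows. This is a toy model under simplifying assumptions: blocks are already deserialized lists of pairs, everything fits in memory, and reduce_read, merge_value, and sort_keys are hypothetical names.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Pair = Tuple[str, int]


def reduce_read(
    fetched_blocks: Iterable[Iterable[Pair]],
    merge_value: Callable[[int, int], int],
    sort_keys: bool = False,
) -> List[Pair]:
    """Aggregate values per key across all fetched blocks, then optionally sort."""
    combined: Dict[str, int] = {}
    for block in fetched_blocks:  # one block per map output
        for key, value in block:
            if key in combined:
                combined[key] = merge_value(combined[key], value)
            else:
                combined[key] = value
    items = list(combined.items())
    # Sorting is skipped unless the caller needs a key ordering.
    return sorted(items) if sort_keys else items
```

With three blocks [("a", 1), ("b", 2)], [("a", 3)], and [("c", 4), ("b", 5)] and addition as merge_value, the sorted result is [("a", 4), ("b", 7), ("c", 4)].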

BlockStoreShuffleReader.scala L72-L112 — read

IndexShuffleBlockResolver — reading just your slice

Each map task writes a .data file and a .index file of partition offsets. When a reducer requests partition r, getBlockData reads the start and end offsets from the index and returns a FileSegmentManagedBuffer pointing at exactly that region of the data file — no copy, no scan of other partitions. This is the mechanism that makes the single-file design efficient to read.
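
The index lookup is simple enough to model in a few lines. The sketch below assumes an index of R + 1 cumulative offsets stored as 8-byte big-endian integers and uses in-memory bytes in place of files; encode_index, segment_for, and read_segment are hypothetical helper names, not Spark APIs.

```python
import struct
from typing import List, Tuple


def encode_index(partition_lengths: List[int]) -> bytes:
    """Pack R + 1 cumulative offsets as 8-byte big-endian integers."""
    offsets = [0]
    for length in partition_lengths:
        offsets.append(offsets[-1] + length)
    return struct.pack(f">{len(offsets)}q", *offsets)


def segment_for(index: bytes, reduce_id: int) -> Tuple[int, int]:
    """Return (offset, length) of one partition by reading two index entries."""
    start, end = struct.unpack_from(">2q", index, reduce_id * 8)
    return start, end - start


def read_segment(data: bytes, offset: int, length: int) -> bytes:
    """Read only the requested byte range; other partitions are never touched."""
    return bytes(memoryview(data)[offset:offset + length])


if __name__ == "__main__":
    data = b"aaabbbbbcc"                 # partitions of length 3, 5, 0, 2
    index = encode_index([3, 5, 0, 2])
    print(segment_for(index, 1))         # (3, 5)
    print(read_segment(data, *segment_for(index, 1)))  # b'bbbbb'
```

Because the lookup reads exactly two adjacent offsets, its cost does not depend on how many partitions the map output contains, and an empty partition shows up as a zero-length segment.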

IndexShuffleBlockResolver.scala L616-L660 — getBlockData

MapOutputTracker — the location directory

The tracker answers the reduce side's central question: "for shuffle X and partition r, which executors hold the blocks and how big are they?" The MapOutputTrackerMaster on the driver records a MapStatus per map task; the MapOutputTrackerWorker on executors caches that information, fetching it over RPC on a miss. An epoch counter is incremented whenever outputs are lost, so executors know to discard stale cached locations after a failure.

reduce task → MapOutputTrackerWorker.getMapSizesByExecutorId(shuffleId, ...)
            → (cache miss) RPC to MapOutputTrackerMaster
            → Iterator[(BlockManagerId, Seq[(blockId, size, mapIndex)])]
            → ShuffleBlockFetcherIterator
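
The master/worker split and the role of the epoch can be illustrated with a toy model. The classes below are hypothetical simplifications, not Spark's tracker classes: a method call stands in for the RPC, the lookups counter exists only to make cache behaviour visible, and the caller passes the new epoch to the worker explicitly.

```python
from typing import Dict, List, Tuple

Status = Tuple[str, List[int]]  # (executor id, size of each reduce partition)


class TrackerMaster:
    """Driver-side directory: one status per (shuffle, map task)."""

    def __init__(self) -> None:
        self.epoch = 0
        self.lookups = 0  # counts simulated RPCs
        self._statuses: Dict[int, Dict[int, Status]] = {}

    def register_map_output(self, shuffle_id: int, map_index: int,
                            executor: str, sizes: List[int]) -> None:
        self._statuses.setdefault(shuffle_id, {})[map_index] = (executor, sizes)

    def unregister_map_output(self, shuffle_id: int, map_index: int) -> None:
        self._statuses.get(shuffle_id, {}).pop(map_index, None)
        self.epoch += 1  # lost output: cached copies are now stale

    def get_statuses(self, shuffle_id: int) -> Dict[int, Status]:
        self.lookups += 1
        return dict(self._statuses.get(shuffle_id, {}))


class TrackerWorker:
    """Executor-side cache that is dropped when a newer epoch is seen."""

    def __init__(self, master: TrackerMaster) -> None:
        self.master = master
        self.epoch = 0
        self._cache: Dict[int, Dict[int, Status]] = {}

    def update_epoch(self, new_epoch: int) -> None:
        if new_epoch > self.epoch:
            self.epoch = new_epoch
            self._cache.clear()

    def get_map_sizes_by_executor(
        self, shuffle_id: int, reduce_id: int
    ) -> Dict[str, List[Tuple[int, int]]]:
        if shuffle_id not in self._cache:  # cache miss: ask the master
            self._cache[shuffle_id] = self.master.get_statuses(shuffle_id)
        by_executor: Dict[str, List[Tuple[int, int]]] = {}
        for map_index, (executor, sizes) in sorted(self._cache[shuffle_id].items()):
            by_executor.setdefault(executor, []).append((map_index, sizes[reduce_id]))
        return by_executor
```

In this model a second lookup for the same shuffle is served from the cache, and only an epoch bump forces the worker back to the master.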

MapOutputTracker.scala L549-L623 — base tracker & the reduce-side API

MapOutputTracker.scala L1314-L1324 — getMapSizesByExecutorId

Why shuffles dominate cost

A shuffle materializes intermediate data to disk and moves much of it across the network. It is also a hard synchronization barrier: the reduce stage cannot start until all map tasks of the parent stage have finished. That is why so much of Spark tuning (partition counts, partitioners, broadcast joins, adaptive query execution, and push-based shuffle) is ultimately about avoiding or shrinking shuffles. The number of reduce partitions is set by the partitioner (for RDDs) or spark.sql.shuffle.partitions (for SQL, default 200).

Key takeaways

  • Map tasks write one sorted data file plus an index; reducers fetch contiguous slices.
  • The single-file design keeps file counts O(map) instead of O(map × reduce).
  • The MapOutputTracker directory plus its epoch make fetches correct under failure.
  • Shuffles are a barrier and the main target of performance tuning.