Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-5750] Incoporated feedback from Sean Owen
  • Loading branch information
Ilya Ganelin committed Mar 19, 2015
commit a8adb57208266836e41a92f44f5c6edf6ee655ed
18 changes: 10 additions & 8 deletions docs/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -983,13 +983,13 @@ for details.
<td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
</tr>
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
Expand Down Expand Up @@ -1088,22 +1088,24 @@ for details.

### Shuffle operations

Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so data with the same key becomes co-located after a shuffle.
Certain operations within Spark trigger an operation known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that is grouped differently across partitions. This typically involves re-arranging and copying data across executors and machines, making shuffle a complex and costly operation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would replace uses of "operation" that don't refer to actions or transformations to avoid confusion.


#### Background

To understand what happens during the shuffle we can consider the example of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation generates a new RDD where all values for a single key are combined into a 2-tuple - the key and an Iterable object containing all the associated values. If we think of the map and reduce steps for `groupByKey()` then we can see that to generate the list of all values associated with a key, all of the values must reside on the same reducer, since the output of the reduce step is the complete array.
To understand what happens during the shuffle we can consider the example of the [`groupByKey`](#GroupByLink) operation. The `groupByKey` operation generates a new RDD where all values for a single key are combined into a tuple - the key and an `Iterable` object containing all the associated values. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to present a single array per key.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of a nit, but because we try to de-emphasize the groupByKey operation, it might be better to go with a different example.


In Spark, by default, the way data is distributed across partitions is undefined. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.
In Spark, data is generally not distributed across partitions to be in the ncessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single `groupByKey` reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then organize those such that all values for any key lie within the same partition - this is called the **shuffle**.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

necessary


Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition. A similar operation, [`repartitionAndSortWithinPartitions`](#Repartition2Link`) coupled with `mapPartitions`, may be used to enact a `Hadoop` style shuffle.
Although the set of elements in each partition of newly shuffled data will be deterministic, the ordering of these elements is not. If one desires predictably ordered data following shuffle operations, [`mapPartitions`](#MapPartLink) can be used to sort each partition or `sortBy` can be used to perform a global sort. A similar operation, [`repartitionAndSortWithinPartitions`](#Repartition2Link`) coupled with `mapPartitions`, may be used to enact a Hadoop style shuffle.

Operations, which cause a shuffle include [`groupByKey`](#GroupByLink), [`sortByKey`](#SortByLink), [`reduceByKey`](#ReduceByLink), [`aggregateByKey`](#AggregateByLink), [`repartition`](#RepartitionLink), [`repartitionAndSortWithinPartitions`](#Repartition2Link`), [`coalesce`](#CoalesceLink), and [`countByKey`](#CountByLink).
Operations which can cause a shuffle include **repartion** operations like [`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'byKey** operations (except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink) , and **join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repartition


#### Performance Impact
**Shuffle** is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, internally, Spark creates a hash table, which, for large operations, can consume significant amounts of heap memory. When data does not fit in memory, for all shuffle operations with the exception of `sortByKey`, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection. Since `sortByKey` does not spill these intermediate tables to disk, the shuffle operation may cause OOM errors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • @sryza for example to review the content here.
    Why is sortByKey different? this is my knowledge gap if so. This isn't referring to sort-based shuffle is it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few factual edits:

  • Spark always writes all shuffle data to disk on the map side.
  • A hash table is only used on the map side for reduceByKey and aggregateByKey, and only on the reduce side for the ByKey operations.
  • sortByKey no longer can OOM.

Also, I would avoid mentioning hash-based shuffle at all because sort-based shuffle is now pretty much what we expect everybody to use.


Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.
As of Spark 1.1, Spark provides an alternative implementation for the hash-based shuffle described above. This sort-based shuffle avoids generating a file for every combination of map and reduce tasks. Instead, results from individual map jobs are kept in memory until they can't fit. Then, they are organized by the target reduce task (the one that needs those results) and then spilled to a single file, which are subsequently aggregated on the reduce side. Sort-based shuffle can significantly improve reduce-stage performance at the expense of moderately increased map-stage run-time.

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are not cleaned up from Spark's temporary storage until Spark is stopped, which means that long-running Spark jobs may consume available disk space. This is done so shuffle doesn't need to be re-computed if the lineage is recomputed. The temporary storage directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark context.

Shuffle behavior can be fine-tuned by adjusting a variety of configuration parameters. See the 'Shuffle Behavior' section within the Spark Configuration Guide.

Expand Down