
Conversation

@jiangxb1987
Contributor

What changes were proposed in this pull request?

RDD repartition also uses a round-robin approach to distribute data, which can cause incorrect answers on RDD workloads in the same way as described in #20393.

However, the approach that fixes DataFrame.repartition() doesn't apply to the RDD repartition issue, because the input data can be non-comparable, as discussed in #20393 (comment).

Here, I propose a quick fix that distributes elements by their hashes. This may cause a performance regression if you have highly skewed input data, but it ensures correct results.
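
Roughly, the idea is the following (a simplified sketch only, not the exact patch; keyRoundRobin, keyByHash and nonNegativeMod are just illustrative helpers):

```scala
// Simplified sketch of the proposed change (not the exact patch).
// Round-robin keying (current behavior): the target partition depends on where
// a record happens to sit in the (possibly non-deterministic) input order, so
// a retried task can send the same record to a different reducer.
def keyRoundRobin[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
  var position = new scala.util.Random(index).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position % numPartitions, t)
  }
}

// Hash-based keying (proposed idea): the target partition is a pure function
// of the record itself, so it does not change across task retries.
def keyByHash[T](items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
  def nonNegativeMod(x: Int, mod: Int): Int = { val r = x % mod; if (r < 0) r + mod else r }
  items.map(t => (nonNegativeMod(if (t == null) 0 else t.hashCode, numPartitions), t))
}
```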

How was this patch tested?

Added a test case in RDDSuite to ensure RDD.repartition() generates consistent answers.

Contributor

@mridulm mridulm left a comment

This will necessarily be a breaking change [1] - though definitely worth doing given the data corruption/loss issue.
One alternative is to force a checkpoint(), which will result in predictability while avoiding the output skew.

Unfortunately, since checkpoint support is optional (based on whether a checkpoint directory is configured), we cannot force this.

Perhaps a flag for users who want to preserve the 'earlier' behavior via checkpoint?

[1] Breaking change because in a lot of cases, coalesce with shuffle is done explicitly to prevent skew :-(
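
For illustration, the checkpoint alternative would look roughly like this (a hedged sketch, assuming an existing SparkContext `sc`; the paths and partition count are made up):

```scala
// Hedged sketch of the checkpoint alternative. Checkpointing materializes the
// parent RDD, so a retried repartition task re-reads exactly the same records.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")    // hypothetical directory
val input = sc.textFile("hdfs:///data/events")    // hypothetical input
input.checkpoint()
input.count()                                     // action to force the checkpoint to be written
val balanced = input.repartition(200)             // retried tasks now see stable input
```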

@mridulm
Contributor

mridulm commented Jan 27, 2018

In addition, any use of random in Spark code will be affected by this - unless the input is an idempotent source - even if random initialization is done predictably with the partition index (which we were doing here anyway).
We might want to look at mllib and other places as well.
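
To illustrate the concern (a standalone sketch, not actual Spark code; samplePartition is a made-up name):

```scala
// Illustrative sketch of the problem: the RNG is seeded deterministically from
// the partition index, but which records get selected still depends on the
// order the records are iterated in, so a re-run that sees the same records in
// a different order samples a different subset.
import scala.util.Random

def samplePartition[T](index: Int, items: Iterator[T], fraction: Double): Iterator[T] = {
  val rng = new Random(index)                      // deterministic seed per partition
  items.filter(_ => rng.nextDouble() < fraction)   // but selection tracks record order
}
```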

@SparkQA

SparkQA commented Jan 27, 2018

Test build #86728 has finished for PR 20414 at commit 6910ed6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

felixcheung commented Jan 28, 2018

Just for context, I'm seeing RDD.repartition being used a lot - in almost every single job.

@shivaram
Contributor

@jiangxb1987 @mridulm Could we have a special case that uses the sort-based approach when the RDD type is comparable? I think that should cover a bunch of the common cases, and the hash version would only be used when keys are not comparable.

Also, @mridulm, your point about more things than repartition being affected is definitely true (just in this file, randomSampleWithRange I think is affected). I think the only way to solve this in general is to enforce deterministic ordering when constructing ShuffleRDDs?
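
Something along these lines, as a sketch of the idea only (keyWithLocalSort is a made-up name; the fallback branch stands in for the hash-based keying proposed in this PR):

```scala
// Sketch of the special case (illustrative only, not a concrete proposal):
// when an Ordering[T] is available, sort each partition locally before the
// round-robin keying, so the keying no longer depends on the order in which
// the incoming records were fetched. Non-comparable types would fall back to
// the hash-based keying proposed here.
def keyWithLocalSort[T](index: Int, items: Iterator[T], numPartitions: Int,
                        ord: Option[Ordering[T]]): Iterator[(Int, T)] = {
  val stable = ord match {
    case Some(o) => items.toVector.sorted(o).iterator  // comparable: order is now deterministic
    case None    => items                              // non-comparable: would use hash keying instead
  }
  var position = new scala.util.Random(index).nextInt(numPartitions)
  stable.map { t =>
    position += 1
    (position % numPartitions, t)
  }
}
```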

@jiangxb1987
Contributor Author

I talked to @yanboliang offline; he said the major use cases of RDD/DataFrame.repartition() he has observed in ML workloads are:

  1. When saving models, you may need repartition() to reduce the number of output files; a typical special case is xxx.repartition(1);
  2. You may use repartition() to give the original data set more partitions, to gain higher parallelism for subsequent operations.

Actually, for the first case, you can use coalesce() instead of repartition() to get a similar effect without another shuffle! Also, that scenario doesn't strictly require the data set to be distributed evenly, so the change from round-robin partitioning to hash partitioning should be fine.
For the latter case, if you have a bunch of data with the same values, the change may lead to high data skew and bring a performance regression; currently the best-effort approach we can choose is to perform a local sort if the data type is comparable (and that also requires a lot of work refactoring the ExternalSorter).

Another approach is to let the ShuffleBlockFetcherIterator remember the sequence of block fetches, and force the blocks to be fetched one by one. This actually involves more issues, because we may hit the memory limit and therefore have to spill the fetched blocks. IIUC this should resolve the issue for general cases.

@felixcheung
Member

felixcheung commented Jan 29, 2018

Actually, for the first case, you can use coalesce() instead of repartition() to get a similar effect without another shuffle!

Not quite - coalesce will not combine partitions across executors (that would be a shuffle), so you could still end up with many, many files.

I have seen that ^ quite a bit with large scale ML. But FWIW, my comment earlier was for both "regular" use cases and ML use cases.

@jiangxb1987
Contributor Author

@felixcheung You are right that I didn't make it clear: there would still be many shuffle blocks, and if the read task is retried, it could be slower than using repartition(1) directly.

Now I'm inclined to fix the issue the latter way, by fixing the shuffle fetch order, since that may resolve the general case.

@cloud-fan
Contributor

Not quite - coalesce will not combine partitions across executors (that would be a shuffle), so you could still end up with many, many files.

I'm not sure I follow here. For coalesce(1), Spark just launches a single task to handle all the data partitions. If the final action is saving files, we still have only one file at the end. Compared to repartition(1), I think the only difference is the cost of task retry.

I think we can special-case repartition(1): if there is only one reducer, we don't need to add the local sort.
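
For example (an illustrative sketch, assuming an existing SparkContext `sc`; the paths are made up):

```scala
// Illustrative comparison: both end with a single output file, but coalesce(1)
// runs one task over all parent partitions without a shuffle, while
// repartition(1) shuffles everything to a single reducer.
val rdd = sc.textFile("hdfs:///data/events")
rdd.coalesce(1).saveAsTextFile("hdfs:///out/single-file-no-shuffle")
rdd.repartition(1).saveAsTextFile("hdfs:///out/single-file-with-shuffle")
```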

@jiangxb1987
Contributor Author

@cloud-fan Yea, you've put it more clearly here, and I totally agree!

@mridulm
Contributor

mridulm commented Jan 29, 2018

@shivaram Thinking about it more, this might affect everything which does a zip (or variants/similar idioms like limit K, etc.) on a partition - with random + index in coalesce + shuffle=true being one special case.

Essentially, anything which assumes that the order of records in a partition will always be the same. Currently, only the following should guarantee this - others need not:

  • Reading from an external immutable source like HDFS, etc. (including checkpoint)
  • Reading from the block store
  • Sorted partitions

The more I think about it, the more I like @sameeragarwal's suggestion in #20393: a general solution for this could be to introduce deterministic output for shuffle fetch - when enabled, it takes a more expensive but repeatable iteration over the shuffle fetch.

This assumes that Spark shuffle is always repeatable given the same input (I have yet to look into this in detail when spills are involved - any thoughts @sameeragarwal ?), which could be an implementation detail; but we could make it a requirement for shuffle.

Note that we might be able to avoid this additional cost for most of the current use cases (otherwise we would have faced this problem two major releases ago!); so the actual user impact, hopefully, might not be as high.

@jiangxb1987
Contributor Author

@mridulm I also agree we should follow @sameeragarwal's suggestion to let shuffle fetch produce deterministic output, and only do this for a few operations (e.g. repartition/zipWithIndex - do we have more?). IIUC spill should NOT affect the result, but if you find any suspects, please kindly share them with us. :)

@mridulm
Contributor

mridulm commented Jan 29, 2018

@jiangxb1987 Unfortunately I am unable to analyze this in detail, but hopefully I can give some pointers that help!

One example I can think of is a shuffle which uses an Aggregator (like combineByKey), via ExternalAppendOnlyMap.
The order in which we replay the keys with the same hash is non-deterministic from what I remember - for example, if the first run did not result in any spills, the second run had 3 spills, and the third run had 7, the order of keys (with the same hash) could be different in each.

Similarly, with sort-based shuffle, depending on the length of the data array in AppendOnlyMap (which is determined by whether we spilled or not), we can get different sort orders?
Similarly, for the actual sort itself, the merge is quite clearly sensitive to the number of spills (for example, when there is no aggregator or ordering, it is simply iterators.iterator.flatten).

There might be other cases where this is happening - I have not regularly looked at this part of the codebase in a while now unfortunately.

Please note that in all the cases above, there is no ordering defined.

@jiangxb1987
Contributor Author

Hey, I looked through ExternalAppendOnlyMap and here are the findings:
ExternalAppendOnlyMap claims it keeps its content sorted, but it actually uses a HashComparator that compares the elements by their hashes. Luckily, it sorts the elements using TimSort, which is stable; that means even if there are hash collisions, the output sequence should still be deterministic, as long as the inputs are (which we can achieve by modifying ShuffleBlockFetcherIterator per the previous discussion).

We may need to check all the other places where we spill/compare objects to ensure we generate a deterministic output sequence everywhere, though.
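
To make the stability point concrete, here is a standalone illustration (not the ExternalAppendOnlyMap code itself), relying on the fact that "Aa" and "BB" share the same hashCode:

```scala
// Standalone illustration: "Aa" and "BB" collide on hashCode, so a comparator
// that only looks at hashes cannot order them. A stable sort (Seq.sortBy is
// TimSort-backed) keeps colliding elements in their input order, so the output
// is deterministic exactly when the input sequence is.
val run1 = Seq("Aa", "x", "BB", "y")
val run2 = Seq("BB", "x", "Aa", "y")   // same elements, different arrival order
println(run1.sortBy(_.hashCode))       // List(x, y, Aa, BB)
println(run2.sortBy(_.hashCode))       // List(x, y, BB, Aa) - collision order follows the input
```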

@mridulm
Contributor

mridulm commented Jan 30, 2018

@jiangxb1987 You are correct when the sizes of the maps are the same.
But if the map sizes are different, the resulting order can be different - which can happen when requests for additional memory follow different patterns on re-execution (triggering spills).

@jiangxb1987
Contributor Author

Ouch... Yea, we have to think of a way to make it deterministic under hash collisions.

@sameeragarwal
Member

sameeragarwal commented Jan 30, 2018

Thanks @mridulm, all great points! We should investigate what's needed to guarantee ordering for spilled fetches.

@SparkQA

SparkQA commented Jul 25, 2018

Test build #93558 has finished for PR 20414 at commit 6910ed6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Hi, @jiangxb1987. Could you close this PR?
