[SPARK-24653][tests] Avoid cross-job pollution in TestUtils / SpillListener. #21639
Conversation
There is a narrow race in this code that occurs when the code being run in assertSpilled / assertNotSpilled runs more than a single job. SpillListener assumed that only a single job was run, and so would only block waiting for that single job to finish when `numSpilledStages` was called. But some tests (like SQL tests that call `checkAnswer`) run more than one job, so that wait was basically a no-op. This could cause the next test to install a listener that receives events from the previous job, which could cause test failures in certain cases. The change fixes that race, and also uninstalls listeners after the test runs, so they don't accumulate when the SparkContext is shared among multiple tests.
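To illustrate the race, here is a minimal sketch of the problematic pattern (the class and method names are invented for illustration; the real SpillListener also aggregates per-stage spill metrics):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical sketch: a one-shot latch opens as soon as the FIRST job
// ends, so it only covers a single job.
class SingleJobWaitListener extends SparkListener {
  private val firstJobEnded = new CountDownLatch(1)

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    firstJobEnded.countDown()

  // If the test body ran more than one job, this returns after the first
  // one finishes; later jobs may still be posting events, which a listener
  // installed by the next test could then observe.
  def waitForJob(): Unit = firstJobEnded.await(10, TimeUnit.SECONDS)
}
```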
Test build #92312 has finished for PR 21639 at commit
Seems the JIRA number is not related?
```scala
body
assert(spillListener.numSpilledStages > 0, s"expected $identifier to spill, but did not")
withListener(sc, new SpillListener) { listener =>
  val ret = body
```
Maybe I'm missing something obvious, but why do we need the return value here?
I saw the return type in the closure, but the method itself returns Unit, so all that can be cleaned up.
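In other words, the suggested cleanup would look roughly like this (a fragment, with the enclosing assertion helper elided):

```scala
// Before: the body's result is bound to a val even though the enclosing
// method returns Unit.
withListener(sc, new SpillListener) { listener =>
  val ret = body
  // ... assertions ...
}

// After: evaluate the body purely for its side effects.
withListener(sc, new SpillListener) { listener =>
  body
  // ... assertions ...
}
```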
Oops, no idea how I got the wrong bug.
Test build #92345 has finished for PR 21639 at commit
lgtm

retest this please
Test build #93063 has finished for PR 21639 at commit
retest this please
Test build #93091 has finished for PR 21639 at commit
retest this please
Test build #93185 has started for PR 21639 at commit |
```scala
 * this method will wait until all events posted to the listener bus are processed, and then
 * remove the listener from the bus.
 */
def withListener[L <: SparkListener](sc: SparkContext, listener: L)(body: L => Unit): Unit = {
```
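Given that doc comment, the body plausibly looks something like the following sketch (not necessarily the exact merged code; it assumes access to the `private[spark]` listener bus, which `TestUtils` has by virtue of living inside the `org.apache.spark` package):

```scala
def withListener[L <: SparkListener](sc: SparkContext, listener: L)(body: L => Unit): Unit = {
  sc.addSparkListener(listener)
  try {
    body(listener)
  } finally {
    // Drain in-flight events before detaching, so listeners installed by
    // later tests cannot receive events from this test's jobs.
    sc.listenerBus.waitUntilEmpty(10000L)
    sc.removeSparkListener(listener)
  }
}
```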
private? hardly matters.
Test build #4226 has finished for PR 21639 at commit
retest this please
Test build #93861 has finished for PR 21639 at commit
Merged to master |