-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API #25007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
1957e82
857552a
d13037f
8f5fb60
e17c7ea
3f0c131
f982df7
6891197
7b44ed2
df75f1f
a8558af
806d7bb
3167030
70f59db
3083d86
4c3d692
2421c92
982f207
594d1e2
66aae91
8b432f9
9f597dd
86c1829
a7885ae
9893c6c
cd897e7
9f17b9b
e53a001
56fa450
b8b7b8d
2d29404
06ea01a
7dceec9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -383,13 +383,19 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC | |
| // simultaneously, and everything is still OK | ||
|
|
||
| def writeAndClose( | ||
| writer: ShuffleWriter[Int, Int])( | ||
| writer: ShuffleWriter[Int, Int], | ||
| taskContext: TaskContext)( | ||
| iter: Iterator[(Int, Int)]): Option[MapStatus] = { | ||
| val files = writer.write(iter) | ||
| writer.stop(true) | ||
| TaskContext.setTaskContext(taskContext) | ||
|
||
| try { | ||
| val files = writer.write(iter) | ||
| writer.stop(true) | ||
| } finally { | ||
| TaskContext.unset() | ||
| } | ||
| } | ||
| val interleaver = new InterleaveIterators( | ||
| data1, writeAndClose(writer1), data2, writeAndClose(writer2)) | ||
| data1, writeAndClose(writer1, context1), data2, writeAndClose(writer2, context2)) | ||
| val (mapOutput1, mapOutput2) = interleaver.run() | ||
|
|
||
| // check that we can read the map output and it has the right data | ||
|
|
@@ -405,8 +411,10 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC | |
|
|
||
| val taskContext = new TaskContextImpl( | ||
| 1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem) | ||
| TaskContext.setTaskContext(taskContext) | ||
| val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics() | ||
| val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1, taskContext, metrics) | ||
| TaskContext.unset() | ||
| val readData = reader.read().toIndexedSeq | ||
| assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as you're touching this, can you fix the indentation of the parameters?