[SPARK-24235][SS] Implement continuous shuffle writer for single reader partition. #21428
@@ -22,14 +22,14 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow

 /**
- * A [[ContinuousShuffleWriter]] sending data to [[UnsafeRowReceiver]] instances.
+ * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances.
  *
  * @param writerId The partition ID of this writer.
  * @param outputPartitioner The partitioner on the reader side of the shuffle.
- * @param endpoints The [[UnsafeRowReceiver]] endpoints to write to. Indexed by partition ID within
- *                  outputPartitioner.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by
+ *                  partition ID within outputPartitioner.
  */
-class UnsafeRowWriter(
+class RPCContinuousShuffleWriter(
     writerId: Int,
Contributor
nit: rename to `partitionId`?

Contributor Author
I worry that `partitionId` is ambiguous with the partition to which the shuffle data is being written.

Contributor
ok, makes sense.
     outputPartitioner: Partitioner,
     endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {
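For orientation, here is a minimal sketch of what the write path of such a writer might look like. The `ReceiverRow` and `ReceiverEpochMarker` message types and the per-row blocking `askSync` call are illustrative assumptions, not taken from this diff; note also that `RpcEndpointRef` is private to Spark, so a sketch like this would only compile inside the Spark codebase.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical reader-side messages, tagged with the sending writer's ID so a
// reader partition fed by several writers can track epochs per writer.
case class ReceiverRow(writerId: Int, row: UnsafeRow)
case class ReceiverEpochMarker(writerId: Int)

class RPCContinuousShuffleWriterSketch(
    writerId: Int,
    outputPartitioner: Partitioner,
    endpoints: Array[RpcEndpointRef]) {

  def write(epoch: Iterator[UnsafeRow]): Unit = {
    // Route each row to the reader partition the partitioner assigns it to.
    epoch.foreach { row =>
      endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
    }
    // Tell every reader endpoint that this writer's epoch has ended.
    endpoints.foreach(_.askSync[Unit](ReceiverEpochMarker(writerId)))
  }
}
```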
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.{HashPartitioner, Partition, TaskContext, TaskContextImpl}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD, UnsafeRowWriter}
+import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD, RPCContinuousShuffleWriter}
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType}
@@ -85,7 +85,8 @@ class ContinuousShuffleSuite extends StreamTest {
   test("one epoch") {
     val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val writer = new UnsafeRowWriter(0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))

     writer.write(Iterator(1, 2, 3))
@@ -94,7 +95,8 @@ class ContinuousShuffleSuite extends StreamTest {
   test("multiple epochs") {
     val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val writer = new UnsafeRowWriter(0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))

     writer.write(Iterator(1, 2, 3))
     writer.write(Iterator(4, 5, 6))
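The pattern these tests exercise is worth spelling out: each `write` call delivers one epoch, and each `compute` call on the reader partition should return exactly one epoch's rows. A hedged sketch of that round trip, assuming the suite's `readRDDEndpoint` helper, its `TaskContext` named `ctx`, and an implicit Int-to-UnsafeRow conversion (none of which are shown in this diff):

```scala
val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
val writer = new RPCContinuousShuffleWriter(
  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))

writer.write(Iterator(1, 2, 3))  // first epoch
writer.write(Iterator(4, 5, 6))  // second epoch

// Each compute() call on the same partition should yield one epoch's rows.
val firstEpoch = reader.compute(reader.partitions(0), ctx).toSeq   // rows 1, 2, 3
val secondEpoch = reader.compute(reader.partitions(0), ctx).toSeq  // rows 4, 5, 6
```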
@@ -107,7 +109,8 @@ class ContinuousShuffleSuite extends StreamTest {
     val data = new MultipleEpochRDD(1, Array(), Array(1, 2), Array(), Array(), Array(3, 4), Array())

     val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val writer = new UnsafeRowWriter(0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))

     writer.write(Iterator())
     writer.write(Iterator(1, 2))
@@ -126,7 +129,8 @@ class ContinuousShuffleSuite extends StreamTest {
   test("blocks waiting for writer") {
     val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val writer = new UnsafeRowWriter(0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+    val writer = new RPCContinuousShuffleWriter(
+      0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))

     val readerEpoch = reader.compute(reader.partitions(0), ctx)
@@ -152,7 +156,7 @@ class ContinuousShuffleSuite extends StreamTest {
     val reader = new ContinuousShuffleReadRDD(
       sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions)
     val writers = (0 until 3).map { idx =>
-      new UnsafeRowWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+      new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
     }

     writers(0).write(Iterator(1, 4, 7))
@@ -176,7 +180,7 @@ class ContinuousShuffleSuite extends StreamTest {
     val reader = new ContinuousShuffleReadRDD(
       sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions)
     val writers = (0 until 3).map { idx =>
-      new UnsafeRowWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+      new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
     }

     writers(1).write(Iterator())
Contributor
nit: If we need the rename here, how about the other message names and comments?
https://github.com/apache/spark/pull/21428/files#diff-4072457048f805637bfce2c779608756R29
https://github.com/apache/spark/pull/21428/files#diff-4072457048f805637bfce2c779608756R35

Contributor Author
Good point. Caught what I think are the rest.