-
Notifications
You must be signed in to change notification settings - Fork 29.1k
[SPARK-24235][SS] Implement continuous shuffle writer for single reader partition. #21428
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
1d6b718
b5d1008
af40769
46456dc
2ea8a6f
955ac79
8cefb72
f91bfe7
2590292
859e6e4
b23b7bb
97f7e8f
de21b1c
7dcf51a
ad0b5aa
c9adee5
63d38d8
331f437
f3ce675
e0108d7
f400651
1aaad8d
59890d4
af1508c
65837ac
a68fae2
98d55e4
e6b9118
629455b
cb6d42b
59d6ff7
f90388c
4bbdeae
e57531d
cff37c4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,155 +68,6 @@ class ContinuousShuffleSuite extends StreamTest { | |
| rdd.compute(rdd.partitions(0), ctx).toSeq.map(_.getInt(0)) | ||
| } | ||
|
|
||
| test("one epoch") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| writer.write(Iterator(1, 2, 3)) | ||
|
|
||
| assert(readEpoch(reader) == Seq(1, 2, 3)) | ||
| } | ||
|
|
||
| test("multiple epochs") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| writer.write(Iterator(1, 2, 3)) | ||
| writer.write(Iterator(4, 5, 6)) | ||
|
|
||
| assert(readEpoch(reader) == Seq(1, 2, 3)) | ||
| assert(readEpoch(reader) == Seq(4, 5, 6)) | ||
| } | ||
|
|
||
| test("empty epochs") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| writer.write(Iterator()) | ||
| writer.write(Iterator(1, 2)) | ||
| writer.write(Iterator()) | ||
| writer.write(Iterator()) | ||
| writer.write(Iterator(3, 4)) | ||
| writer.write(Iterator()) | ||
|
|
||
| assert(readEpoch(reader) == Seq()) | ||
| assert(readEpoch(reader) == Seq(1, 2)) | ||
| assert(readEpoch(reader) == Seq()) | ||
| assert(readEpoch(reader) == Seq()) | ||
| assert(readEpoch(reader) == Seq(3, 4)) | ||
| assert(readEpoch(reader) == Seq()) | ||
| } | ||
|
|
||
| test("blocks waiting for writer") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| val readerEpoch = reader.compute(reader.partitions(0), ctx) | ||
|
|
||
| val readRowThread = new Thread { | ||
| override def run(): Unit = { | ||
| assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1)) | ||
| } | ||
| } | ||
| readRowThread.start() | ||
|
|
||
| eventually(timeout(streamingTimeout)) { | ||
| assert(readRowThread.getState == Thread.State.TIMED_WAITING) | ||
| } | ||
|
|
||
| // Once we write the epoch the thread should stop waiting and succeed. | ||
| writer.write(Iterator(1)) | ||
| readRowThread.join() | ||
| } | ||
|
|
||
| test("multiple writer partitions") { | ||
| val numWriterPartitions = 3 | ||
|
|
||
| val reader = new ContinuousShuffleReadRDD( | ||
| sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) | ||
| val writers = (0 until 3).map { idx => | ||
| new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
| } | ||
|
|
||
| writers(0).write(Iterator(1, 4, 7)) | ||
| writers(1).write(Iterator(2, 5)) | ||
| writers(2).write(Iterator(3, 6)) | ||
|
|
||
| writers(0).write(Iterator(4, 7, 10)) | ||
| writers(1).write(Iterator(5, 8)) | ||
| writers(2).write(Iterator(6, 9)) | ||
|
|
||
| // Since there are multiple asynchronous writers, the original row sequencing is not guaranteed. | ||
| // The epochs should be deterministically preserved, however. | ||
| assert(readEpoch(reader).toSet == Seq(1, 2, 3, 4, 5, 6, 7).toSet) | ||
| assert(readEpoch(reader).toSet == Seq(4, 5, 6, 7, 8, 9, 10).toSet) | ||
| } | ||
|
|
||
| test("reader epoch only ends when all writer partitions write it") { | ||
| val numWriterPartitions = 3 | ||
|
|
||
| val reader = new ContinuousShuffleReadRDD( | ||
| sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) | ||
| val writers = (0 until 3).map { idx => | ||
| new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
| } | ||
|
|
||
| writers(1).write(Iterator()) | ||
| writers(2).write(Iterator()) | ||
|
|
||
| val readerEpoch = reader.compute(reader.partitions(0), ctx) | ||
|
|
||
| val readEpochMarkerThread = new Thread { | ||
| override def run(): Unit = { | ||
| assert(!readerEpoch.hasNext) | ||
| } | ||
| } | ||
|
|
||
| readEpochMarkerThread.start() | ||
| eventually(timeout(streamingTimeout)) { | ||
| assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING) | ||
| } | ||
|
|
||
| writers(0).write(Iterator()) | ||
| readEpochMarkerThread.join() | ||
| } | ||
|
|
||
| test("receiver stopped with row last") { | ||
| val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint | ||
| send( | ||
| endpoint, | ||
| ReceiverEpochMarker(0), | ||
| ReceiverRow(0, unsafeRow(111)) | ||
| ) | ||
|
|
||
| ctx.markTaskCompleted(None) | ||
| val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader | ||
| eventually(timeout(streamingTimeout)) { | ||
| assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get()) | ||
| } | ||
| } | ||
|
|
||
| test("receiver stopped with marker last") { | ||
| val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint | ||
| send( | ||
| endpoint, | ||
| ReceiverRow(0, unsafeRow(111)), | ||
| ReceiverEpochMarker(0) | ||
| ) | ||
|
|
||
| ctx.markTaskCompleted(None) | ||
| val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader | ||
| eventually(timeout(streamingTimeout)) { | ||
| assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get()) | ||
| } | ||
| } | ||
|
|
||
| test("reader - one epoch") { | ||
| val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint | ||
|
|
@@ -416,4 +267,153 @@ class ContinuousShuffleSuite extends StreamTest { | |
| val thirdEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet | ||
| assert(thirdEpoch == Set("writer1-row1", "writer2-row0")) | ||
| } | ||
|
|
||
| test("one epoch") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| writer.write(Iterator(1, 2, 3)) | ||
|
|
||
| assert(readEpoch(reader) == Seq(1, 2, 3)) | ||
| } | ||
|
|
||
| test("multiple epochs") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| writer.write(Iterator(1, 2, 3)) | ||
| writer.write(Iterator(4, 5, 6)) | ||
|
|
||
| assert(readEpoch(reader) == Seq(1, 2, 3)) | ||
| assert(readEpoch(reader) == Seq(4, 5, 6)) | ||
| } | ||
|
|
||
| test("empty epochs") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| writer.write(Iterator()) | ||
| writer.write(Iterator(1, 2)) | ||
| writer.write(Iterator()) | ||
| writer.write(Iterator()) | ||
| writer.write(Iterator(3, 4)) | ||
| writer.write(Iterator()) | ||
|
|
||
| assert(readEpoch(reader) == Seq()) | ||
| assert(readEpoch(reader) == Seq(1, 2)) | ||
| assert(readEpoch(reader) == Seq()) | ||
| assert(readEpoch(reader) == Seq()) | ||
| assert(readEpoch(reader) == Seq(3, 4)) | ||
| assert(readEpoch(reader) == Seq()) | ||
| } | ||
|
|
||
| test("blocks waiting for writer") { | ||
| val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val writer = new RPCContinuousShuffleWriter( | ||
| 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
|
|
||
| val readerEpoch = reader.compute(reader.partitions(0), ctx) | ||
|
|
||
| val readRowThread = new Thread { | ||
| override def run(): Unit = { | ||
| assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1)) | ||
| } | ||
| } | ||
| readRowThread.start() | ||
|
|
||
| eventually(timeout(streamingTimeout)) { | ||
| assert(readRowThread.getState == Thread.State.TIMED_WAITING) | ||
| } | ||
|
|
||
| // Once we write the epoch the thread should stop waiting and succeed. | ||
| writer.write(Iterator(1)) | ||
| readRowThread.join() | ||
| } | ||
|
|
||
| test("multiple writer partitions") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would we want to have another test which covers out-of-order epochs between writers (if that's a valid case for us), or rely on the test in ContinuousShuffleReadRDD?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think "reader epoch only ends when all writer partitions write it" is a sufficient test for that. |
||
| val numWriterPartitions = 3 | ||
|
|
||
| val reader = new ContinuousShuffleReadRDD( | ||
| sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) | ||
| val writers = (0 until 3).map { idx => | ||
| new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
| } | ||
|
|
||
| writers(0).write(Iterator(1, 4, 7)) | ||
| writers(1).write(Iterator(2, 5)) | ||
| writers(2).write(Iterator(3, 6)) | ||
|
|
||
| writers(0).write(Iterator(4, 7, 10)) | ||
| writers(1).write(Iterator(5, 8)) | ||
| writers(2).write(Iterator(6, 9)) | ||
|
|
||
| // Since there are multiple asynchronous writers, the original row sequencing is not guaranteed. | ||
| // The epochs should be deterministically preserved, however. | ||
| assert(readEpoch(reader).toSet == Seq(1, 2, 3, 4, 5, 6, 7).toSet) | ||
| assert(readEpoch(reader).toSet == Seq(4, 5, 6, 7, 8, 9, 10).toSet) | ||
| } | ||
|
|
||
| test("reader epoch only ends when all writer partitions write it") { | ||
| val numWriterPartitions = 3 | ||
|
|
||
| val reader = new ContinuousShuffleReadRDD( | ||
| sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) | ||
| val writers = (0 until 3).map { idx => | ||
| new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) | ||
| } | ||
|
|
||
| writers(1).write(Iterator()) | ||
| writers(2).write(Iterator()) | ||
|
|
||
| val readerEpoch = reader.compute(reader.partitions(0), ctx) | ||
|
|
||
| val readEpochMarkerThread = new Thread { | ||
| override def run(): Unit = { | ||
| assert(!readerEpoch.hasNext) | ||
| } | ||
| } | ||
|
|
||
| readEpochMarkerThread.start() | ||
| eventually(timeout(streamingTimeout)) { | ||
| assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING) | ||
| } | ||
|
|
||
| writers(0).write(Iterator()) | ||
| readEpochMarkerThread.join() | ||
|
||
| } | ||
|
|
||
| test("receiver stopped with row last") { | ||
| val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint | ||
| send( | ||
| endpoint, | ||
| ReceiverEpochMarker(0), | ||
| ReceiverRow(0, unsafeRow(111)) | ||
| ) | ||
|
|
||
| ctx.markTaskCompleted(None) | ||
| val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader | ||
| eventually(timeout(streamingTimeout)) { | ||
| assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get()) | ||
| } | ||
| } | ||
|
|
||
| test("receiver stopped with marker last") { | ||
| val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) | ||
| val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint | ||
| send( | ||
| endpoint, | ||
| ReceiverRow(0, unsafeRow(111)), | ||
| ReceiverEpochMarker(0) | ||
| ) | ||
|
|
||
| ctx.markTaskCompleted(None) | ||
| val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader | ||
| eventually(timeout(streamingTimeout)) { | ||
| assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get()) | ||
| } | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There isn't a test where an RPCContinuousShuffleWriter writes to multiple reader endpoints.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Discussed offline and above - this is a deliberate limitation of the PR. |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it's better to add a timeout here, such as
readRowThread.join(streamingTimeout.toMillis). Without a timeout, if there is a bug causing this hang, we will need to wait until the jenkins build timeout, which is much longer.