Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1d6b718
continuous shuffle read RDD
jose-torres May 15, 2018
b5d1008
docs
jose-torres May 17, 2018
af40769
Merge remote-tracking branch 'apache/master' into readerRddMaster
jose-torres May 17, 2018
46456dc
fix ctor
jose-torres May 17, 2018
2ea8a6f
multiple partition test
jose-torres May 17, 2018
955ac79
unset task context after test
jose-torres May 17, 2018
8cefb72
conf from RDD
jose-torres May 18, 2018
f91bfe7
endpoint name
jose-torres May 18, 2018
2590292
testing bool
jose-torres May 18, 2018
859e6e4
tests
jose-torres May 18, 2018
b23b7bb
take instead of poll
jose-torres May 18, 2018
97f7e8f
add interface
jose-torres May 18, 2018
de21b1c
clarify comment
jose-torres May 18, 2018
7dcf51a
multiple
jose-torres May 18, 2018
ad0b5aa
writer with 1 reader partition
jose-torres May 25, 2018
c9adee5
docs and iface
jose-torres May 25, 2018
63d38d8
Merge remote-tracking branch 'apache/master' into writerTask
jose-torres May 25, 2018
331f437
increment epoch
jose-torres May 25, 2018
f3ce675
undo oop
jose-torres May 25, 2018
e0108d7
make rdd loop
jose-torres May 25, 2018
f400651
remote write RDD
jose-torres May 25, 2018
1aaad8d
rename classes
jose-torres May 25, 2018
59890d4
combine suites
jose-torres May 25, 2018
af1508c
fully rm old suite
jose-torres May 25, 2018
65837ac
reorder tests
jose-torres May 29, 2018
a68fae2
return future
jose-torres May 31, 2018
98d55e4
finish getting rid of old name
jose-torres May 31, 2018
e6b9118
synchronous
jose-torres May 31, 2018
629455b
finish rename
jose-torres May 31, 2018
cb6d42b
add timeouts
jose-torres Jun 13, 2018
59d6ff7
unalign
jose-torres Jun 13, 2018
f90388c
add note
jose-torres Jun 13, 2018
4bbdeae
parallel
jose-torres Jun 13, 2018
e57531d
fix compile
jose-torres Jun 13, 2018
cff37c4
fix compile
jose-torres Jun 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
combine suites
  • Loading branch information
jose-torres committed May 25, 2018
commit 59890d47a1f34fe9de6c16d03e8d644a40f6180b
Original file line number Diff line number Diff line change
Expand Up @@ -58,235 +58,5 @@ class ContinuousShuffleReadSuite extends StreamTest {
super.afterEach()
}

test("receiver stopped with row last") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
send(
endpoint,
ReceiverEpochMarker(0),
ReceiverRow(0, unsafeRow(111))
)

ctx.markTaskCompleted(None)
val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
eventually(timeout(streamingTimeout)) {
assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get())
}
}

test("receiver stopped with marker last") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
send(
endpoint,
ReceiverRow(0, unsafeRow(111)),
ReceiverEpochMarker(0)
)

ctx.markTaskCompleted(None)
val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
eventually(timeout(streamingTimeout)) {
assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get())
}
}

test("one epoch") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
send(
endpoint,
ReceiverRow(0, unsafeRow(111)),
ReceiverRow(0, unsafeRow(222)),
ReceiverRow(0, unsafeRow(333)),
ReceiverEpochMarker(0)
)

val iter = rdd.compute(rdd.partitions(0), ctx)
assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
}

test("multiple epochs") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
send(
endpoint,
ReceiverRow(0, unsafeRow(111)),
ReceiverEpochMarker(0),
ReceiverRow(0, unsafeRow(222)),
ReceiverRow(0, unsafeRow(333)),
ReceiverEpochMarker(0)
)

val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111))

val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
}

test("empty epochs") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint

send(
endpoint,
ReceiverEpochMarker(0),
ReceiverEpochMarker(0),
ReceiverRow(0, unsafeRow(111)),
ReceiverEpochMarker(0),
ReceiverEpochMarker(0),
ReceiverEpochMarker(0)
)

assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)

val thirdEpoch = rdd.compute(rdd.partitions(0), ctx)
assert(thirdEpoch.toSeq.map(_.getInt(0)) == Seq(111))

assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
}

test("multiple partitions") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 5)
// Send all data before processing to ensure there's no crossover.
for (p <- rdd.partitions) {
val part = p.asInstanceOf[ContinuousShuffleReadPartition]
// Send index for identification.
send(
part.endpoint,
ReceiverRow(0, unsafeRow(part.index)),
ReceiverEpochMarker(0)
)
}

for (p <- rdd.partitions) {
val part = p.asInstanceOf[ContinuousShuffleReadPartition]
val iter = rdd.compute(part, ctx)
assert(iter.next().getInt(0) == part.index)
assert(!iter.hasNext)
}
}

test("blocks waiting for new rows") {
val rdd = new ContinuousShuffleReadRDD(
sparkContext, numPartitions = 1, epochIntervalMs = Long.MaxValue)
val epoch = rdd.compute(rdd.partitions(0), ctx)

val readRowThread = new Thread {
override def run(): Unit = {
try {
epoch.next().getInt(0)
} catch {
case _: InterruptedException => // do nothing - expected at test ending
}
}
}

try {
readRowThread.start()
eventually(timeout(streamingTimeout)) {
assert(readRowThread.getState == Thread.State.TIMED_WAITING)
}
} finally {
readRowThread.interrupt()
readRowThread.join()
}
}

test("multiple writers") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
send(
endpoint,
ReceiverRow(0, unsafeRow("writer0-row0")),
ReceiverRow(1, unsafeRow("writer1-row0")),
ReceiverRow(2, unsafeRow("writer2-row0")),
ReceiverEpochMarker(0),
ReceiverEpochMarker(1),
ReceiverEpochMarker(2)
)

val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
assert(firstEpoch.toSeq.map(_.getUTF8String(0).toString).toSet ==
Set("writer0-row0", "writer1-row0", "writer2-row0"))
}

test("epoch only ends when all writers send markers") {
val rdd = new ContinuousShuffleReadRDD(
sparkContext, numPartitions = 1, numShuffleWriters = 3, epochIntervalMs = Long.MaxValue)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
send(
endpoint,
ReceiverRow(0, unsafeRow("writer0-row0")),
ReceiverRow(1, unsafeRow("writer1-row0")),
ReceiverRow(2, unsafeRow("writer2-row0")),
ReceiverEpochMarker(0),
ReceiverEpochMarker(2)
)

val epoch = rdd.compute(rdd.partitions(0), ctx)
val rows = (0 until 3).map(_ => epoch.next()).toSet
assert(rows.map(_.getUTF8String(0).toString) ==
Set("writer0-row0", "writer1-row0", "writer2-row0"))

// After checking the right rows, block until we get an epoch marker indicating there's no next.
// (Also fail the assertion if for some reason we get a row.)

val readEpochMarkerThread = new Thread {
override def run(): Unit = {
assert(!epoch.hasNext)
}
}

readEpochMarkerThread.start()
eventually(timeout(streamingTimeout)) {
assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING)
}

// Send the last epoch marker - now the epoch should finish.
send(endpoint, ReceiverEpochMarker(1))
eventually(timeout(streamingTimeout)) {
!readEpochMarkerThread.isAlive
}

// Join to pick up assertion failures.
readEpochMarkerThread.join()
}

test("writer epochs non aligned") {
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
// We send multiple epochs for 0, then multiple for 1, then multiple for 2. The receiver should
// collate them as though the markers were aligned in the first place.
send(
endpoint,
ReceiverRow(0, unsafeRow("writer0-row0")),
ReceiverEpochMarker(0),
ReceiverRow(0, unsafeRow("writer0-row1")),
ReceiverEpochMarker(0),
ReceiverEpochMarker(0),

ReceiverEpochMarker(1),
ReceiverRow(1, unsafeRow("writer1-row0")),
ReceiverEpochMarker(1),
ReceiverRow(1, unsafeRow("writer1-row1")),
ReceiverEpochMarker(1),

ReceiverEpochMarker(2),
ReceiverEpochMarker(2),
ReceiverRow(2, unsafeRow("writer2-row0")),
ReceiverEpochMarker(2)
)

val firstEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet
assert(firstEpoch == Set("writer0-row0"))

val secondEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet
assert(secondEpoch == Set("writer0-row1", "writer1-row0"))

val thirdEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet
assert(thirdEpoch == Set("writer1-row1", "writer2-row0"))
}
}
Loading