Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1d6b718
continuous shuffle read RDD
jose-torres May 15, 2018
b5d1008
docs
jose-torres May 17, 2018
af40769
Merge remote-tracking branch 'apache/master' into readerRddMaster
jose-torres May 17, 2018
46456dc
fix ctor
jose-torres May 17, 2018
2ea8a6f
multiple partition test
jose-torres May 17, 2018
955ac79
unset task context after test
jose-torres May 17, 2018
8cefb72
conf from RDD
jose-torres May 18, 2018
f91bfe7
endpoint name
jose-torres May 18, 2018
2590292
testing bool
jose-torres May 18, 2018
859e6e4
tests
jose-torres May 18, 2018
b23b7bb
take instead of poll
jose-torres May 18, 2018
97f7e8f
add interface
jose-torres May 18, 2018
de21b1c
clarify comment
jose-torres May 18, 2018
7dcf51a
multiple
jose-torres May 18, 2018
ad0b5aa
writer with 1 reader partition
jose-torres May 25, 2018
c9adee5
docs and iface
jose-torres May 25, 2018
63d38d8
Merge remote-tracking branch 'apache/master' into writerTask
jose-torres May 25, 2018
331f437
increment epoch
jose-torres May 25, 2018
f3ce675
undo oop
jose-torres May 25, 2018
e0108d7
make rdd loop
jose-torres May 25, 2018
f400651
remote write RDD
jose-torres May 25, 2018
1aaad8d
rename classes
jose-torres May 25, 2018
59890d4
combine suites
jose-torres May 25, 2018
af1508c
fully rm old suite
jose-torres May 25, 2018
65837ac
reorder tests
jose-torres May 29, 2018
a68fae2
return future
jose-torres May 31, 2018
98d55e4
finish getting rid of old name
jose-torres May 31, 2018
e6b9118
synchronous
jose-torres May 31, 2018
629455b
finish rename
jose-torres May 31, 2018
cb6d42b
add timeouts
jose-torres Jun 13, 2018
59d6ff7
unalign
jose-torres Jun 13, 2018
f90388c
add note
jose-torres Jun 13, 2018
4bbdeae
parallel
jose-torres Jun 13, 2018
e57531d
fix compile
jose-torres Jun 13, 2018
cff37c4
fix compile
jose-torres Jun 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'apache/master' into writerTask
  • Loading branch information
jose-torres committed May 25, 2018
commit 63d38d849107eed226449cec8d24c2241cd583c9
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.NextIterator

case class ContinuousShuffleReadPartition(index: Int, queueSize: Int, numShuffleWriters: Int)
case class ContinuousShuffleReadPartition(
index: Int,
queueSize: Int,
numShuffleWriters: Int,
epochIntervalMs: Long)
extends Partition {
// Initialized only on the executor, and only once even as we call compute() multiple times.
lazy val (reader: ContinuousShuffleReader, endpoint) = {
val env = SparkEnv.get.rpcEnv
val receiver = new UnsafeRowReceiver(queueSize, numShuffleWriters, env)
val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID().toString}", receiver)
val receiver = new UnsafeRowReceiver(queueSize, numShuffleWriters, epochIntervalMs, env)
val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver)

TaskContext.get().addTaskCompletionListener { ctx =>
env.stop(endpoint)
}
Expand All @@ -43,15 +48,24 @@ case class ContinuousShuffleReadPartition(index: Int, queueSize: Int, numShuffle
* RDD at the map side of each continuous processing shuffle task. Upstream tasks send their
* shuffle output to the wrapped receivers in partitions of this RDD; each of the RDD's tasks
* poll from their receiver until an epoch marker is sent.
*
* @param sc the RDD context
* @param numPartitions the number of read partitions for this RDD
* @param queueSize the size of the row buffers to use
* @param numShuffleWriters the number of continuous shuffle writers feeding into this RDD
* @param epochIntervalMs the checkpoint interval of the streaming query
*/
class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int, numShuffleWriters: Int = 1)
extends RDD[UnsafeRow](sc, Nil) {

private val queueSize = sc.conf.get(SQLConf.CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)
class ContinuousShuffleReadRDD(
sc: SparkContext,
numPartitions: Int,
queueSize: Int = 1024,
numShuffleWriters: Int = 1,
epochIntervalMs: Long = 1000)
extends RDD[UnsafeRow](sc, Nil) {

override protected def getPartitions: Array[Partition] = {
(0 until numPartitions).map { partIndex =>
ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters)
ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs)
}.toArray
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.spark.sql.execution.streaming.continuous.shuffle

import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

import scala.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
Expand All @@ -29,6 +27,10 @@ import org.apache.spark.util.NextIterator

/**
* Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
*
* Each message comes tagged with writerId, identifying which writer the message is coming
* from. The receiver will only begin the next epoch once all writers have sent an epoch
* marker ending the current epoch.
*/
private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable {
def writerId: Int
Expand All @@ -47,6 +49,7 @@ private[shuffle] case class ReceiverEpochMarker(writerId: Int) extends UnsafeRow
private[shuffle] class UnsafeRowReceiver(
queueSize: Int,
numShuffleWriters: Int,
epochIntervalMs: Long,
override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging {
// Note that this queue will be drained from the main task thread and populated in the RPC
Expand All @@ -70,7 +73,8 @@ private[shuffle] class UnsafeRowReceiver(

override def read(): Iterator[UnsafeRow] = {
new NextIterator[UnsafeRow] {
private val numWriterEpochMarkers = new AtomicInteger(0)
// An array of flags for whether each writer ID has gotten an epoch marker.
private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)

private val executor = Executors.newFixedThreadPool(numShuffleWriters)
private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
Expand All @@ -79,26 +83,49 @@ private[shuffle] class UnsafeRowReceiver(
override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
}

// Initialize by submitting tasks to read the first row from each writer.
(0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))

/**
* In each call to getNext(), we pull the next row available in the completion queue, and then
* submit another task to read the next row from the writer which returned it.
*
* When a writer sends an epoch marker, we note that it's finished and don't submit another
* task for it in this epoch. The iterator is over once all writers have sent an epoch marker.
*/
override def getNext(): UnsafeRow = {
completion.take().get() match {
case ReceiverRow(writerId, r) =>
// Start reading the next element in the queue we just took from.
completion.submit(completionTask(writerId))
r
// TODO use writerId
case ReceiverEpochMarker(writerId) =>
// Don't read any more from this queue. If all the writers have sent epoch markers,
// the epoch is over; otherwise we need rows from one of the remaining writers.
val writersCompleted = numWriterEpochMarkers.incrementAndGet()
if (writersCompleted == numShuffleWriters) {
finished = true
null
} else {
getNext()
var nextRow: UnsafeRow = null
while (!finished && nextRow == null) {
completion.poll(epochIntervalMs, TimeUnit.MILLISECONDS) match {
case null =>
// Try again if the poll didn't wait long enough to get a real result.
// But we should be getting at least an epoch marker every checkpoint interval.
val writerIdsUncommitted = writerEpochMarkersReceived.zipWithIndex.collect {
case (flag, idx) if !flag => idx
}
logWarning(
s"Completion service failed to make progress after $epochIntervalMs ms. Waiting " +
s"for writers $writerIdsUncommitted to send epoch markers.")

// The completion service guarantees this future will be available immediately.
case future => future.get() match {
case ReceiverRow(writerId, r) =>
// Start reading the next element in the queue we just took from.
completion.submit(completionTask(writerId))
nextRow = r
case ReceiverEpochMarker(writerId) =>
// Don't read any more from this queue. If all the writers have sent epoch markers,
// the epoch is over; otherwise we need to loop again to poll from the remaining
// writers.
writerEpochMarkersReceived(writerId) = true
if (writerEpochMarkersReceived.forall(_ == true)) {
finished = true
}
}
}
}

nextRow
}

override def close(): Unit = {
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.