Pull request (Closed). Changes from 1 commit; 35 commits in total:
1d6b718  continuous shuffle read RDD (jose-torres, May 15, 2018)
b5d1008  docs (jose-torres, May 17, 2018)
af40769  Merge remote-tracking branch 'apache/master' into readerRddMaster (jose-torres, May 17, 2018)
46456dc  fix ctor (jose-torres, May 17, 2018)
2ea8a6f  multiple partition test (jose-torres, May 17, 2018)
955ac79  unset task context after test (jose-torres, May 17, 2018)
8cefb72  conf from RDD (jose-torres, May 18, 2018)
f91bfe7  endpoint name (jose-torres, May 18, 2018)
2590292  testing bool (jose-torres, May 18, 2018)
859e6e4  tests (jose-torres, May 18, 2018)
b23b7bb  take instead of poll (jose-torres, May 18, 2018)
97f7e8f  add interface (jose-torres, May 18, 2018)
de21b1c  clarify comment (jose-torres, May 18, 2018)
7dcf51a  multiple (jose-torres, May 18, 2018)
ad0b5aa  writer with 1 reader partition (jose-torres, May 25, 2018)
c9adee5  docs and iface (jose-torres, May 25, 2018)
63d38d8  Merge remote-tracking branch 'apache/master' into writerTask (jose-torres, May 25, 2018)
331f437  increment epoch (jose-torres, May 25, 2018)
f3ce675  undo oop (jose-torres, May 25, 2018)
e0108d7  make rdd loop (jose-torres, May 25, 2018)
f400651  remote write RDD (jose-torres, May 25, 2018)
1aaad8d  rename classes (jose-torres, May 25, 2018)
59890d4  combine suites (jose-torres, May 25, 2018)
af1508c  fully rm old suite (jose-torres, May 25, 2018)
65837ac  reorder tests (jose-torres, May 29, 2018)
a68fae2  return future (jose-torres, May 31, 2018)
98d55e4  finish getting rid of old name (jose-torres, May 31, 2018)
e6b9118  synchronous (jose-torres, May 31, 2018)
629455b  finish rename (jose-torres, May 31, 2018)
cb6d42b  add timeouts (jose-torres, Jun 13, 2018)
59d6ff7  unalign (jose-torres, Jun 13, 2018)
f90388c  add note (jose-torres, Jun 13, 2018)
4bbdeae  parallel (jose-torres, Jun 13, 2018)
e57531d  fix compile (jose-torres, Jun 13, 2018)
cff37c4  fix compile (jose-torres, Jun 13, 2018)
Commit 98d55e4190d42ca651f7e620aa72c35c3f4b335c ("finish getting rid of old name")
jose-torres committed May 31, 2018
@@ -36,7 +36,7 @@ case class ContinuousShuffleReadPartition(
     val env = SparkEnv.get.rpcEnv
     val receiver = new RPCContinuousShuffleReader(
       queueSize, numShuffleWriters, epochIntervalMs, env)
-    val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver)
+    val endpoint = env.setupEndpoint(s"RPCContinuousShuffleReader-${UUID.randomUUID()}", receiver)
Member: Is it possible to get the query run id here? It would help with debugging if the endpoint name contained the query run id and partition id.

Contributor (Author): It requires a reasonable amount of extra code. As mentioned, this is not the final shuffle mechanism (and I intend to have the TCP-based shuffle ready to go in the next Spark release).
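As a rough illustration of what the reviewer is asking for, here is a hedged sketch of an endpoint name that embeds the query run id and partition id. Neither value is available in ContinuousShuffleReadPartition in this PR, so queryRunId and partitionIndex below are hypothetical parameters that would need extra plumbing.

```scala
import java.util.UUID

// Hypothetical sketch only: queryRunId and partitionIndex are not plumbed into
// ContinuousShuffleReadPartition in this PR; they are assumed parameters here.
def shuffleReaderEndpointName(queryRunId: UUID, partitionIndex: Int): String =
  s"RPCContinuousShuffleReader-$queryRunId-part$partitionIndex-${UUID.randomUUID()}"
```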


     TaskContext.get().addTaskCompletionListener { ctx =>
       env.stop(endpoint)
@@ -26,18 +26,18 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.util.NextIterator
 
 /**
- * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
+ * Messages for the RPCContinuousShuffleReader endpoint. Either an incoming row or an epoch marker.
  *
  * Each message comes tagged with writerId, identifying which writer the message is coming
  * from. The receiver will only begin the next epoch once all writers have sent an epoch
  * marker ending the current epoch.
  */
-private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable {
+private[shuffle] sealed trait RPCContinuousShuffleMessage extends Serializable {
   def writerId: Int
 }
 private[shuffle] case class ReceiverRow(writerId: Int, row: UnsafeRow)
-  extends UnsafeRowReceiverMessage
-private[shuffle] case class ReceiverEpochMarker(writerId: Int) extends UnsafeRowReceiverMessage
+  extends RPCContinuousShuffleMessage
+private[shuffle] case class ReceiverEpochMarker(writerId: Int) extends RPCContinuousShuffleMessage
 
 /**
  * RPC endpoint for receiving rows into a continuous processing shuffle task. Continuous shuffle
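As an aside on the epoch protocol described in the scaladoc above, here is a simplified, self-contained sketch of the per-writer marker rule (illustrative names, not this PR's code): the reader remembers which writers have sent a marker and only advances the epoch once every writer has.

```scala
// Simplified sketch of the rule "begin the next epoch only once all writers
// have sent an epoch marker"; the real reader interleaves this with row reads.
class EpochTracker(numShuffleWriters: Int) {
  private val markerReceived = Array.fill(numShuffleWriters)(false)

  // Record a marker from one writer; returns true when the current epoch is
  // complete, i.e. every writer has delivered its marker, and resets the state.
  def onEpochMarker(writerId: Int): Boolean = {
    markerReceived(writerId) = true
    if (markerReceived.forall(identity)) {
      for (i <- markerReceived.indices) markerReceived(i) = false
      true
    } else {
      false
    }
  }
}
```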
@@ -55,7 +55,7 @@ private[shuffle] class RPCContinuousShuffleReader(
   // Note that this queue will be drained from the main task thread and populated in the RPC
   // response thread.
   private val queues = Array.fill(numShuffleWriters) {
-    new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+    new ArrayBlockingQueue[RPCContinuousShuffleMessage](queueSize)
   }
 
   // Exposed for testing to determine if the endpoint gets stopped on task end.
@@ -66,7 +66,7 @@ private[shuffle] class RPCContinuousShuffleReader(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case r: UnsafeRowReceiverMessage =>
+    case r: RPCContinuousShuffleMessage =>
       queues(r.writerId).put(r)
Member: This line may block RPC threads and delay critical RPC messages. In addition, if the reader fails, this line may block forever if the queue is full.

I'm okay with this for now since it's an experimental feature. Could you create a SPARK ticket and add a TODO here documenting the potential issue, so that we don't forget it?

Contributor (Author): I'm not sure what a critical RPC message is in this context. This line is intended to block forever if the queue is full; the receiver should not take any action or accept any other messages until the queue stops being full.

Member: All RPC messages inside Spark are processed in a shared, fixed-size thread pool, so we cannot run blocking calls inside an RPC thread.

I think we eventually need to design a proper backpressure mechanism, because fundamentally a receiver cannot block a sender from sending data. For example, even if we block here, we still cannot prevent the sender from sending data, and it will eventually fill the TCP buffer. We cannot simply count on TCP backpressure, since we need to share the same TCP connection in order to support thousands of machines.

Contributor (Author): That's a very strange characteristic for an RPC framework.

I don't know what backpressure could mean other than a receiver blocking a sender from sending more data. In any case, the final shuffle mechanism isn't going to use the RPC framework, so I added a reference to it. (We can discuss in a later PR whether we want to leave this mechanism in place or remove it once we're confident the TCP-based one is working.)
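For reference, one non-blocking shape the reviewer's concern points toward, sketched against the class shown in this diff (it reuses the surrounding queues field and is not what the PR implements; the retry behaviour on the writer side is assumed, not designed here):

```scala
// Hypothetical alternative to the blocking put(): reject the row when the
// per-writer queue is full so the shared RPC dispatcher thread is never
// parked, and rely on the writer to back off and resend. Not part of this PR.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case r: RPCContinuousShuffleMessage =>
    if (queues(r.writerId).offer(r)) {  // non-blocking insert
      context.reply(())
    } else {
      // The writer would need to interpret this failure as "queue full, retry later".
      context.sendFailure(new IllegalStateException(
        s"receiver queue for writer ${r.writerId} is full"))
    }
}
```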

       context.reply(())
   }
@@ -77,10 +77,10 @@ private[shuffle] class RPCContinuousShuffleReader(
     private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
 
     private val executor = Executors.newFixedThreadPool(numShuffleWriters)
-    private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+    private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor)
Member: Are you planning to implement round-robin here? Otherwise, the array of queues plus a thread pool could just be replaced with a single blocking queue.

Contributor (Author): It cannot be. There's a deadlock scenario where the queue is filled with records from epoch N before all writers have sent the marker for epoch N - 1.
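To make the trade-off concrete, here is a standalone miniature of the per-writer-queue design (String payloads and illustrative names, not the PR's code): because each writer can only fill its own bounded queue, a writer racing ahead into epoch N cannot occupy the capacity needed for another writer's epoch N - 1 marker, which is exactly the deadlock a single shared bounded queue would allow.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Callable, ExecutorCompletionService, Executors}

// Miniature of the pattern used in this file: one bounded queue per writer,
// drained through an ExecutorCompletionService so the reader can wait on
// "whichever writer produces next" without merging the queues.
class PerWriterQueues(numWriters: Int, queueSize: Int) {
  private val queues = Array.fill(numWriters)(new ArrayBlockingQueue[String](queueSize))
  private val executor = Executors.newFixedThreadPool(numWriters)
  private val completion = new ExecutorCompletionService[String](executor)

  // A fast writer blocks only on its own queue; other writers' markers still fit.
  def put(writerId: Int, msg: String): Unit = queues(writerId).put(msg)

  // Ask for the next message from one specific writer's queue.
  def requestNextFrom(writerId: Int): Unit =
    completion.submit(new Callable[String] {
      override def call(): String = queues(writerId).take()
    })

  // Block until any outstanding per-writer read completes.
  def nextCompleted(): String = completion.take().get()

  def shutdown(): Unit = executor.shutdownNow()
}
```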


-    private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] {
-      override def call(): UnsafeRowReceiverMessage = queues(writerId).take()
+    private def completionTask(writerId: Int) = new Callable[RPCContinuousShuffleMessage] {
+      override def call(): RPCContinuousShuffleMessage = queues(writerId).take()
     }
 
     // Initialize by submitting tasks to read the first row from each writer.