[SPARK-24235][SS] Implement continuous shuffle writer for single reader partition. #21428
Conversation
Test build #91131 has finished for PR 21428 at commit
Test build #91132 has finished for PR 21428 at commit
Test build #91135 has finished for PR 21428 at commit
tdas left a comment
Looking good, but needs a few improvements.
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochTracker}

/**
 * An RDD which continuously writes epochs from its child into a continuous shuffle.
nit: writes epoch data
var prev: RDD[UnsafeRow],
outputPartitioner: Partitioner,
endpoints: Seq[RpcEndpointRef])
extends RDD[Unit](prev) {
fix indent.
 * @param endpoints The [[UnsafeRowReceiver]] endpoints to write to. Indexed by partition ID within
 *                  outputPartitioner.
 */
class UnsafeRowWriter(
Looking at this PR and prev PRs, I think the names UnsafeRowWriter and UnsafeRowReader are not right. The basic interfaces ContinuousShuffleReader/Writer take UnsafeRows, hence that's not unique to this implementation (that is, all implementations of these interfaces will read/write UnsafeRows). What's unique is that this implementation uses the RPC mechanism to read/write. So it may be more accurate to name them RPCContinuousShuffleReader/Writer, or something like that.
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

/**
 * A [[ContinuousShuffleWriter]] sending data to [[UnsafeRowReceiver]] instances.
Another thought, not something that needs to be done now. But it might be overall cleaner if the ContinuousShuffleWriter and Reader are coupled together in a joint interface. This is because each writer implementation is always tied to a specific reader implementation, so they are always coupled together. Consider something like this.
trait ContinuousShuffleManager {
  def createWriter(writerId: Int, numReaders: Int): ContinuousShuffleWriter
  def createReader(readerId: Int, numWriters: Int): ContinuousShuffleReader
}
I am just guessing at the params on the createX interfaces; I might be missing something. But I feel that a small set of params should be sufficient for any implementation to figure out everything else. Also, other management/control-layer stuff would go into the manager implementation. For example, if the writers and readers need to exchange initial setup information (e.g. RPC endpoint details) through the driver, then the implementation of that would go into the manager.
Think about it as you're building out the rest of the architecture.
Agreed.
 * Trait for writing to a continuous processing shuffle.
 */
trait ContinuousShuffleWriter {
  def write(epoch: Iterator[UnsafeRow]): Unit
I don't think it's the right interface. The ContinuousShuffleWriter interface should be for writing the shuffled rows. The implementation should not be responsible for actually deciding partitions (i.e. outputPartitioner.getPartition(row)), as you don't want to re-implement the partitioning in every implementation. So I think the interface should be def write(row: UnsafeRow, partitionId: Int).
I think it's better encapsulation to re-implement the partitioning in every ContinuousShuffleWriter implementation than to re-implement it in every ContinuousShuffleWriter user. (Note that the non-continuous ShuffleWriter has precedent for this: it uses the same interface, and all implementations of ShuffleWriter do re-implement partitioning.)
I see. That's fair.
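For reference, a minimal sketch of the two interface shapes discussed above. The epoch-iterator form is what this PR uses; the row-plus-partition form (and the name given to it here) is hypothetical:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Shape used in this PR: the writer receives a whole epoch and applies the
// output partitioner itself (mirroring the non-continuous ShuffleWriter).
trait ContinuousShuffleWriter {
  def write(epoch: Iterator[UnsafeRow]): Unit
}

// Alternative shape raised above (hypothetical name): the caller picks the
// target partition and the writer only moves rows.
trait PartitionedContinuousShuffleWriter {
  def write(row: UnsafeRow, partitionId: Int): Unit
}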
  EpochTracker.incrementCurrentEpoch()
}

Iterator()
Seems like you don't really need an RDD here, you just need an action. You are consuming an iterator and returning nothing... that's exactly like rdd.foreachPartition. It may be that wrapping it in this RDD is cleaner in the bigger picture, but I am unable to judge without having the bigger picture in mind (bigger picture = how these Continuous*RDDs are going to be created by the SQL SparkPlan, and executed).
I honestly just did this to mirror ContinuousWriteRDD, which itself mirrored WriteToDataSourceV2Exec returning an empty RDD. We can take it out of the current PR - it's not being used anywhere yet, and I agree that where it ends up being used will determine the right interface.
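For context, a rough sketch of the compute shape under discussion (not the exact PR code; createWriter is a hypothetical helper): the write RDD drains its child's iterator into the shuffle writer once per epoch and returns an empty iterator, which is why a foreachPartition-style action would serve equally well.

override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
  val writer: ContinuousShuffleWriter = createWriter(split.index)  // hypothetical helper
  while (!context.isInterrupted() && !context.isCompleted()) {
    // Write one epoch's worth of rows from the child RDD, then advance the epoch.
    writer.write(prev.compute(split, context))
    EpochTracker.incrementCurrentEpoch()
  }
  Iterator()
}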
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.{DataType, IntegerType}

class ContinuousShuffleSuite extends StreamTest {
Discussed offline. Merged these tests into the earlier test suite. Name the combined one appropriately.
addressed comments

Test build #91174 has finished for PR 21428 at commit
TaskContext.unset()
ctx = null
super.afterEach()
test("one epoch") {
nit: I generally put the simplest tests first (likely the reader tests, since they don't depend on the writer) and the more complex, e2e-ish tests later (the writer tests, since they need readers).
Reordered.
eventually(timeout(streamingTimeout)) {
  assert(receiver.asInstanceOf[RPCContinuousShuffleReader].stopped.get())
}
}
There isn't a test where an RPCContinuousShuffleWriter writes to multiple reader endpoints.
Discussed offline and above - this is a deliberate limitation of the PR.
 * partition ID within outputPartitioner.
 */
class RPCContinuousShuffleWriter(
    writerId: Int,
nit: rename to partitionId?
I worry that partitionId is ambiguous with the partition to which the shuffle data is being written.
ok makes sense.
def write(epoch: Iterator[UnsafeRow]): Unit = {
  while (epoch.hasNext) {
    val row = epoch.next()
    endpoints(outputPartitioner.getPartition(row)).ask[Unit](ReceiverRow(writerId, row))
What about the case where the send fails? The result seems to be ignored here.
cc @zsxwing
It's my understanding that the RPC framework guarantees messages will be sent in the order that they're ask()ed, and that it's therefore not possible for a single row to fail to be sent while the ones before and after it succeed. If this is the case, then we don't need to handle it here - the query will just start failing to make progress.
If it's not the case, we'll need a more clever solution. Maybe have the epoch marker message contain a count for the number of rows that are supposed to be in the epoch?
A reliable channel (first case) seems like a requirement for correctness. In that case I think the query can just be restarted from the last successful epoch as soon as a failure is detected.
Discussed offline with @zsxwing. It's actually not valid to be sending these async at all - the framework will retry e.g. connection failures on the next row, so we can end up committing an epoch before we detect that a row within it has failed to send. We need to just make these synchronous.
This will incur a slight round-trip latency penalty for now, but as mentioned earlier the TCP-based shuffle is what we actually plan to be production quality. I'm hoping to begin work on it after I finish one more PR on top of this. So I think the latency should be fine for now.
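Concretely, the fix settled on here amounts to replacing the fire-and-forget ask with a blocking askSync, so a send failure surfaces before the epoch marker goes out. A sketch, following the write method shown in the diff above:

def write(epoch: Iterator[UnsafeRow]): Unit = {
  while (epoch.hasNext) {
    val row = epoch.next()
    // Blocking send: a connection failure propagates here instead of being
    // silently retried on a later row.
    endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
  }
  // Tell every reader endpoint that this writer has finished the epoch.
  endpoints.foreach(_.askSync[Unit](ReceiverEpochMarker(writerId)))
}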
    outputPartitioner: Partitioner,
    endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {

  if (outputPartitioner.numPartitions != 1) {
Any reason to disable it? This should work, right?
I believe so, but there's no way to test whether it will work until we implement the scheduling support for distributing the addresses of each of the multiple readers.
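A minimal sketch of the guard being discussed; the exception type and message are illustrative:

if (outputPartitioner.numPartitions != 1) {
  throw new IllegalArgumentException(
    "multiple reader partitions not yet supported by RPCContinuousShuffleWriter")
}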
Test build #91278 has finished for PR 21428 at commit
HeartSaVioR left a comment
Looks good. Only minor comments and nits.
ReceiverEpochMarker(0),
ReceiverRow(0, unsafeRow(111))
)
private implicit def unsafeRow(value: Int) = {
Just curious: is there a reason to rearrange the functions, this one and the two below? Looks like they're the same except for making this function implicit.
And where does it leverage the implicit attribute of this method? I'm not sure it is really needed, but I'm reviewing on the GitHub page so I might be missing something here.
writer.write(Iterator(1, 2, 3)) and such leverages the implicit.
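For illustration, a sketch of how the implicit is exercised by the tests; the projection body is approximate, and writer stands for the RPCContinuousShuffleWriter under test:

import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, IntegerType}

// Converts a test Int into a single-column UnsafeRow.
private implicit def unsafeRow(value: Int): UnsafeRow = {
  UnsafeProjection.create(Array(IntegerType: DataType))(
    new GenericInternalRow(Array(value: Any)))
}

// With the implicit in scope, Int arguments convert to UnsafeRows at the call site:
writer.write(Iterator(1, 2, 3))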
readRowThread.join()
}

test("multiple writer partitions") {
Would we want to have another test which covers out-of-order epochs between writers (if that's a valid case for us), or rely on the test in ContinuousShuffleReadRDD?
I think "reader epoch only ends when all writer partitions write it" is a sufficient test for that.
 * source tasks have sent one.
 */
private[shuffle] class UnsafeRowReceiver(
private[shuffle] class RPCContinuousShuffleReader(
nit: If we need the rename here, how about the other message names and comments?
https://github.com/apache/spark/pull/21428/files#diff-4072457048f805637bfce2c779608756R29
https://github.com/apache/spark/pull/21428/files#diff-4072457048f805637bfce2c779608756R35
Good point. Caught what I think are the rest.
Test build #91368 has finished for PR 21428 at commit
zsxwing left a comment
Overall looks good. Left some minor comments.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case r: UnsafeRowReceiverMessage =>
  case r: RPCContinuousShuffleMessage =>
    queues(r.writerId).put(r)
This line may block RPC threads and cause some critical RPC messages to be delayed. In addition, if the reader fails, this line may block forever if the queue is full.
I'm okay with this right now since it's an experimental feature. Could you create a SPARK ticket and add a TODO here to note the potential issue so that we won't forget it?
I'm not sure what a critical RPC message is in this context. This line is intended to block forever if the queue is full; the receiver should not take any action or accept any other messages until the queue stops being full.
All RPC messages inside Spark are processed in a shared fixed thread pool, hence we cannot run blocking calls inside an RPC thread.
I think we need to design a backpressure mechanism in the future, fundamentally because a receiver cannot block a sender from sending data. For example, even if we block here, we still cannot prevent the sender from sending data, and it will eventually fill up the TCP buffer. We cannot just count on TCP backpressure here, as we need to use the same TCP connection in order to support thousands of machines.
That's a very strange characteristic for an RPC framework.
I don't know what backpressure could mean other than a receiver blocking a sender from sending more data. In any case, the final shuffle mechanism isn't going to use the RPC framework, so I added a reference to it. (We can discuss in a later PR whether we want to leave this mechanism lying around or remove it once we're confident the TCP-based one is working.)
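The upshot of this thread is roughly the shape below: the blocking put stays as-is and a TODO records the concern. A sketch; the ticket number is a placeholder for whatever JIRA gets filed:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case r: RPCContinuousShuffleMessage =>
    // TODO(SPARK-XXXXX): this put() can block a shared RPC thread, and can block
    // forever if the reader has failed while its queue is full. Acceptable for the
    // experimental RPC-based shuffle; the planned TCP-based shuffle replaces it.
    queues(r.writerId).put(r)
    // Reply so the sender's askSync completes.
    context.reply(())
}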
/**
 * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances.
 *
 * @param writerId The partition ID of this writer.
nit: we don't use vertical alignment, as it introduces unnecessary changes in the future.
endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
}

endpoints.foreach(_.askSync[Unit](ReceiverEpochMarker(writerId)))
You can use Future.sequence to send messages in parallel, such as:
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import org.apache.spark.util.ThreadUtils

val futures = endpoints.map(_.ask[Unit](ReceiverEpochMarker(writerId)))
implicit val ec = ThreadUtils.sameThread
ThreadUtils.awaitResult(Future.sequence(futures), Duration.Inf)
Sure, but I don't think there's any benefit to doing so. We need to sequence the messages across epochs too, so there's little parallelization available that way.
As far as I understand, the code here is to send a ReceiverEpochMarker to each endpoint and wait for all of them to respond. You can send the ReceiverEpochMarkers in parallel rather than send and wait one by one.
val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver)
val receiver = new RPCContinuousShuffleReader(
  queueSize, numShuffleWriters, epochIntervalMs, env)
val endpoint = env.setupEndpoint(s"RPCContinuousShuffleReader-${UUID.randomUUID()}", receiver)
Is it possible to get the query run id here? It would be helpful to debug if the endpoint name contains the query run id and partition id.
It requires a reasonable amount of extra code. As mentioned, this is not the final shuffle mechanism (and I intend to have the TCP-based shuffle ready to go in the next Spark release).
// Once we write the epoch the thread should stop waiting and succeed.
writer.write(Iterator(1))
readRowThread.join()
nit: it's better to add a timeout here, such as readRowThread.join(streamingTimeout.toMillis). Without a timeout, if there is a bug causing this to hang, we will need to wait until the Jenkins build timeout, which is much longer.
}

writers(0).write(Iterator())
readEpochMarkerThread.join()
ditto
private val executor = Executors.newFixedThreadPool(numShuffleWriters)
private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor)
Are you planning to implement round-robin here? Otherwise, using an array of queues + a thread pool could just be replaced with a single blocking queue.
It cannot be. There's a deadlock scenario where the queue is filled with records from epoch N before all writers have sent the marker for epoch N - 1.
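A rough sketch of the per-writer queue arrangement being defended here (illustrative; queueSize and numShuffleWriters follow the reader's constructor parameters shown elsewhere in the diff). With a single shared queue, epoch-N rows from fast writers could fill the buffer while a slow writer's epoch N-1 marker is still outstanding, deadlocking the reader; per-writer bounded queues plus a completion service avoid that:

import java.util.concurrent.{ArrayBlockingQueue, Callable, ExecutorCompletionService, Executors}

// One bounded queue per writer, so a fast writer cannot crowd out another
// writer's pending epoch marker.
private val queues = Array.fill(numShuffleWriters)(
  new ArrayBlockingQueue[RPCContinuousShuffleMessage](queueSize))

private val executor = Executors.newFixedThreadPool(numShuffleWriters)
private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor)

// Each submitted task takes one message from a specific writer's queue; the
// reader loops on completion.take() to consume whichever writer is ready next.
(0 until numShuffleWriters).foreach { writerId =>
  completion.submit(new Callable[RPCContinuousShuffleMessage] {
    override def call(): RPCContinuousShuffleMessage = queues(writerId).take()
  })
}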
Test build #91745 has finished for PR 21428 at commit

Test build #91748 has finished for PR 21428 at commit

Test build #91742 has finished for PR 21428 at commit

retest this please

Test build #91772 has finished for PR 21428 at commit

LGTM pending tests

Test build #91781 has finished for PR 21428 at commit

Thanks! Merging to master.
What changes were proposed in this pull request?
https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit
Implement continuous shuffle write RDD for a single reader partition. (I don't believe any implementation changes are actually required for multiple reader partitions, but this PR is already very large, so I want to exclude those for now to keep the size down.)
How was this patch tested?
new unit tests