[SPARK-24039][SS] Do continuous processing writes with multiple compute() calls #21200
Conversation
override def run(): Unit = {
  try {
    val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
    for (i <- currentEpoch to newEpoch - 1) {
Can the difference between the "new" and "current" epoch be more than one? That would mean the reader missed some epochs, so maybe it should trigger a recovery?
I don't think there's any need to trigger a recovery. The reader can (and currently will) just treat the epochs it missed as empty.
Please correct me if I'm missing something. My understanding is that this situation (a gap bigger than 1) should only occur when the array queue gets full and blocks the epoch thread from putting a marker for longer than the trigger interval. Any other situation (error cases) should just crash the whole query so that recovery happens from scratch; that's why we can ignore the missing-epoch case.
Yes, the queue getting full can be one cause; I think trigger interval < executorPollIntervalMs could be another. Anyway, I guess it would just cause the reader to report the same offsets for multiple epochs, which may be OK (but not desirable), since it will cause the epoch coordinator to block the other epochs from committing and then commit them one after the other when the commit message arrives for the missing partition.
Not sure if there are any checks to ensure trigger interval > executorPollIntervalMs. Maybe such a check should be added, or executorPollIntervalMs should be calculated based on the trigger interval.
I don't know the flow well enough to understand what happens when an executor crashes: how the epoch gets reset and how the newly launched tasks continue from the last successful commit.
It can also happen if GetCurrentEpoch just takes a long time for some reason.
I agree it'd make sense to add a check to ensure trigger interval is greater than executorPollIntervalMs. I'd even argue for some small multiplier on top of that poll interval.
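For illustration, a minimal sketch of such a check; the object name, safety multiplier, and message below are assumptions, not anything in this PR:

object ContinuousTriggerValidation {
  // Hypothetical check: reject trigger intervals that are too close to the executor poll
  // interval, with a small safety multiplier as suggested above.
  def validate(triggerIntervalMs: Long, executorPollIntervalMs: Long): Unit = {
    val safetyFactor = 2L  // assumed multiplier; the exact value is open for discussion
    require(
      triggerIntervalMs > executorPollIntervalMs * safetyFactor,
      s"Continuous trigger interval ($triggerIntervalMs ms) should be more than " +
        s"$safetyFactor x the executor poll interval ($executorPollIntervalMs ms), " +
        "otherwise the epoch poll thread can fall behind and skip epochs.")
  }
}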
I strongly suggest adding more docs here to explain this logic.
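One possible shape for that documentation, as an inline comment above the loop; the wording is only a suggestion distilled from this thread:

// The coordinator's epoch can advance by more than one between polls, e.g. when the record
// queue is full and this thread blocks for longer than the trigger interval, or when
// GetCurrentEpoch itself is slow. In that case we enqueue one marker per missed epoch; the
// reader simply reports those epochs as empty rather than triggering recovery.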
readerForPartition.currentOffset))
readerForPartition.currentEpoch += 1
currentEntry = null
false
Not sure if the iterator hack would lead to more hacks when there are multiple stages, so that the intermediate stages do not terminate. Is there a plan to change this approach later (say, something like an unbounded RDD that never terminates but passes the epoch markers along with the data in the pipeline)?
From my perspective your proposal is already the current behavior. The RDD will never terminate, and the epoch markers are propagated in the pipeline, represented as the end of each successive compute() call.
It's true that RDD.compute() will terminate, but I wouldn't expect this to cause any problems. The compute() implementation for the writer node (and eventually the node at the top of every continuous task) just calls child.compute() again instead of letting the task terminate.
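For illustration, a rough sketch of that shape with made-up class names; the actual ContinuousWriteRDD in this PR differs in details such as offset reporting and commit handling:

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Illustrative only: the top-level continuous RDD re-computes its child once per epoch
// instead of letting the task finish when one child iterator is exhausted.
class EpochLoopingWriteRDD(prev: RDD[InternalRow]) extends RDD[Unit](prev) {
  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
    while (!context.isInterrupted() && !context.isCompleted()) {
      // Each child compute() call yields the rows of exactly one epoch; the end of the
      // iterator is the epoch marker propagating up the plan.
      val epochRows = prev.compute(split, context)
      epochRows.foreach { row =>
        // write the row to the sink for the current epoch (omitted)
      }
      // commit the epoch to the coordinator here (omitted)
    }
    Iterator.empty
  }
}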
First of all, I'm still not familiar with Spark's internals, so I would need help to understand how this is guaranteed to work. Thanks in advance for any help.
As far as I understand the comment from @jose-torres, RDD.compute() is OK to be called multiple times (is there such a usage in batch/streaming already?), and this works for both cases: a single stage as well as multiple stages. It also guarantees that tasks are not terminated and resubmitted while compute() is called multiple times.
Do I understand correctly? If the above is guaranteed, the approach looks good to me.
We might also want to check how state will be snapshotted in this approach, but I'm OK with delegating that to its own (existing) issue.
I'm not familiar with any specific usages of RDD.compute() in this way, but this isn't something where the Spark framework has a chance to intervene. When the writer calls compute() on its child, the compute() will bubble down the tree until it reaches the reader, and those are the only two places we're touching. The intermediate nodes shouldn't notice or care that compute() has been called before.
Streaming stateful operators snapshot at the end of their compute(), so this works out well in that case. The only thing we'll need to change there is how a stateful operator determines the current epoch.
I guess this is not trivial to discuss in this PR, because it concerns multiple stages, which the current continuous mode is trying to avoid. We may want to continue this discussion on the dev mailing list with a design doc for SPARK-24036.
Btw, it looks like RDDs work just the opposite way from other streaming frameworks: the downstream requests records from the upstream(s), instead of the upstream pushing records through the downstream(s).
If my understanding is right, there's always a batch here by nature, regardless of its size. New ideas like an "unbounded RDD" also may not work with multiple stages. We might be able to make major changes to the core to handle the "epoch marker" naturally, but that would be another hack that neither batch nor micro-batch needs. Moreover, by nature, RDD.compute() needs to terminate to handle things like state checkpointing as well.
That's the reason I asked about "the goal of continuous mode", to see the direction/plan. If the goal is "streaming", we might eventually end up with another execution model. If the goal is a "heavily latency-optimized micro-batch", what we might want to focus on is allowing tasks to stay alive and handle multiple batches. I'm not sure that's possible without major changes (sadly I'm not familiar with the internals), but once we address it, it can be just "micro-batch" mode again, with major improvements.
I'd like to make this clear to avoid miscommunication: I never intended to claim that either approach is superior. Just my 2 cents about the conceptual differences.
I don't think there's any expectation here that upstream systems will fill as many records as possible. DataReaderThread pushes rows into the queue (and thus into the ContinuousDataSourceRDD iterator) as soon as the connector produces them.
I think it will need a bigger discussion to understand the pros/cons of the iterator approach and the push vs. pull models. In streaming, the sources continuously generate data, so streaming systems have traditionally been push-based. It may not be best to request data from the source only when the downstream requires it; that's why there's a need for queues at the reader, effectively making it a kind of push system. I think we can take this discussion outside the PR.
I don't think there's any expectation here that upstream systems will fill as many records as possible.
ContinuousDataSourceRDD works like a push model, but when we consider multiple stages, the intermediate stages (unlike the new source and sink) don't know about continuous mode and keep working with the pull model.
Btw, totally agreed that this discussion is going to be much bigger (and maybe off topic) than what the PR proposes to fix. We could discuss this again via SPARK-24036.
Agreed. The doc for that JIRA https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE contains a proposal for cross-stage communication, so let's discuss there if there's some additional streaming property that needs to be satisfied.
Jenkins, add to whitelist
ok to test
override def hasNext(): Boolean = {
  while (currentEntry == null) {
    if (context.isInterrupted() || context.isCompleted()) {
      currentEntry = (null, null)
This line is effectively a no-op, given that the loop still pulls from the queue again. So it would be better to clarify the behavior and fix it.
I know this code block is the same as the current code, so this might be off topic. If we'd rather address it in a separate issue, I'm happy to file one and work on it.
This isn't a no-op because it hooks into part of the writer. Added a comment clarifying what's happening.
I meant that the current logic still calls queue.poll again instead of using the assigned epoch marker value, even when the if statement matches. That looks unintended, right?
We can arrange the logic to fail fast on the error cases, and use an if-else to fix this.
Oh! Yeah, that's definitely not intended.
I don't want to fully rearrange this, since we should still enable a clean shutdown if the data reader or epoch poll threads have been shut down by the interrupt earlier. But I can fix the logic here.
Yeah, that's what I also missed. Thanks for correcting. :)
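For reference, a sketch of the fail-fast if/else arrangement discussed above, reusing names from the surrounding hunks; the epoch-poll failure flag and poll timeout names are assumptions, and this is a fragment of the enclosing iterator rather than standalone code:

while (currentEntry == null) {
  if (context.isInterrupted() || context.isCompleted()) {
    // Clean shutdown path: stop pulling from the queue and treat this as an epoch boundary.
    currentEntry = EpochMarker
  } else if (dataReaderFailed.get()) {
    throw new SparkException("Data read failed", dataReaderThread.failureReason)
  } else if (epochPollFailed.get()) {
    throw new SparkException("Epoch poll failed", epochPollRunnable.failureReason)
  } else {
    // Only poll when no terminal condition was hit, so an assigned marker is not overwritten.
    currentEntry = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
  }
}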
dataReaderThread.setDaemon(true)
dataReaderThread.start()

context.addTaskCompletionListener(_ => {
Maybe better to just call close if this is visible.
Good point.
// Interruption is how continuous queries are ended, so accept and ignore the exception.
// Interruption is how continuous queries are ended, so accept and ignore the exception.
case cause: Throwable =>
  logError(s"Data source writer $writer is aborting.")
Could you please explain the need for this additional handling, since ContinuousWriteRDD still handles the error case?
Sorry, I rebased wrong. This change wasn't meant to be here.
(It was in an older version of WriteToContinuousDataSourceExec, and has since been removed.)
}.toArray
}

// Initializes the per-task reader if not already done, and then produces the UnsafeRow
For class docs and method docs we use /** ... */
See Code documentation style in http://spark.apache.org/contributing.html
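For reference, the convention applied to the comment in the hunk above would look like this (the trailing text is truncated in the quoted hunk, so it is left elided here):

/**
 * Initializes the per-task reader if not already done, and then produces the UnsafeRow ...
 */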
tdas left a comment
I like the refactoring. I still have a few more comments. Here are the high-level points.
- Failure flags are not needed; checking failureReason is sufficient.
- Constructor parameters in DataReaderThread and EpochMarkerGenerator can be removed. That was the point of putting them inside ContinuousQueuedDataReader.
- The ContinuousQueuedDataReader is pretty complex. What is its test coverage? I doubt the normal testing with continuous queries has full branch coverage of all the conditions in it and its subclasses.
queue: BlockingQueue[ContinuousRecord],
context: TaskContext,
failedFlag: AtomicBoolean)
  extends Thread with Logging {
nit: I think this should ideally just extend Runnable.
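A minimal sketch of that change, with illustrative names; the real class would keep its polling logic in run():

// Illustrative only: implement Runnable rather than extending Thread, and let the enclosing
// reader own thread creation, naming, and daemon status.
class EpochMarkerGeneratorSketch extends Runnable {
  override def run(): Unit = {
    // poll the epoch coordinator and enqueue a marker for each newly seen epoch (omitted)
  }
}

object EpochMarkerGeneratorSketch {
  def start(): Thread = {
    val thread = new Thread(new EpochMarkerGeneratorSketch, "continuous-epoch-marker-generator")
    thread.setDaemon(true)
    thread.start()
    thread
  }
}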
/**
 * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with
 * (null, null) when a new epoch marker arrives.
Update the docs; this is not (null, null) any more.
null
// real row
case ContinuousRow(row, offset) =>
  readerForPartition.currentOffset = offset
Why does this need to be set by this class? The reader can set it as it returns ContinuousRows. So this can be a publicly read-only field for the RDD to report the current offset.
Actually, if you add lastOffset to the EpochMarker (and remove offset from ContinuousRow, since it's not needed once currentOffset is updated internally by the reader), then the public currentOffset method is not needed at all. That's a more minimal interface.
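A sketch of the more minimal interface being suggested; the import path for PartitionOffset is an assumption about the DataSourceV2 API of that time:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset  // assumed path

// The reader tracks its own offset internally and only surfaces it at the epoch boundary,
// so the RDD no longer needs a settable currentOffset.
sealed trait ContinuousRecord
case class ContinuousRow(row: UnsafeRow) extends ContinuousRecord
case class EpochMarker(lastOffset: PartitionOffset) extends ContinuousRecord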
private val reader = factory.createDataReader()

// Important sequencing - we must get our starting point before the provider threads start running
var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
As I commented above, currentOffset does not need to be exposed at all.
currentEntry = EpochMarker
} else {
  if (dataReaderFailed.get()) {
    throw new SparkException("data read failed", dataReaderThread.failureReason)
nit: data -> Data
}

object ContinuousDataSourceRDD {
  private[continuous] def getBaseReader(reader: DataReader[UnsafeRow]): ContinuousDataReader[_] = {
maybe getContinuousReader
reader match {
  case r: ContinuousDataReader[UnsafeRow] => r
  case wrapped: RowToUnsafeDataReader =>
    wrapped.rowReader.asInstanceOf[ContinuousDataReader[Row]]
Will a wrapped.rowReader always be an instance of ContinuousDataReader?
If not, then this will throw a different kind of error than the error below. That's confusing. Might as well check this explicitly.
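A sketch of what the explicit check could look like, reusing names from the hunk; the rename to getContinuousReader and the error messages are illustrative, and imports are assumed to follow the surrounding file:

private[continuous] def getContinuousReader(
    reader: DataReader[UnsafeRow]): ContinuousDataReader[_] = {
  reader match {
    case r: ContinuousDataReader[UnsafeRow] => r
    case wrapped: RowToUnsafeDataReader =>
      wrapped.rowReader match {
        case r: ContinuousDataReader[Row] => r
        case other =>
          throw new IllegalStateException(s"Wrapped row reader $other is not continuous")
      }
    case _ =>
      throw new IllegalStateException(s"Unknown reader type: $reader")
  }
}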
It will always be an instance of ContinuousDataReader if it's being run here.
TaskContext.setTaskContext(context)
val baseReader = ContinuousDataSourceRDD.getBaseReader(reader)
try {
  while (!context.isInterrupted && !context.isCompleted()) {
This condition can be deduped. Mentioned earlier in this class as well.
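For example, both loops could share one small helper (the object and method names are made up):

import org.apache.spark.TaskContext

object TaskLoopConditions {
  // Single definition of the "keep running" condition used by the data reader thread and
  // the epoch marker generator.
  def shouldRun(context: TaskContext): Boolean =
    !context.isInterrupted() && !context.isCompleted()
}

// usage in either run() method:
//   while (TaskLoopConditions.shouldRun(context)) { ... }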
class EpochMarkerGenerator(
    queue: BlockingQueue[ContinuousRecord],
    context: TaskContext,
    failedFlag: AtomicBoolean)
Why do you need to pass these? They are available in the enclosing class.
reader: DataReader[UnsafeRow],
queue: BlockingQueue[ContinuousRecord],
context: TaskContext,
failedFlag: AtomicBoolean)
Why do you need to pass these? They are available in the enclosing class.
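A minimal sketch of the point, with simplified stand-in names rather than the PR's classes: an inner class can read the enclosing instance's fields directly, so these constructor parameters can be dropped.

import java.util.concurrent.ArrayBlockingQueue
import org.apache.spark.TaskContext

class QueuedReaderSketch(context: TaskContext) {
  private val queue = new ArrayBlockingQueue[String](1024)

  // No (reader, queue, context, failedFlag) constructor parameters: the inner class simply
  // closes over the enclosing instance's fields.
  class DataReaderThreadSketch extends Runnable {
    override def run(): Unit = {
      while (!context.isInterrupted() && !context.isCompleted()) {
        queue.put("record")  // placeholder for pushing the next row from the data reader
      }
    }
  }
}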
Test build #90074 has finished for PR 21200 at commit
Test build #90146 has finished for PR 21200 at commit
Test build #90154 has finished for PR 21200 at commit
Test build #90155 has finished for PR 21200 at commit
sql/core/pom.xml (outdated)
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
Do you need this? Mockito is already present in test scope in spark/core/pom.xml, which is inherited by the spark/sql/core tests.
I guess not. My IDE reported that I did.
Test build #90201 has finished for PR 21200 at commit
LGTM. Merging to master.
What changes were proposed in this pull request?
Do continuous processing writes with multiple compute() calls.
The current strategy (before this PR) is hacky; we just call next() on an iterator which has already returned hasNext = false, knowing that all the nodes we whitelist handle this properly. This will have to be changed before we can support more complex query plans. (In particular, I have a WIP jose-torres#13 which should be able to support aggregates in a single partition with minimal additional work.)
Most of the changes here are just refactoring to accommodate the new model. The behavioral changes are:
How was this patch tested?
Existing unit tests.