[SPARK-21765] Set isStreaming on leaf nodes for streaming plans. #18973
Conversation
Test build #80794 has finished for PR 18973 at commit
Test build #80796 has finished for PR 18973 at commit
Can you add docs to explain what isStreaming is?
Done. (I think this is a correct summary?)
Make sure this is same as the updated isStreaming docs (see my other comments)
Rather than change this, just use the 3-param version of LocalRelation.
Done.
Just isStreaming is fine; isStreaming = isStreaming is overkill. It's only useful when the value is a constant, e.g. isStreaming = true.
It's necessary here because there are two other default arguments in the constructor.
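For readers following along, a minimal standalone sketch of why the named argument is required here; the Relation case class below is hypothetical, not Spark's actual LocalRelation signature:

```scala
// Hypothetical case class; not Spark's actual LocalRelation signature.
case class Relation(
    output: Seq[String],
    data: Seq[Int] = Seq.empty,
    isStreaming: Boolean = false)

object NamedArgExample extends App {
  val isStreaming = true
  // Relation(Seq("a"), isStreaming) would not compile: positionally, the
  // Boolean would bind to the defaulted `data` parameter. Naming the
  // argument lets the call skip over the other defaults.
  println(Relation(Seq("a"), isStreaming = isStreaming))
}
```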
Test build #80798 has finished for PR 18973 at commit
Test build #80811 has finished for PR 18973 at commit
Force-pushed from 60a3586 to 28c2f4b.
Test build #80864 has finished for PR 18973 at commit
Test build #80863 has finished for PR 18973 at commit
Test build #80867 has finished for PR 18973 at commit
test this please.
tdas left a comment:
Can you update the Scala docs on LogicalPlan.isStreaming to say that isStreaming means the plan has data from a streaming source (i.e. it need not itself contain a streaming source)?
Accordingly, update the other isStreaming comments defined on the leaf nodes.
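A sketch of what the requested wording could look like, using a simplified stand-in for LogicalPlan (the merged ScalaDoc text may differ):

```scala
// Simplified stand-in; the merged ScalaDoc wording may differ.
abstract class LogicalPlanSketch {
  /**
   * Returns true if this subtree contains data that arrived from a streaming
   * source. Note that a plan can be streaming without itself containing a
   * streaming source node, e.g. a DataFrame returned by Source.getBatch.
   */
  def isStreaming: Boolean
}
```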
We should not require
It's now redundant with LogicalPlan.isStreaming.
Addressed comments from @tdas
    logDebug(
      s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
    logDebug({
Please make this a separate function. It's weird to have so much code inside logDebug.
Actually, this does not need to be so complicated. See how I have disabled UnsupportedOperationChecker to do a collect() in FileStreamSourceSuite.
Done.
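For reference, a sketch of the reviewer's first suggestion (hoisting the message construction into a named helper); the object and method names are hypothetical, and the row type is simplified to String:

```scala
object MemoryStreamDebug {
  // Hypothetical helper: builds the message outside logDebug so the logging
  // call stays a one-liner. Row type simplified to String for the sketch.
  def batchDebugString(startOrdinal: Int, endOrdinal: Int, rows: Seq[String]): String =
    s"MemoryBatch [$startOrdinal, $endOrdinal]: ${rows.mkString(", ")}"
}
```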
    override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
      val startOffset = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
      spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a")
      val ds = new Dataset[java.lang.Long](
Can't you use createInternalDataFrame out here?
Also, add a comment about the fact that you are trying to ensure isStreaming is true.
You don't even need the Range logical plan. Since it's for debugging, you can directly create a DF from a local seq of startOffset to endOffset.
I've tried addressing this a few different ways, and I can't come up with anything cleaner than the current solution. Directly creating a DF doesn't set the isStreaming bit, and a bunch of copying and casting is required to get it set; using LocalRelation requires explicitly handling the encoding of the rows, since LocalRelation requires InternalRow input.
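A sketch of the resulting approach, assuming the three-argument internalCreateDataFrame this PR introduces (note it is private[sql], so this only compiles from code inside the org.apache.spark.sql package):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object StreamingRangeBatch {
  // Builds a DataFrame over [startOffset, endOffset] whose plan reports
  // isStreaming = true. The public createDataFrame paths always produce a
  // batch (isStreaming = false) plan, hence the internal API.
  def apply(spark: SparkSession, startOffset: Long, endOffset: Long): DataFrame = {
    val schema = StructType(StructField("a", LongType) :: Nil)
    val rows: RDD[InternalRow] =
      spark.sparkContext.range(startOffset, endOffset + 1).map(l => InternalRow(l))
    spark.internalCreateDataFrame(rows, schema, isStreaming = true)
  }
}
```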
    private[sql]
    def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = {
      sparkSession.internalCreateDataFrame(catalystRows, schema)
    def internalCreateDataFrame(catalystRows: RDD[InternalRow],
nit: The correct code style for a multiline param definition is

    def function(
        param1: type1,  // double indent, i.e. 4 spaces
        param2: type2)

See the indentation section in http://spark.apache.org/contributing.html
Done.
Test build #80939 has finished for PR 18973 at commit
Test build #80942 has finished for PR 18973 at commit
Test build #80943 has finished for PR 18973 at commit
Test build #80952 has finished for PR 18973 at commit
    assert(progress.sources(0).numInputRows === 10)
    }

    test("[SPARK-19690] stream join with aggregate batch query succeeds") {
Can you move this to StreamingAggregationSuite? That suite is closely related to this aggregation bug. I would also rename it to "SPARK-19690: do not convert batch aggregation in streaming query to streaming aggregation".
Also, I would actually test whether the output is correct or not. See the other tests in StreamingAggregationSuite.
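A rough sketch of what such a test could look like. It assumes the StreamTest harness (MemoryStream, testStream, AddData, CheckLastBatch) plus spark.implicits._ and functions.count, and uses illustrative data; it is not the exact test that was merged:

```scala
test("SPARK-19690: do not convert batch aggregation in streaming query " +
    "to streaming aggregation") {
  val input = MemoryStream[Int]
  // Batch-side aggregate over static data: counts per parity (0 or 1).
  val batchAgg = Seq(1, 2, 3, 4, 5).toDF("value")
    .withColumn("parity", $"value" % 2)
    .groupBy($"parity")
    .agg(count("*").as("cnt"))
  val joined = input.toDF().join(batchAgg, $"value" === $"parity")
  testStream(joined)(
    AddData(input, 0, 1),
    // 0 joins parity 0 (two even values); 1 joins parity 1 (three odd values).
    CheckLastBatch((0, 0, 2L), (1, 1, 3L)))
}
```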
One comment regarding location of the aggregation test. Other than that LGTM.
LGTM pending tests.
Test build #81008 has finished for PR 18973 at commit
|
|
Test build #81010 has finished for PR 18973 at commit
|
|
Merging this to master. Thank you @Joseph-Torres ! |
    numSlices: Option[Int],
    output: Seq[Attribute])
    output: Seq[Attribute],
    override val isStreaming: Boolean)
How can a Range have data from a streaming source?
I don't think there's necessarily a reason it shouldn't be able to; streaming sources are free to define getBatch() however they'd like.
Right now the only source actually doing that is a fake source in StreamSuite.
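A simplified sketch of such a fake source, with offsets reduced to plain Longs instead of Spark's Offset type (the real one in StreamSuite implements the Source trait):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Offsets reduced to plain Longs instead of Spark's Offset type.
class FakeRangeSource(spark: SparkSession) {
  // Answers getBatch with a Range-based DataFrame; on the streaming path this
  // plan must still report isStreaming = true, even though its leaf is Range.
  def getBatch(start: Option[Long], end: Long): DataFrame = {
    val startOffset = start.getOrElse(-1L) + 1
    spark.range(startOffset, end + 1).toDF("a")
  }
}
```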
What changes were proposed in this pull request?
All streaming logical plans will now have isStreaming set. This involved adding isStreaming as a case class arg in a few cases, since a node might be logically streaming depending on where it came from.
How was this patch tested?
Existing unit tests - no functional change is intended in this PR.
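To illustrate the shape of the change, a simplified stand-in (not Spark's actual classes) showing isStreaming as a constructor argument on a leaf node:

```scala
trait LeafNodeSketch {
  // True if this node's data comes from a streaming source.
  def isStreaming: Boolean = false
}

// The flag becomes a constructor argument because the same relation shape can
// be batch or streaming depending on where it came from (e.g. Source.getBatch).
case class LocalRelationSketch(
    output: Seq[String],
    data: Seq[Seq[Any]] = Nil,
    override val isStreaming: Boolean = false) extends LeafNodeSketch
```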