Initial draft of Streaming Dataframe infrastructure #21
Conversation
Conflicts: sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
Force-pushed from c6539b4 to b1c1dc6
Force-pushed from 2084f39 to 9205068
Force-pushed from 64a6b42 to 9205068
nit: `volatile` and `private[streaming]`?
Can the user reuse a Source? E.g., `inputData1.toDS().union(inputData1.toDS())`
Yes, that works already.
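For reference, a minimal sketch of what that reuse looks like. `MemoryStream` is assumed to be the in-memory test source from this PR, and its construction may additionally need an implicit SQLContext or Encoder in scope:

```scala
// Sketch only: MemoryStream is assumed from this PR's test framework.
import org.apache.spark.sql.execution.streaming.MemoryStream

val inputData1 = MemoryStream[Int]

// Both sides of the union are backed by the same Source instance, so the
// stream execution tracks a single offset for it.
val unioned = inputData1.toDS().union(inputData1.toDS())
```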
Why are the basic abstractions of the streaming stuff, like `Offset`, in the `sql.execution` package? From a user's point of view, it's not intuitive to have "execution" in the middle, and it's painful to import that deep a package name.
I think basic abstractions like `Offset`, `Source`, and `Sink` should be in `sql.streaming`, and things like `StreamExecution` can be in `sql.execution.streaming` or `sql.streaming.execution`.
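For illustration, the difference at the import site would look like this (the `sql.streaming` packages are the proposal above, not something that exists in this PR):

```scala
// Current layout: "execution" sits in the middle of the path.
import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}

// Proposed layout (hypothetical packages, shown only to illustrate):
// import org.apache.spark.sql.streaming.{Offset, Sink, Source}
// import org.apache.spark.sql.streaming.execution.StreamExecution
```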
My thought was to put everything in `execution` by default, since it is hidden from the Scala docs, etc. We can move things out as we decide to make them public API.
I am cool with that for now.
High-level question: should I care about class scopes and stuff right now?
Therefore, it
Test FAILed.
Jenkins, retest this please.
Merged build finished. Test FAILed.
Test FAILed.
@marmbrus, looks like we have style errors: https://amplab.cs.berkeley.edu/jenkins/job/spark-streaming-df-test/11/console
Wait, I can fix this myself! I'll push a fix now.
Merged build finished. Test PASSed.
Test PASSed.
But it's not just about serialization. `StreamProgress` is a map of Source --> Offset, and only the offsets should be serialized and deserialized, not the sources. At recovery time, if the sink deserialized only the offsets, how will it create a `StreamProgress` (that is, the map of source --> offset) from them?
There needs to be a separate class that is just a seq of Offsets, which is passed on to the Sink. The ordering of the offsets when putting them in the seq will be deterministic, so the ordering is preserved after recovery. And as long as the sources can be ordered deterministically, the recovered offsets can be matched back up.
While that is a reasonable solution, I'm not sure that's exactly how we want to do it. Here are the requirements as I see them (a rough sketch follows the list):
- User-created Offsets should be serializable
- A collection of offsets needs to be serializable in a way that lets us re-associate them with their sources on deserialization
- There should be some class (probably `StreamProgress`) that acts as a container and makes this opaque to Sinks
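One way those three requirements could fit together is sketched below; every name and signature here is hypothetical, not the PR's actual classes:

```scala
// Hypothetical sketch of the recovery flow, for illustration only.
trait Offset extends Serializable
trait Source {
  def ordinal: Int // assumes sources can be ordered deterministically
}

// Acts as the opaque container handed to Sinks. Only the offsets are
// persisted; sources are re-associated via their deterministic ordering.
class StreamProgress(private val progress: Map[Source, Offset]) {
  // What actually gets serialized: offsets in deterministic source order.
  def serializedOffsets: Seq[Offset] =
    progress.toSeq.sortBy(_._1.ordinal).map(_._2)
}

object StreamProgress {
  // On recovery, zip the stored offsets back with the re-created sources,
  // relying on the same deterministic ordering used when serializing.
  def restore(sources: Seq[Source], offsets: Seq[Offset]): StreamProgress =
    new StreamProgress(sources.sortBy(_.ordinal).zip(offsets).toMap)
}
```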
Merged build finished. Test PASSed.
Test PASSed.
Merged build finished. Test FAILed.
Test FAILed.
Probably better to call this `getOffset` to keep it consistent with verb-noun format, especially since this is not expected to be a static return value.
We pretty much never use `get`, and this function should not have side effects. If anything, I think I'd change `getNextBatch` to `fetchNextBatch` to make it clear that it can do work like caching if needed.
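To make the convention concrete, the contrast reads roughly like this (the types and parameters are placeholders, not this PR's actual signatures):

```scala
// Placeholder types, for illustration only.
trait Offset
trait Batch

trait Source {
  // A plain noun for a side-effect-free accessor...
  def offset: Offset

  // ...versus "fetch" for a call that may do real work, such as caching
  // the batch before returning it.
  def fetchNextBatch(start: Option[Offset]): Batch
}
```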
Merged build finished. Test PASSed.
Test PASSed.
Since this is invalid, why not make `currentProgress` return `Option[CompositeOffset]`?
Yeah, I thought about that. I wasn't sure how much I wanted to expose to the user. It might actually be valid someday to save a different type of offset into a sink.
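Sketched out, the trade-off looks like this (`CompositeOffset` is assumed here to be the seq-of-offsets wrapper discussed above; neither signature is necessarily what was merged):

```scala
// Hypothetical stubs for illustration.
trait Offset
case class CompositeOffset(offsets: Seq[Offset]) extends Offset

trait Sink {
  // Reviewer's suggestion: encode the only currently valid shape in the type.
  // def currentProgress: Option[CompositeOffset]

  // Author's counterpoint: keep the broader type, in case a sink someday
  // stores a different kind of offset.
  def currentProgress: Option[Offset]
}
```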
Going to merge and address further comments in a follow-up.
Initial draft of Streaming Dataframe infrastructure
This PR adds the initial infrastructure for specifying streaming Sources and Sinks and for executing queries as new data arrives. Additionally, it adds a test framework that can be used to test Sources and execution. The goal here is to get an initial version of the API committed to a shared branch so that we can continue to iterate on the various APIs.
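As a sketch of how the pieces are intended to compose, a test written against the framework might look like the following; the helper names (`StreamTest`, `testStream`, `AddData`, `CheckAnswer`, `MemoryStream`) are assumed and may not match the committed API:

```scala
// Hypothetical usage sketch; names are assumed, not guaranteed by this PR.
class StreamingDataFrameSuite extends StreamTest {
  test("map over a streaming Dataset") {
    val input = MemoryStream[Int]        // in-memory test Source
    val mapped = input.toDS().map(_ + 1) // ordinary Dataset transformation

    testStream(mapped)(
      AddData(input, 1, 2, 3), // push a batch of new data into the source
      CheckAnswer(2, 3, 4)     // run the stream and verify the sink output
    )
  }
}
```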