-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23099][SS] Migrate foreach sink to DataSourceV2 #20951
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
a617186
6ddac36
f792d32
117a30a
ebd2580
d62ffa1
a2e9eb2
f2c3408
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,7 +131,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf | |
| .foreach(new TestForeachWriter() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe rename this to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And move this to streaming.sources package similar ConsoleWriterSuite |
||
| override def process(value: Int): Unit = { | ||
| super.process(value) | ||
| throw new RuntimeException("error") | ||
| throw new RuntimeException("ForeachSinkSuite error") | ||
| } | ||
| }).start() | ||
| input.addData(1, 2, 3, 4) | ||
|
|
@@ -141,7 +141,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf | |
| query.processAllAvailable() | ||
| } | ||
| assert(e.getCause.isInstanceOf[SparkException]) | ||
| assert(e.getCause.getCause.getMessage === "error") | ||
| assert(e.getCause.getCause.getCause.getMessage === "ForeachSinkSuite error") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why 3 levels? Can you paste the levels here in the PR comments?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [info] org.apache.spark.sql.streaming.StreamingQueryException: Query [id = c80c8860-d4f5-47c6-9a2b-33b5172e1735, runId = 81acd408-9028-41ee-9349-866ae2d67615] terminated with exception: Writing job aborted. [info] Cause: org.apache.spark.SparkException: Writing job aborted. [info] Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.RuntimeException: ForeachSinkSuite error [info] Cause: java.lang.RuntimeException: ForeachSinkSuite error |
||
| assert(query.isActive === false) | ||
|
|
||
| val allEvents = ForeachSinkSuite.allEvents() | ||
|
|
@@ -152,7 +152,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf | |
| // `close` should be called with the error | ||
| val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close] | ||
| assert(errorEvent.error.get.isInstanceOf[RuntimeException]) | ||
| assert(errorEvent.error.get.getMessage === "error") | ||
| assert(errorEvent.error.get.getMessage === "ForeachSinkSuite error") | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename the file accordingly. and Add docs. Clarify why this is not a DataSource but still extends StreamWriteSupport
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, why not make it extend DataSourceV2 for consistency sake? Then it is easier to find all data sources in code by looking at who extends DataSourceV2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a DataSourceV2 - that interface extends it