-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink #26292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| } | ||
| } | ||
|
|
||
| test("streaming - write aggregation w/o topic field, with topic option") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We simplify the query because aggregation is not needed for testing origin intention, as well as aggregation is not supported in continuous mode.
| } | ||
| } | ||
|
|
||
| test("streaming - aggregation with topic field and topic option") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
|
|
||
| override protected def defaultTrigger: Option[Trigger] = None | ||
|
|
||
| test("streaming - sink progress is produced") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copied from KafkaSinkStreamingSuite as it's only relevant to micro-batch.
|
|
||
| override protected def defaultTrigger: Option[Trigger] = Some(Trigger.Continuous(1000)) | ||
|
|
||
| test("generic - write big data with small producer buffer") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copied from KafkaContinuousSinkSuite. I guess it's not necessarily needed to be put here, but would like to leave it as it was.
| override val streamingTimeout = 30.seconds | ||
| protected val streamingTimeout = 30.seconds | ||
|
|
||
| // TODO: this is set to "Int" since ContinuousMemoryStream cannot deal with String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might file another issue soon for fixing ContinuousMemoryStream to support String. There seems to some hurdles to fix that, so I just defer to the another issue. Please let me know if we want to fix it altogether.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When #26300 is merged, I'll apply TODO describing here and remove TODO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SPARK-29642 is resolved - I'm making the change.
|
Test build #112838 has finished for PR 26292 at commit
|
|
Hmm... weird error on the build. I'll post to the dev mailing list if it fails again. |
|
retest this, please |
|
Test build #112839 has finished for PR 26292 at commit
|
|
Yeah, weird issue. I'm going to file a PR soon and will take a look whether this comes there as well... |
|
retest this, please |
|
Test build #112853 has finished for PR 26292 at commit
|
|
cc. @tdas @zsxwing @jose-torres @gaborgsomogyi |
|
Test build #112869 has finished for PR 26292 at commit
|
| */ | ||
| def assertExceptionLowercaseMsg(exception: Throwable, msg: String): Unit = { | ||
| assertExceptionMsgInternal(exception, msg) { case (exMsg, m) => | ||
| exMsg.toLowerCase(Locale.ROOT).contains(m) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A boolean argument saying whether you want case-insensitive matching would be cleaner.
Or maybe you don't even need the argument, and just make the existing method case-insensitive... (just call toLowerCase on the expected message too)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I also agree it wouldn't matter if we change existing one to case-insensitive. Will apply.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking some of the callers, there're some checks on class name or so which is still good to have case-sensitive matching. I'll add the parameter.
|
Test build #113292 has finished for PR 26292 at commit
|
| def assertExceptionMsg(exception: Throwable, msg: String): Unit = { | ||
| def assertExceptionMsg(exception: Throwable, msg: String, ignoreCase: Boolean = false): Unit = { | ||
| def contain(msg1: String, msg2: String): Boolean = { | ||
| if (ignoreCase) msg1.toLowerCase(Locale.ROOT).contains(msg2.toLowerCase(Locale.ROOT)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use braces in multi-line conditions.
55860d6 to
e4e36ed
Compare
|
The last commit applies the change of SPARK-29642 to simplify the code. |
|
Test build #113342 has finished for PR 26292 at commit
|
|
Merging to master. |
|
Thanks for reviewing and merging! |
What changes were proposed in this pull request?
This patch leverages V2 continuous memory stream to extract tests from Kafka micro-batch sink suite and continuous sink suite and deduplicate them. These tests are basically doing the same, except how to run and verify the result.
Why are the changes needed?
We no longer have same tests spotted on two places - brings 300 lines deletion.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.