[SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store #28707
Conversation
Test build #123447 has finished for PR 28707 at commit
I agree this change is valid, given that Spark doesn't store the schema of the state (so there is no validation between the expected schema and the actual row), but this should be considered a last resort because of its huge limitations. Safety guards must be placed in front of this - like SPARK-27237, which I think covers various general issues by providing clearer guidance on schema incompatibility between the state and the query being run.
override def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit = {
  if (checkFormat && SQLConf.get.getConf(
      SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) && row != null) {
    if (schema.fields.length != row.numFields) {
This method exposes implementation details of UnsafeRow directly. Could we let UnsafeRow have such a check method instead? UnsafeRow itself is aware of data types, so the check method can receive the list of data types and do the assertion on its own.
Actually, that's the first version I did. Since the checking logic is only used for streaming aggregation queries and also depends on the streaming config, I chose to put it in StreamingAggregationStateManager. WDYT?
I was hoping we could move the core validation logic to either UnsafeRow itself, or some sort of UnsafeRowUtils, maybe somewhere in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util.
This util function would either return a boolean indicating passed/failed integrity check, or it could return more details. I'd probably go with the former first. It would not do any conf checks -- that's the caller's responsibility. This utility is useful for debugging low-level stuff in general, and would come in handy in both Spark SQL and Structured Streaming debugging.
Then we can call that util function from here, after checking the confs. And the exception throwing logic can be left here too.
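For illustration, a rough sketch of the caller side of that split: check the conf, call the shared utility, and keep the exception-throwing in the state manager. The utility object/method names follow the suggestion above and the conf constant follows the diff above, but they are assumptions as far as the final code goes:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

// Hypothetical caller in the streaming aggregation state manager: the conf check
// and the exception stay here, while the structural check lives in a shared util
// that just returns a Boolean.
object StateFormatCheckSketch {
  def check(row: UnsafeRow, schema: StructType): Unit = {
    val enabled = SQLConf.get.getConf(SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED)
    if (enabled && row != null &&
        !UnsafeRowUtils.validateStructuralIntegrity(row, schema)) {
      // InvalidUnsafeRowException is the exception class added by this PR
      // (import omitted here; it lives alongside the state store code).
      throw new InvalidUnsafeRowException
    }
  }
}
```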
 */
class InvalidUnsafeRowException
  extends SparkException("The UnsafeRow format is invalid. This may happen when using the old " +
    "version or broken checkpoint file. To resolve this problem, you can try to restart the " +
I'm not sure I understand the possible root causes and the proposed solutions. The problem comes from either an incompatible schema (probably due to a change of the query, or a change of the underlying aggregation function) or a corrupted row, neither of which the solutions described here can resolve.
"Old version" here is also ambiguous, because there's another meaning of "version" here, the state format version, which is not expected to introduce such an incompatible format issue. Did you see that case?
Thanks for the comments, I rephrased the error message to make it clearer. Yep, there are several ways that can lead to the invalid format and we need to list them all. Done in ee048bc.
    .booleanConf
    .createWithDefault(true)

  val STREAMING_STATE_FORMAT_CHECK_ENABLED =
This is misleading - we're only detecting the case from streaming aggregation.
BTW, should we have a configuration for this, given that it only does an essential check which all rows must pass?
Thanks, renamed it in ee048bc. Considering it's an extra check and still has overhead, I kept the feature flag for safety.
@xuanyuanking, can you please explain how this will fix the issue where we have changed something in the internal implementation of sum in SPARK-28067? How does that affect previous states, and what would be the expected behavior? At the query level, the sum schema is the same. Is the checkpoint storing information that comes from intermediate states? Are we storing unsafe rows from the updateExpression or the merge phases of aggregation?
And personally I'd rather do the check in StateStore, with the additional overhead of reading "a" row up front, to achieve the same in all stateful operations. For streaming aggregations it initializes "two" state stores, so the overhead goes to "two" rows, but I don't think the overhead matters much. If we're really concerned about the overhead of creating an additional "iterator", or about doing the validation in an early phase (where it might be possible the state store is not accessed), just have a StateStore wrapper wrapping it.
@skambha it doesn't fix the issue; it gives a better error message when we hit the issue.
I think this PR and SPARK-27237 are orthogonal, and we should have both. SPARK-27237 is a bit hard to merge as it changes the checkpoint. We may need more reviews to see if it's future-proof (e.g. when we want to support schema evolution of the state store). Anyway, this PR covers the case where people upgrade from Spark 2.x to 3.x, which is necessary even if we have SPARK-27237.
  val STREAMING_AGGREGATION_STATE_FORMAT_CHECK_ENABLED =
    buildConf("spark.sql.streaming.aggregationStateFormatCheck.enabled")
      .doc("Whether to detect a streaming aggregation query may try to use an invalid UnsafeRow " +
nit: When true, check if the UnsafeRow from the state store is valid or not when running streaming aggregation queries. This can happen if the state store format has been changed.
Thanks, rephrased in 10a7980.
| .doc("Whether to detect a streaming aggregation query may try to use an invalid UnsafeRow " + | ||
| "in the state store.") | ||
| .version("3.1.0") | ||
| .internal() |
we usually put internal() right after buildConf(...)
Thanks, done in 10a7980.
Yep, WIP for the integration test of the state store format validation. I will show you the difference in the error message with/without this patch.
Yes, the two approaches address different sides of this issue. SPARK-27237 requires an extra file to keep the schema, which makes schema checking possible. This one is a guard against random failures or correctness bugs.
rednaxelafx left a comment
Thank you very much for taking on this verification! I've used the same technique of checking unsafe row's structural integrity on quite a few occasions and it's been a very useful thing to have in the toolbox.
Having a util function that does this check inside of Spark would be very handy for future low-level debugging / investigations.
I agree with @cloud-fan that this verification feature is completely orthogonal to baking the schema info into the persisted state.
I'd strongly vote for having the schema info as a part of the persisted state instead of only having a blob that we interpret as UnsafeRows without any guardrails. But doing so changes the binary format of the persisted state, so I'd really love to see it as a piece of the puzzle in a long term plan to improve the state store.
The verification proposed in this PR does not make any changes to the binary format, so it could be useful for both Spark master branch and existing releases.
For the UnsafeRow structural integrity guarantees / heuristics, I'd propose the following candidate invariants to consider, given a row: UnsafeRow and an expectedSchema: StructType:
- schema.fields.length == row.numFields should always be true (already covered in this PR).
- UnsafeRow.calculateBitSetWidthInBytes(row.numFields) <= row.getSizeInBytes should always be true. A stricter < should hold if the expectedSchema contains at least one field. Not covered in this PR yet.
- For variable-length fields: if the null bit says it's null then don't do anything, else extract offset and size:
  - 0 <= size < row.getSizeInBytes should always be true. We can be even more precise than this, where the upper bound of size can only be as big as the variable-length part of the row.
  - offset should be >= the fixed-size part of the row.
  - offset + size should be within the row bounds (already covered by this PR).
  - We can make further assumptions on the UnsafeRow format, by assuming that if field1.ordinal < field2.ordinal, then field1.offset + field1.size <= field2.offset. This assumes that the fields were written in left-to-right order, which doesn't have to be the case, but all the write logic I know of in Spark fits this assumption. So this can be considered an optional heuristic.
- For fixed-length fields that are narrower than 8 bytes (boolean / byte / short / int / float), if the null bit says it's null then don't do anything, else check if the unused bits in the field are all zeros. The UnsafeRowWriter's write() methods make this guarantee.

When I did manual debugging, sometimes I'd also check the first couple of characters in a UTF8String from an UnsafeRow and see if the characters make sense as UTF-8. That's not something easily checkable here so I wouldn't suggest that.
If we know the length of the entire buffer of the backing store for UnsafeRows, we should make sure our offset + size never goes beyond that, too.
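To make the list above concrete, here is a minimal sketch of what such a check could look like as a standalone helper. The object name is invented for this example, and only a subset of the listed invariants is implemented:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types.StructType

// Illustrative sketch only: the real code in this PR may differ in structure and coverage.
object UnsafeRowIntegritySketch {

  def validateStructuralIntegrity(row: UnsafeRow, expectedSchema: StructType): Boolean = {
    // Invariant: the field count must match the expected schema.
    if (expectedSchema.fields.length != row.numFields) return false

    // Values that do not change per field, computed once up front.
    val nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(row.numFields)
    val fixedSize = nullBitsSize + 8 * row.numFields
    val rowSizeInBytes = row.getSizeInBytes

    // Invariant: the row must be large enough for the null bits plus the fixed-width slots.
    if (rowSizeInBytes < fixedSize) return false

    expectedSchema.fields.zipWithIndex.forall { case (field, index) =>
      if (row.isNullAt(index)) {
        true // nothing to check for null fields
      } else if (!UnsafeRow.isFixedLength(field.dataType)) {
        // Variable-length field: the fixed-width slot packs (offset << 32) | size.
        val offsetAndSize = row.getLong(index)
        val offset = (offsetAndSize >> 32).toInt
        val size = offsetAndSize.toInt
        size >= 0 && offset >= fixedSize && offset + size <= rowSizeInBytes
      } else {
        true // fixed-length non-null field: unused-bit checks omitted in this sketch
      }
    }
  }
}
```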
  def values(store: StateStore): Iterator[UnsafeRow]

  /** Check the UnsafeRow format with the expected schema */
  def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit
Nit: I'd like to use "verb + noun" names for actions, and "nouns" for properties.
Here it'd be some form of "validate structural integrity". WDYT?
val offset = (offsetAndSize >> 32).toInt
val size = offsetAndSize.toInt
if (size < 0 ||
    offset < UnsafeRow.calculateBitSetWidthInBytes(row.numFields) + 8 * row.numFields ||
UnsafeRow.calculateBitSetWidthInBytes(row.numFields) + 8 * row.numFields: this part is loop-invariant. Please hoist it out of the loop manually here. It's the same kind of logic as UnsafeRowWriter's:

this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
this.fixedSize = nullBitsSize + 8 * numFields;

We may want to use the same or similar names for the hoisted variables.
row.getSizeInBytes on the next line is also loop-invariant. Let's also hoist that out.
Test build #123477 has finished for PR 28707 at commit
+1. How about putting the validation code in a new object?
Test build #123475 has finished for PR 28707 at commit
@rednaxelafx Many thanks for the detailed comments and guidance. I'm addressing them now.
Sure, a separate utils object makes more sense since we want it to be general validation logic.
Will this be included in Spark 3.0.0? If this is to unblock SPARK-28067 being included in Spark 3.0.0 then it's OK to consider this first, but if this is planned for Spark 3.1 then I'm not sure about the priority - are all of you aware that the PR for SPARK-27237 was submitted more than a year ago and is still being deferred?

I still don't get why the proposal restricts its usage to streaming aggregation, whereas the mechanism is a validation of the UnsafeRow which can be applied to all stateful operations. Let's not pinpoint only the problem we've just seen.

Also, from my side the overhead of the validation logic looks trivial compared to the work stateful operators do - we don't do the validation for all rows, we don't even sample, we just check the first one. Unless there's a chance of introducing a show-stopper bug in the validation logic itself (so that we need a way to disable the validation), I don't see the need for a new configuration.
And I think SPARK-27237 doesn't require that sort of "future-proofing", which is preferably reserved for a change that carries risk - it doesn't touch the existing parts of the checkpoint and simply puts the schema information into a new file. If we find a better way to pack the schema information into the checkpoint, we can simply discard/ignore the file or craft logic to migrate smoothly. There's no risk in rolling it back in the future.
Yea we need this PR to unblock backporting SPARK-28067 to 3.0.
What are other stateful operations that use unsafe row? I think we can apply the check everywhere.
This is something we don't know. Adding a flag seems safer.
I'm not saying we shouldn't merge it. I just want to prioritize this PR so that we may be able to include the fix for the sum correctness bug in 3.0.
The state store itself stores UnsafeRow, hence this applies everywhere in stateful operations. I'd propose to do it like #28707 (comment) instead of fixing it everywhere.
@HeartSaVioR After taking a further look, instead of dealing with the iterator, how about adding the validation for all state store operations in
Thanks for adding the test.
It would be nice to see the proposed change as code to avoid misunderstanding, like I proposed in the previous comment (anything including a commit in your fork or a text comment is OK). I'll try out my alternative (wrapping StateStore) and show the code change. Thanks! EDIT: Please deal with the interface whenever possible - there are different implementations of state store providers and we should avoid sticking to a specific implementation.
My alternative with wrapping the state store is something like below. The example code only checks in the get operation, which is insufficient to check the "key" row in the state. That said, the iterator approach still provides more possibilities for validation, though the validation of the unsafe row itself doesn't have enough coverage to check various incompatibility issues (we should definitely have other guards as well), so it's sort of OK to only cover the value side.
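The example code referenced above isn't reproduced in this thread; very roughly, the wrapping idea might look like the sketch below. The class name and the validation hook are assumptions, and a real version would extend the StateStore trait and forward every member rather than exposing only get:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.types.StructType

// Illustrative wrapper: validate the first non-null value row read from the
// underlying store, then pass everything through untouched.
class ValidatingStateStoreSketch(
    delegate: StateStore,
    valueSchema: StructType,
    validate: (UnsafeRow, StructType) => Boolean) {

  private var validated = false

  def get(key: UnsafeRow): UnsafeRow = {
    val value = delegate.get(key)
    if (!validated && value != null) {
      if (!validate(value, valueSchema)) {
        throw new IllegalStateException("Invalid UnsafeRow detected in the state store")
      }
      validated = true
    }
    value
  }
}
```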
All the comments addressed in 1f71563. Thanks for the review!
Test build #123580 has finished for PR 28707 at commit
|
|
Sorry, my comment was edited so you may have missed the content, but it is also a sort of callout about "pinpointing" - do you think your approach works with other state store providers as well? The root cause isn't bound to the implementation of the state store provider, but this patch only addresses the HDFS state store provider. I guess you're trying to find out how it can be done less frequently, the first time the state is loaded from the file, which is optimal. While I think it could even be done without binding to the state store provider implementation if we really need that (check only once when the provider instance is created), have we measured the actual overhead? If the overhead turns out to be trivial then it won't matter that we run the validation check for each batch. It sounds sub-optimal, but the overhead would be trivial.
…format for all state store
e3d841c to 01007fb
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
  extends SparkException("The streaming query failed by state format invalidation. " +
    "The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " +
    "incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " +
    "among restart. For the first case, you can try to restart the application without " +
"For the first case": I think it's meant for all the cases?
The resolution is for the first case. The remaining cases listed should be considered user problems.
 * An exception thrown when an invalid UnsafeRow is detected in state store.
 */
class InvalidUnsafeRowException
  extends SparkException("The streaming query failed by state format invalidation. " +
Does it have to be SparkException?
No, changed it to RuntimeException. Done in fd74ff9.
cloud-fan left a comment
LGTM except for a few comments.
@volatile private var storeConf: StateStoreConf = _
@volatile private var hadoopConf: Configuration = _
@volatile private var numberOfVersionsToRetainInMemory: Int = _
@volatile private var isValidated = false
Can we add a TODO that this validation should be moved to a higher level so that it works for all state store implementations?
Thanks, added the TODO in fd74ff9.
Test build #124166 has finished for PR 28707 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
Test build #124171 has finished for PR 28707 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Test build #124186 has finished for PR 28707 at commit
thanks, merging to master! (I think this patch is too big to backport)
Thanks all for reviewing!
  val STATE_STORE_FORMAT_VALIDATION_ENABLED =
    buildConf("spark.sql.streaming.stateStore.formatValidation.enabled")
      .internal()
      .doc("When true, check if the UnsafeRow from the state store is valid or not when running " +
Change "UnsafeRow" to "checkpoint"? Most end users do not know what UnsafeRow is.
Sure, will submit a follow-up PR today.
What changes were proposed in this pull request?
Address comment in #28707 (comment)
Why are the changes needed?
Hide the implementation details in the config doc.
Does this PR introduce any user-facing change?
Config doc change.
How was this patch tested?
Document only.
Closes #29315 from xuanyuanking/SPARK-31894-follow.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Introduce UnsafeRow format validation for streaming state store.
Why are the changes needed?
Currently, Structured Streaming puts the UnsafeRow into the StateStore directly, without any schema validation. This is dangerous behavior when users reuse the checkpoint file during migration. Any change or bug fix related to the aggregate functions may cause random exceptions, or even wrong answers, e.g. SPARK-28067.
Does this PR introduce any user-facing change?
Yes. If underlying format changes are detected when the checkpoint is reused during migration, an InvalidUnsafeRowException will be thrown.
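As a usage note, the check is on by default and controlled by the internal flag shown in the earlier diff; disabling it (for example, if the check itself ever misbehaved) would look roughly like this:

```scala
import org.apache.spark.sql.SparkSession

// spark.sql.streaming.stateStore.formatValidation.enabled defaults to true;
// setting it to false turns the new validation off.
val spark = SparkSession.builder()
  .appName("state-store-format-validation-example")
  .master("local[2]")
  .config("spark.sql.streaming.stateStore.formatValidation.enabled", "false")
  .getOrCreate()
```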
How was this patch tested?
UT added. Will also add integration tests for more scenarios in a separate PR.