Stop parsing special dates for Spark 3.2+ #3439
Conversation
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
build
// handle special dates like "epoch", "now", etc.
val finalResult = specialDates.foldLeft(converted)((prev, specialDate) =>
  specialTimestampOr(sanitizedInput, specialDate._1, specialDate._2, prev))
val finalResult = withResource(daysEqual(sanitizedInput, DateUtils.EPOCH)) { isEpoch =>
NIT: It would be good to pull this boilerplate code into a method.
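For illustration, a rough sketch of what such a helper might look like, reusing the daysEqual and withResource helpers already present in the surrounding code (the helper name nullIfSpecialDate is made up here, not the PR's actual method):

import ai.rapids.cudf.{ColumnVector, Scalar}

// Returns `prev` with rows matching the given special date string replaced by null,
// closing `prev` and all temporaries before returning.
private def nullIfSpecialDate(
    input: ColumnVector,
    specialDate: String,
    prev: ColumnVector): ColumnVector = {
  withResource(prev) { previous =>
    withResource(daysEqual(input, specialDate)) { isSpecial =>
      withResource(Scalar.fromNull(previous.getType)) { nullScalar =>
        isSpecial.ifElse(nullScalar, previous)
      }
    }
  }
}

The per-date blocks could then collapse into a single foldLeft over the special date strings, much like the code this change replaces.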
def daysScalarDays(name: String): Scalar = ShimLoader.getSparkVersion match {
  // In Spark 3.2, special datetime values such as `epoch`, `today`, `yesterday`, `tomorrow`,
  // and `now` are supported in typed literals only
  case version if version >= "3.2" =>
It would be better to add shims for this.
If we do keep the version check here (which I am ok with personally because it reduces complexity in the shim layers) then we should use more robust logic for the version check. We have some code in the test suite that could be moved elsewhere.
+1 to keep the version check. I would suggest moving @andygrove's implementation to ShimVersion to make shim versions comparable.
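As a rough sketch of the idea (illustrative names, not the plugin's actual shim classes), a comparable version type avoids the pitfalls of comparing version strings lexicographically:

case class SparkVersion(major: Int, minor: Int, patch: Int)
    extends Ordered[SparkVersion] {
  override def compare(that: SparkVersion): Int = {
    val byMajor = major.compareTo(that.major)
    if (byMajor != 0) {
      byMajor
    } else {
      val byMinor = minor.compareTo(that.minor)
      if (byMinor != 0) byMinor else patch.compareTo(that.patch)
    }
  }
}

// Lexicographic string comparison would claim "3.10.0" < "3.2.0"; a numeric compare gets it right.
assert(SparkVersion(3, 10, 0) > SparkVersion(3, 2, 0))
assert(SparkVersion(3, 2, 1) >= SparkVersion(3, 2, 0))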
The reason to use a shim over a version check is mostly Databricks and other vendors that pull back "fixes" not directly related to version numbers. I doubt it will happen in this case, but similar things have happened in the past.
I think it's something we will find out via the test breaking on Databricks. If we go with the ShimVersion comparison implementation, the fix is either an additional condition on the Databricks shim version compare, or it's more involved, at which point we will have to add a new shim method.
I replaced the version dispatch with a shim method.
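For readers unfamiliar with the shim layer, a simplified sketch of what dispatching through a shim method (rather than a version-string match) can look like; the trait and method names below are invented for illustration, not the PR's actual shim API:

trait SparkShims {
  // Whether CAST(string AS DATE/TIMESTAMP) should still recognize special values
  // such as "epoch", "now", "today", "yesterday", and "tomorrow".
  def castSupportsSpecialDates: Boolean
}

class Spark31XShims extends SparkShims {
  override def castSupportsSpecialDates: Boolean = true
}

class Spark320Shims extends SparkShims {
  // Spark 3.2 accepts these values in typed literals only, so the cast path
  // no longer parses them.
  override def castSupportsSpecialDates: Boolean = false
}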
// handle special dates like "epoch", "now", etc.
specialDates.foldLeft(converted)((prev, specialDate) =>
  specialDateOr(sanitizedInput, specialDate._1, specialDate._2, prev))
withResource(daysEqual(sanitizedInput, DateUtils.EPOCH)) { isEpoch =>
This change feels like a step backwards. Before, with the folding, the maximum extra memory needed on the GPU was one boolean column and the new output column. With this change we now have 5 boolean columns and 5 temporary output columns.
Yes, it was not optimal. I reworked this part, cleaning up temporary GPU resources as early as possible.
val startTimeSeconds = System.currentTimeMillis() / 1000L
val cpuNowSeconds = withCpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
val gpuNowSeconds = withGpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
assert(cpuNowSeconds >= startTimeSeconds)
What is the reason for removing these assertions? Are they no longer valid?
Yes, under Spark 3.2+ the result will be zero instead of the current time, since NOW is no longer parsed in 3.2.
Then have the test check the version, and have it check that now is replaced with a 0 instead. Or just have the test skip entirely if it is 3.2+.
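A sketch of that suggestion against the existing test code (the version-check helper isSpark320OrLater is assumed here, not necessarily the suite's real utility):

val startTimeSeconds = System.currentTimeMillis() / 1000L
val cpuNowSeconds = withCpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
val gpuNowSeconds = withGpuSparkSession(now).collect().head.toSeq(1).asInstanceOf[Long]
if (isSpark320OrLater) {
  // Spark 3.2+ no longer parses "now" in a cast, so the parsed value comes back as 0
  assert(cpuNowSeconds == 0L)
  assert(gpuNowSeconds == 0L)
} else {
  assert(cpuNowSeconds >= startTimeSeconds)
  assert(gpuNowSeconds >= startTimeSeconds)
}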
Signed-off-by: sperlingxx <[email protected]>
build
build
The CI has been assigned to non-reserved instances, which failed the PVC driver. I will report it to Blossom.
build
// `converted` will be closed in replaceSpecialDates. We wrap it with closeOnExcept in case
// of exception before replaceSpecialDates.
val finalResult = closeOnExcept(converted) { timeStampVector =>
  val specialDates = Seq(DateUtils.EPOCH, DateUtils.NOW, DateUtils.TODAY,
It seems expensive to replace the special dates with null on Spark 3.2 rather than just skipping them entirely. They will already be ignored by is_timestamp.
I will pull this PR locally today and experiment with it and come back with more detailed suggestions.
I looked at how I would approach this and I don't think there is a need to make any shim changes here. Spark 3.2 doesn't support the special dates, so it seems confusing to add related logic there. There are also no changes in Spark APIs that would cause us to need shims. I think this issue can be resolved more simply with some version checks in a couple of places. In DateUtils:

def specialDatesDays: Map[String, Int] = if (spark320orLater) {
  Map.empty
} else {
  val today = currentDate()
  Map(
    EPOCH -> 0,
    NOW -> today,
    TODAY -> today,
    YESTERDAY -> (today - 1),
    TOMORROW -> (today + 1)
  )
}

(and the same for the seconds and microseconds variants). Then in the timestamp parsing code:

if (spark320orLater) {
  withResource(isTimestamp(lhs.getBase, sparkFormat, strfFormat)) { isTimestamp =>
    withResource(asTimestamp(lhs.getBase, strfFormat)) { converted =>
      withResource(Scalar.fromNull(dtype)) { nullValue =>
        isTimestamp.ifElse(converted, nullValue)
      }
    }
  }
} else {
  // original complex logic that handles special dates
}
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
I fully agree with this approach, and I've made the corresponding changes. Thanks for such a detailed explanation!
build
Resolved review threads (now outdated):
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala
- sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
Signed-off-by: sperlingxx <[email protected]>
Signed-off-by: sperlingxx <[email protected]>
build
Signed-off-by: sperlingxx <[email protected]>
build
I pulled this PR and we still have CastOpSuite test failures. Was this intended to handle all of those? It's referenced in the description, so I was assuming so.
Hi @tgravescs, this PR cannot fix some of the failing cases in CastOpSuite, which are due to #3382. I am going to simply disable those cases for Spark 3.2+ after finishing this PR. It will be a temporary solution, since #3382 cannot be fixed completely until we support the full date/timestamp range.
We have to fall back to the CPU for this case unless the user opts into letting us do the cast anyway. Disabling the tests is not a solution.
Oh, I thought we already had corresponding options to disable casting from string to date/timestamp by default. But I just found I was wrong: we only have the config
We should add in
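Roughly, the kind of opt-in gating being discussed might look like this (the config key and plumbing below are assumptions for illustration, not the plugin's confirmed API):

import org.apache.spark.sql.SparkSession

// Illustration: read an opt-in flag and decide whether the GPU may handle the cast.
def gpuCanCastStringToDate(spark: SparkSession): Boolean =
  spark.conf.get("spark.rapids.sql.castStringToDate.enabled", "false").toBoolean

// If this returns false, the CAST(string AS DATE) expression would be tagged to
// fall back to the CPU unless the user explicitly opts in.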
revans2 left a comment
I am not thrilled with VersionUtils moving to the plugin as I explained here. But just to get something shipped I am going to approve this and we can revisit the topic later on.
That said, we do need to fix the 7-digit year parsing as a separate issue.
Just FYI, I filed #3530 to handle the 7-digit year fallback issue. I will take a crack at it, since hopefully I can get it done today.
Signed-off-by: sperlingxx [email protected]
Fixes #3383
This PR also fixes cases of casting string to date/timestamp in CastOpSuite which involve special dates. However, these cases won't be entirely fixed until we address #3382 by supporting the full date/timestamp range on the GPU, as SPARK-35780 does.
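For context, a small illustration of the Spark behavior change behind this PR (assuming a SparkSession named spark; the outputs reflect the behavior described above, not results from this PR's tests):

import spark.implicits._

// Spark 3.1.x: special date strings in a string column are parsed by the cast
Seq("epoch", "now").toDF("s").selectExpr("CAST(s AS DATE)").show()
// -> 1970-01-01 and today's date

// Spark 3.2+: the same cast returns NULL for those rows, since special datetime
// values are no longer parsed when casting string columns
Seq("epoch", "now").toDF("s").selectExpr("CAST(s AS DATE)").show()
// -> null and null

// Typed literals still work on Spark 3.2+
spark.sql("SELECT DATE 'epoch'").show()
// -> 1970-01-01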