[SPARK-28612][SQL] Add DataFrameWriterV2 API #25354
Conversation
|
@cloud-fan, please take a look at this PR with the new DSv2 write API, as discussed on the InsertInto thread. @brkyvz, @mccheah, @jzhuge, and @dongjoon-hyun, you may also be interested in reviewing. Thanks! |
|
Test build #108630 has finished for PR 25354 at commit
|
|
Test build #108632 has finished for PR 25354 at commit
|
|
Thank you for pinging me, @rdblue. Could you fix the doc generation issue in order to pass Jenkins? |
|
Test build #108674 has finished for PR 25354 at commit
|
}

@scala.annotation.varargs
override def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] = {
Is this name intended to be different from DataFrameWriter.partitionBy?
The intent is to match CREATE TABLE SQL, which uses PARTITIONED BY.
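For illustration, a hedged sketch of the parity being described; the table and column names are made up, and it assumes the transform functions added in this PR (years, days, ...) and org.apache.spark.sql.functions.col are in scope:

// SQL form this API mirrors:
//   CREATE TABLE prod.db.events (ts timestamp, data string) PARTITIONED BY (days(ts))
// DataFrameWriterV2 form of the same layout (hypothetical names):
df.writeTo("prod.db.events")
  .partitionedBy(days(col("ts")))
  .create()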
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala
|
How do convert … BTW, a new JdbcCatalog CatalogPlugin type may store url and connectionProperties in catalog properties, making it easier for users to access JDBC tables. |
|
@jzhuge, options like connection properties and JDBC URL belong at the catalog level, not at the table level. Those are table-level configuration in v1 because there is no catalog that holds common options like the database connection URL. |
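A hedged sketch of what catalog-level configuration could look like, using the spark.sql.catalog.&lt;name&gt; convention for v2 catalog plugins; the plugin class and property keys below are illustrative, not an existing Spark catalog:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.catalog.mysql_cat", "com.example.JdbcCatalog")         // hypothetical plugin class
  .config("spark.sql.catalog.mysql_cat.url", "jdbc:mysql://host:3306/db")   // shared connection URL
  .config("spark.sql.catalog.mysql_cat.user", "app")                        // shared connection property
  .getOrCreate()

// Tables under that catalog can then be addressed by name, with no per-table URL:
// df.writeTo("mysql_cat.db.events").append()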
33dde6d to 409a0bc
|
@rdblue Catalog level properties sounds good to me. Also realized that JdbcCatalog plugin type is not needed if the only things we want are the provider common options. Each provider can pick up common options from catalog properties automatically, e.g.: |
|
Test build #108676 has finished for PR 25354 at commit
|
}

/**
 * Configuration methods common to create/replace operations and insert/overwrite operations.
Reading this, it sounds like there is another Writer for insert/overwrite extending WriteConfigMethods, like CreateTableWriter?
Yes. DataFrameWriterV2 and CreateTableWriter implement these methods. When a CreateTableWriter method is called, like partitionedBy, the result is always a CreateTableWriter and not a DataFrameWriterV2 so that append can't be called with unsupported options.
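A simplified, hedged sketch of the relationship being described (not the actual Spark source):

import org.apache.spark.sql.Column

trait WriteConfigMethods[R] {
  def option(key: String, value: String): R             // shared by create/replace and insert/overwrite
  def tableProperty(property: String, value: String): R
}

trait CreateTableWriter[T] extends WriteConfigMethods[CreateTableWriter[T]] {
  def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T]
  def create(): Unit
  def replace(): Unit
}

// DataFrameWriterV2 also implements these config methods and adds append()/overwrite(),
// but once partitionedBy is called the caller only holds a CreateTableWriter, so
// append can't be invoked with create-only configuration.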
brkyvz
left a comment
Is it possible to use this API with paths or URIs instead of table names?
Something like:
writeTo.table("catalog.db.tbl")
writeTo.path("file:/tmp/abc") // or
writeTo.uri("jdbc:mysql://...")
 * @group partition_transforms
 * @since 3.0.0
 */
def years(e: Column): Column = withExpr { Years(e.expr) }
I'm worried that these are a single letter away from existing function names. With one small typo, you either get your function to be unevaluable, or return a runtime exception.
Can you think of a solution to this besides supporting year(ts) as though it were years? I'm concerned that would cause confusion for functions like hour that have different meanings.
Maybe we should catch these and throw more helpful exceptions instead?
can we hide them by a namespace?
object partitionBy {
def years: ...
def months: ...
}
So this would be partitionBy.years? I find partitionBy a little awkward. Is there a better name we could use? partitioner.years maybe?
I'm not too stuck up on the name. What do you think of the idea in general? Other names I can think of:
- partitioner
- logical
- transformer
- partitioning
- partition
could be like partition.byYears, partition.byDays ...
I think the names should match the SQL names, which are years, months, etc.
I think I like "partitioning" the best, since it qualifies the function. partitioning.years. Should I make this change?
I'm fine with that. Let's discuss at the sync in 30 mins and gather feedback. Wouldn't want you to waste work.
@scala.annotation.varargs
override def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] = {
  val asTransforms = (column +: columns).map(_.expr).map {
    case Years(attr: Attribute) =>
Can we not use the existing:
year
month
dayofmonth
hour
functions that already exist? The closeness of the function names worries me. I understand the separation of concerns, but it is something to consider. Maybe these were already discussed in the SPIP.
No, we can't.
First, those are concrete functions that have different meanings. hour(ts) is hour of day, not hourly partitions, and day of month is not a function you would partition on either.
Second, using those functions would not correspond to the transform names that are supported in SQL, which are years, months, days, and hours.
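To make the distinction concrete, a small hedged illustration; the table and column names are made up, and it assumes the hours transform from this PR is in scope alongside the existing functions:

import org.apache.spark.sql.functions.{col, hour}

// hour(...) is an existing datetime function: it extracts the hour of day (0-23) per row.
val hourOfDay = df.select(hour(col("ts")))

// hours(...) is a partition transform: it describes the table's layout and is only
// meaningful inside partitionedBy, matching SQL's PARTITIONED BY (hours(ts)).
df.writeTo("catalog.db.events")
  .partitionedBy(hours(col("ts")))
  .create()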
identifier,
partitioning.getOrElse(Seq.empty),
logicalPlan,
properties = provider.map(p => properties + ("provider" -> p)).getOrElse(properties).toMap,
what about location, if this is meant to be an external table
We can add a location(String) method to this that is translated to a location property. Does this need to be in the initial version?
Doesn't need to be in the initial version. We should however standardize on whether:
- option("path", ...) would/should show up as "location" as a table property
- or should it be set by tableProperty("location", ...)
- or tableProperty("path", ...)
I think it should be tableProperty("location") because that's what we've standardized on elsewhere.
In the DFWriter we have a config:
df.sparkSession.sessionState.conf.defaultDataSourceName
do you want to use that here, or do you think the catalog is free to create whatever datasource if the provider isn't available?
I don't think so. It is up to the catalog what to use for the provider when one isn't specified. This API doesn't need to do that -- it should be filled in by the v2 session catalog.
Can this be documented somewhere?
Yes, this will be in the v2 documentation because we have to explain how USING is passed.
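A hedged sketch of how the provider travels, based on the plan construction in the diff above; the using(...) setter and table name are assumptions for illustration:

// The writer does not choose a default source itself. If a provider is given, it is
// carried as a "provider" table property (and surfaced as USING in the docs); otherwise
// the v2 session catalog is free to fill in its own default.
df.writeTo("catalog.db.events")
  .using("parquet")   // assumed setter; ends up as properties("provider" -> "parquet")
  .create()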
@brkyvz, Yes, it is. Instead of passing names to The v2 identifiers SPIP includes how to represent a path-based table: When we have a spec for how path-based tables behave, we can update |
|
Test build #108788 has finished for PR 25354 at commit
|
|
Test build #108947 has finished for PR 25354 at commit
|
|
Retest this please. |
|
Test build #108986 has finished for PR 25354 at commit
|
|
Test build #109429 has finished for PR 25354 at commit
|
17fe1fd to fb0fafc
|
Test build #109446 has finished for PR 25354 at commit
|
/**
 * Expression for the v2 partition transform hours.
 */
case class Hours(child: Expression) extends PartitionTransformExpression {
Hi Ryan, maybe a naive question: why not support granularity down to minutes or seconds?
I've not seen an example of a table that requires partitioning down to minutes or seconds. I'm not opposed to adding them, but it seems to me that those would not be very useful and would probably get people that use them into trouble by over-partitioning.
Got it, thanks. I agree that if there aren't many use cases for minutes or seconds, we can ignore them.
case class OverwritePartitionsDynamic(
    table: NamedRelation,
    query: LogicalPlan,
    writeOptions: Map[String, String],
Just curious, is it coding style to have a boolean parameter as the last one, like isByName?
Here writeOptions is 2nd from last, and writeOptions is the last parameter in OverwritePartitionsDynamic.
Yes, it is for style. Boolean parameters should be passed by name, like isByName = false. Although you can pass positional parameters after a named parameter, the expectation is usually that named parameters are not necessarily in the correct position and can be omitted or reordered.
thanks Ryan 👍
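A tiny hedged example of the style point above; the case class is made up, not a Spark plan:

// Hypothetical plan-like case class: write options before the trailing boolean flag.
case class AppendExample(table: String, writeOptions: Map[String, String], isByName: Boolean)

// Call sites pass the boolean by name so the intent stays readable and the argument
// can later be defaulted or reordered without breaking callers.
val plan = AppendExample("db.t", Map.empty, isByName = false)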
 * @group basic
 * @since 3.0.0
 */
def writeTo(table: String): DataFrameWriterV2[T] = {
I see there is write for v1:
/**
* Interface for saving the content of the non-streaming Dataset out into external storage.
*
* @group basic
* @since 1.6.0
*/
def write: DataFrameWriter[T] = {
if (isStreaming) {
logicalPlan.failAnalysis(
"'write' can not be called on streaming Dataset/DataFrame")
}
new DataFrameWriter[T](this)
}
Why not name it writeV2 to be self-explanatory? Or overload write with a different return type, DataFrameWriterV2[T]?
We can't change the behavior of write because we don't want to break older jobs. And we need to pass the table name or path somewhere. I think this works, but if everyone prefers writeV2, we can rename it.
I like writeTo, since we're explaining exactly what we're writing to at that point.
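For comparison, a hedged sketch of how the two entry points read side by side (table name and format are illustrative):

// v1: configuration first, the target comes last in save()/saveAsTable()
df.write.format("parquet").mode("overwrite").saveAsTable("db.t")

// v2: the target is named up front, then the verb says what happens to it
df.writeTo("catalog.db.t").append()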
case lit @ Literal(_, IntegerType) =>
  Bucket(lit, e.expr)
case _ =>
  throw new AnalysisException(s"Invalid number of buckets: $numBuckets")
also add column information in exception msg for debugging, like s"Invalid number of buckets: $numBuckets, for column: $e"?
Fixed.
  Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d"), Row(5L, "e"), Row(6L, "f")))
}

test("Append: by name not position")
Minor: is it better to make the test case name, like line 88, state the expected result? e.g. "fail if by name not position"
This tests that the validation is by name and not by position, so "failing if by name" would be incorrect. The failure tests that a name violation (can't find "data") is generated, even though the number of columns and column types match by position.
oh I see. no issues then. thanks!
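A hedged sketch of the scenario being tested; the table name, column names, and exact error are illustrative:

import org.apache.spark.sql.functions.{col, lit}

// Target table schema: (id bigint, data string). The incoming frame matches by position
// and type, but its second column is named "d" instead of "data", so the by-name
// validation should fail analysis rather than silently writing by position.
val byPositionOnly = spark.range(3).select(col("id"), lit("a").as("d"))
byPositionOnly.writeTo("testcat.table_name").append()   // expected to fail: cannot find "data"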
  Seq(Row(1L, "a"), Row(2L, "b"), Row(4L, "d"), Row(5L, "e"), Row(6L, "f")))
}

test("Overwrite: by name not position")
Same here: "fail if by name not position".
fb0fafc to 12097ec
|
Test build #109505 has finished for PR 25354 at commit
|
|
Test build #109506 has finished for PR 25354 at commit
|
brkyvz
left a comment
LGTM, but I still have worries about the partitioning expressions. I'm also fine with throwing better error messages, but curious what you think around namespacing.
// TODO: streaming could be adapted to use this interface
if (isStreaming) {
  logicalPlan.failAnalysis(
    "'writeTo' can not be called on streaming Dataset/DataFrame")
may be good to add: use 'writeStream' instead
I didn't include this because I'd rather not have the v2 API recommend using the v1 API. That seems confusing to me.
/**
 * Create a new table from the contents of the data frame.
 *
 * The new table's schema, partition layout, properties, and other configuration will be
The schema will be assumed to be nullable though
  this
}

override def tableProperty(property: String, value: String): DataFrameWriterV2[T] = {
should return a CreateTableWriter?
+1
|
Test build #109591 has finished for PR 25354 at commit
|
eff659a to e424c2c
|
@brkyvz, I've removed the |
|
Test build #109963 has finished for PR 25354 at commit
|
|
Test build #109967 has finished for PR 25354 at commit
|
|
Test build #109982 has finished for PR 25354 at commit
|
|
LGTM. Merging to master. Thanks @rdblue |
|
|
// turn off style check that object names must start with a capital letter
// scalastyle:off
object partitioning {
This seems to break the Maven build:
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.7/6803/console
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.7-ubuntu-testing/1712/consoleFull
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-3.2/289/consoleFull
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-3.2-jdk-11/357/consoleFull
[ERROR] [Error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-testing/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:32: cannot access org.apache.spark.sql.functions.1
class file for org.apache.spark.sql.functions$1 not found
[ERROR] [Error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-testing/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:70: cannot find symbol
symbol: method avg(java.lang.String)
location: class org.apache.spark.sql.hive.JavaDataFrameSuite
[ERROR] [Error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-testing/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:81: cannot find symbol
symbol: method col(java.lang.String)
location: class org.apache.spark.sql.hive.JavaDataFrameSuite
[ERROR] [Error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-testing/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:89: cannot find symbol
symbol: method col(java.lang.String)
location: class org.apache.spark.sql.hive.JavaDataFrameSuite
[ERROR] [Error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-testing/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:90: cannot find symbol
symbol: method col(java.lang.String)
location: class org.apache.spark.sql.hive.JavaDataFrameSuite
[ERROR] [Error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-testing/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:91: cannot find symbol
symbol: method col(java.lang.String)
location: class org.apache.spark.sql.hive.JavaDataFrameSuite
[ERROR] [Error] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-testing/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java:92: cannot find symbol
symbol: method col(java.lang.String)
location: class org.apache.spark.sql.hive.JavaDataFrameSuite
After this commit, the GitHub workflow also seems to be causing build failures:
|
|
// turn off style check that object names must start with a capital letter
// scalastyle:off
object partitioning {
Does this have a corresponding Java test? This file should be compatible with the Java side as well. I doubt a nested object works in the Java API.
This is a good point. We also have nested objects in SQLConf and it works fine. Maybe Java compatibility is the problem here.
SQLConf is internal. I don't think it's supposed to be exposed.
|
Uh oh, can you please revert?
|
|
Sure, thanks @brkyvz for confirming quickly! |
|
Also, locally verified that the Maven build now works fine after reverting this one. |
|
@rdblue sorry I had to revert this. Can you open a PR again with some fixes? |
 * @group partition_transforms
 * @since 3.0.0
 */
def bucket(numBuckets: Column, e: Column): Column = withExpr {
why do we need this overload if only literal is allowed as numBuckets?
Did we revert this? I am seeing a compilation error in my local env while compiling the hive module, like the following: |
|
Yes, @dilipbiswal. We reverted this 15 hours ago. Did you try to update your local environment?
|
|
@dongjoon-hyun My bad, sorry. Yeah, I see the revert now that I've updated the local env. |

What changes were proposed in this pull request?
This adds a new write API as proposed in the SPIP to standardize logical plans. This new API:
- Adds append, overwrite, create, and replace methods that correspond to the new logical plans.
- partitionedBy can only be called when the writer executes create or replace.

Here are a few example uses of the new API:
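As a hedged sketch (catalog, table, and column names are made up, exact signatures may differ, and it assumes the partition transform functions from this PR plus functions col/lit are in scope), calls of this shape are what the new API supports:

// create (CTAS) with a partition transform
df.writeTo("catalog.db.events").partitionedBy(days(col("ts"))).create()

// append to an existing table, validated by name
df.writeTo("catalog.db.events").append()

// overwrite rows matching a condition
df.writeTo("catalog.db.events").overwrite(col("ts") < lit("2019-09-01"))

// replace the table using the data frame's contents, carrying a table property
df.writeTo("catalog.db.events").tableProperty("location", "/tmp/events").replace()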
How was this patch tested?
Added DataFrameWriterV2Suite that tests the new write API. Existing tests for v2 plans.