[SPARK-4131] Support "Writing data into the filesystem from queries" #18975
Changes from all commits
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical

 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._

@@ -359,6 +359,30 @@ case class InsertIntoTable(
   override lazy val resolved: Boolean = false
 }

+/**
+ * Insert query result into a directory.
Member: Please add the comment like
Contributor (Author): added
+ *
+ * @param isLocal Indicates whether the specified directory is a local directory
+ * @param storage Info about the output file, row and serialization format
+ * @param provider Specifies what data source to use; only used for data source files
+ * @param child The query to be executed
+ * @param overwrite If true, the existing directory will be overwritten
+ *
+ * Note that this plan is unresolved and has to be replaced by the concrete implementations
+ * during analysis.
+ */
+case class InsertIntoDir(
+    isLocal: Boolean,
+    storage: CatalogStorageFormat,
+    provider: Option[String],
+    child: LogicalPlan,
+    overwrite: Boolean = true)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+  override lazy val resolved: Boolean = false
+}
+
 /**
  * A container for holding the view description(CatalogTable), and the output of the view. The
  * child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error

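Since InsertIntoDir is deliberately left unresolved, the analyzer has to swap it for a concrete command. A minimal sketch of what such a resolution rule could look like; the rule name and dispatch condition here are illustrative, not necessarily the PR's actual rule:

    import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.command.{DDLUtils, InsertIntoDataSourceDirCommand}

    // Hypothetical rule: replace the unresolved InsertIntoDir node with the
    // data source command when the provider is not Hive.
    object ResolveInsertIntoDir extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case InsertIntoDir(_, storage, Some(provider), child, overwrite)
            if provider != DDLUtils.HIVE_PROVIDER =>
          InsertIntoDataSourceDirCommand(storage, provider, child, overwrite)
      }
    }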
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, _}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructType

@@ -1512,4 +1512,81 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       query: LogicalPlan): LogicalPlan = {
     RepartitionByExpression(expressions, query, conf.numShufflePartitions)
   }
+
+  /**
+   * Return the parameters for [[InsertIntoDir]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   INSERT OVERWRITE DIRECTORY
+   *   [path]
+   *   [OPTIONS table_property_list]
+   *   select_statement;
+   * }}}
+   */
+  override def visitInsertOverwriteDir(
+      ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
+    if (ctx.LOCAL != null) {
+      throw new ParseException(
+        "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
Member: If we don't support
Contributor (Author): Originally, LOCAL was not added.
+    }
+
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    var storage = DataSource.buildStorageFormatFromOptions(options)
+
+    val path = Option(ctx.path).map(string).getOrElse("")
+
+    if (!(path.isEmpty ^ storage.locationUri.isEmpty)) {
+      throw new ParseException(
+        "Directory path and 'path' in OPTIONS should be specified one, but not both", ctx)
Member: When both are not specified?
Member: Users must specify one and only one.
Member: We go into this condition when both are not specified. But the message is not for it.
Contributor (Author): I think the original message is clear and simple. "Directory path and 'path' in OPTIONS should be specified one" means at least one should be specified. "but not both" means you cannot specify both. "Directory path and 'path' in OPTIONS should be specified one and only one, but not both or both not." feels redundant.
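For clarity, the XOR check above accepts exactly one of the two ways to give a path and rejects the other two combinations. A standalone sketch of the same predicate (the helper name is illustrative):

    // Parse is valid only when exactly one of the two path specs is present:
    // the bare directory path or the 'path' key in OPTIONS.
    def pathSpecIsValid(directoryPath: String, optionsPath: Option[String]): Boolean =
      directoryPath.isEmpty ^ optionsPath.isEmpty

    assert(pathSpecIsValid("/tmp/out", None))          // only directory path: ok
    assert(pathSpecIsValid("", Some("/tmp/out")))      // only OPTIONS path: ok
    assert(!pathSpecIsValid("/tmp/out", Some("/x")))   // both given: rejected
    assert(!pathSpecIsValid("", None))                 // neither given: rejected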
+    }
+
+    if (!path.isEmpty) {
+      val customLocation = Some(CatalogUtils.stringToURI(path))
+      storage = storage.copy(locationUri = customLocation)
+    }
+
+    val provider = ctx.tableProvider.qualifiedName.getText
+
+    (false, storage, Some(provider))
+  }
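For reference, a query that exercises this data source branch might look like the following sketch (the json format, /tmp path, and table name are illustrative, not from this PR):

    spark.sql(
      """
        |INSERT OVERWRITE DIRECTORY
        |USING json
        |OPTIONS (path '/tmp/json_dir')
        |SELECT id, name FROM people
      """.stripMargin)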
+
+  /**
+   * Return the parameters for [[InsertIntoDir]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY
+   *   path
+   *   [ROW FORMAT row_format]
+   *   [STORED AS file_format]
+   *   select_statement;
+   * }}}
+   */
+  override def visitInsertOverwriteHiveDir(
+      ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
+    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
+    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+      .getOrElse(CatalogStorageFormat.empty)
+    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+      .getOrElse(CatalogStorageFormat.empty)
+
+    val path = string(ctx.path)
+    // The path field is required
+    if (path.isEmpty) {
+      operationNotAllowed("INSERT OVERWRITE DIRECTORY must be accompanied by path", ctx)
+    }
+
+    val defaultStorage = HiveSerDe.getDefaultStorage(conf)
+
+    val storage = CatalogStorageFormat(
+      locationUri = Some(CatalogUtils.stringToURI(path)),
+      inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+      outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+      serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+      compressed = false,
+      properties = rowStorage.properties ++ fileStorage.properties)
+
+    (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
+  }
 }
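And a query that exercises the Hive serde branch might look like this sketch (the path and table name are illustrative):

    spark.sql(
      """
        |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/text_dir'
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |STORED AS TEXTFILE
        |SELECT id, name FROM people
      """.stripMargin)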
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources._
+
+/**
+ * A command used to write the result of a query to a directory.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   INSERT OVERWRITE DIRECTORY (path=STRING)?
+ *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ *   SELECT ...
+ * }}}
+ *
+ * @param storage storage format used to describe how the query result is stored.
+ * @param provider the data source type to be used
+ * @param query the logical plan representing the data to write
+ * @param overwrite whether to overwrite the existing directory
+ */
+case class InsertIntoDataSourceDirCommand(
+    storage: CatalogStorageFormat,
+    provider: String,
+    query: LogicalPlan,
+    overwrite: Boolean) extends RunnableCommand {
+
+  override def children: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+    assert(children.length == 1)
+    assert(storage.locationUri.nonEmpty, "Directory path is required")
+    assert(provider.nonEmpty, "Data source is required")
+
+    // Create the relation based on the input logical plan: `query`.
+    val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+
+    val dataSource = DataSource(
+      sparkSession,
+      className = provider,
+      options = storage.properties ++ pathOption,
+      catalogTable = None)
Member: You can add an extra check here. A simple example is like

    sql(
      s"""
         |INSERT OVERWRITE DIRECTORY '${path}'
         |USING JDBC
         |OPTIONS (uRl '$url1', DbTaBlE 'TEST.PEOPLE1', User 'testUser', PassWord 'testPass')
         |SELECT 1, 2
       """.stripMargin)

Currently, the above query can pass. We should get an exception instead.

Contributor (Author): updated.
+    val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+    if (!isFileFormat) {
+      throw new SparkException(
+        "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
+    }
+
+    val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
+    try {
+      sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
+      dataSource.writeAndRead(saveMode, query)
Member: The implementation here confused me, just want to leave a question here why we should call both
Member: Yes. We should get rid of
Member: @gatorsmile Thanks for your reply, I'll try to fix this.
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
+        throw ex
+    }
+
+    Seq.empty[Row]
+  }
+}
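Putting the pieces together, the command above can be exercised end to end once the parser and analyzer changes are wired in. A sketch under assumed names (the parquet format, /tmp path, and temp view t are all illustrative):

    // Illustrative end-to-end sketch: exercises InsertIntoDataSourceDirCommand
    // through the SQL path added by this PR.
    spark.range(10).createOrReplaceTempView("t")
    spark.sql(
      """
        |INSERT OVERWRITE DIRECTORY
        |USING parquet
        |OPTIONS (path '/tmp/parquet_dir')
        |SELECT id FROM t
      """.stripMargin)

Because overwrite is true for this syntax, the write maps to SaveMode.Overwrite, so re-running the statement replaces the directory contents rather than failing.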
Member: Could you add it to TableIdentifierParserSuite?
Contributor (Author): it is already in TableIdentifierParserSuite