Commits (64)
6ca7771
add insert overwrite local directory
janewangfb Aug 17, 2017
a975536
Add Unittests
janewangfb Aug 17, 2017
a15bf4e
fix local path
janewangfb Aug 17, 2017
9f596fd
Merge branch 'master' into port_local_directory
janewangfb Aug 17, 2017
b9db02e
fix style
janewangfb Aug 17, 2017
e516bec
Merge branch 'master' into port_local_directory
janewangfb Aug 18, 2017
e05624f
condense storage
janewangfb Aug 18, 2017
7f5664d
change InsertInto to InsertIntoTable
janewangfb Aug 18, 2017
d50b3a2
add InsertIntoDirectory
janewangfb Aug 19, 2017
61a18a2
update insertInto
janewangfb Aug 19, 2017
4c19aaf
SQLQuerySuite passed
janewangfb Aug 19, 2017
47fde8a
fix comments
janewangfb Aug 19, 2017
068662a
Merge branch 'master' into port_local_directory
janewangfb Aug 21, 2017
da7065b
Add tableProdier
janewangfb Aug 21, 2017
7f4b488
Add InsertIntoDataSourceDirCommand
janewangfb Aug 21, 2017
051018e
Merge branch 'master' into port_local_directory
janewangfb Aug 21, 2017
73f605e
fix style
janewangfb Aug 21, 2017
8261b39
fix style
janewangfb Aug 21, 2017
163c124
Merge branch 'master' into port_local_directory
janewangfb Aug 22, 2017
0882dd1
Address gatorsmile's comments
janewangfb Aug 22, 2017
c813ad8
Use withTempDir
janewangfb Aug 22, 2017
bc5424c
line length
janewangfb Aug 22, 2017
675ffd7
merge master
janewangfb Aug 23, 2017
f36e933
Address gatorsmile's comment
janewangfb Aug 23, 2017
64f37f4
fix typo
janewangfb Aug 23, 2017
b2068ce
Merge branch 'master' into port_local_directory
janewangfb Aug 31, 2017
8ebe5e2
Address gatorsmile's comments
janewangfb Aug 31, 2017
51f9a0a
fix comments and style
janewangfb Aug 31, 2017
e2db5e1
add more comments
janewangfb Aug 31, 2017
7bb5c85
Merge branch 'master' into port_local_directory
janewangfb Sep 1, 2017
2e7a29b
Merge branch 'master' into port_local_directory
janewangfb Sep 1, 2017
7ccbde4
Address gatorsmile's comments
janewangfb Sep 2, 2017
52350e8
fix style
janewangfb Sep 2, 2017
2ec9947
address Tejas' comment
janewangfb Sep 3, 2017
05d9d20
Merge branch 'master' into port_local_directory
janewangfb Sep 5, 2017
392593b
check point
janewangfb Sep 5, 2017
511cfc3
checkpoint
janewangfb Sep 5, 2017
77948bb
Merge insertIntoTable and insertIntoDirectory
janewangfb Sep 5, 2017
fd9322c
refactoring temp dir
janewangfb Sep 6, 2017
e590847
add TODO for tmpPath
janewangfb Sep 6, 2017
d1b4ec8
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
dd6a418
use tmpPath
janewangfb Sep 6, 2017
c2c693c
fix style
janewangfb Sep 6, 2017
e9c88b5
add exists check
janewangfb Sep 6, 2017
62370fd
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
b00acdc
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
b64520b
fix build
janewangfb Sep 6, 2017
3aaf6e8
fix build failure
janewangfb Sep 6, 2017
b461e00
fix build failure
janewangfb Sep 6, 2017
e9c24de
address's gatorsmile's comment
janewangfb Sep 6, 2017
28fcb39
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
0c03a2b
address gatorsmile's comment
janewangfb Sep 6, 2017
6c24b1b
remove unused import
janewangfb Sep 6, 2017
c4ab411
merge master
janewangfb Sep 7, 2017
0ec103e
address gatorsmile's comments
janewangfb Sep 7, 2017
160c0ec
fix style
janewangfb Sep 7, 2017
95ebfd3
add more unittest
janewangfb Sep 7, 2017
4a5ff29
add multi insert
janewangfb Sep 7, 2017
c99872b
Merge branch 'master' into port_local_directory
janewangfb Sep 9, 2017
449249e
address gatorsmile's comments
janewangfb Sep 9, 2017
7919041
address viirya's comment
janewangfb Sep 9, 2017
aeb5d5e
address viirya's comment
janewangfb Sep 9, 2017
81382df
Merge branch 'master' into port_local_directory
janewangfb Sep 9, 2017
f93d57a
address viirya's comment
janewangfb Sep 9, 2017
@@ -243,8 +243,10 @@ query
;

insertInto
: INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?
| INSERT INTO TABLE? tableIdentifier partitionSpec?
: INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)? #insertOverwriteTable
| INSERT INTO TABLE? tableIdentifier partitionSpec? #insertIntoTable
| INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
;

partitionSpecLocation
@@ -745,6 +747,7 @@ nonReserved
| AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN
| UNBOUNDED | WHEN
| DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP
| DIRECTORY
Member: Could you add it to TableIdentifierParserSuite?

Contributor Author: it is already in TableIdentifierParserSuite

;
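This PR also makes DIRECTORY a keyword (see the lexer change below), so listing it under nonReserved keeps it usable as a plain identifier. A minimal sketch (the table and column names are hypothetical):

    // DIRECTORY stays legal as an identifier because it is non-reserved:
    spark.sql("SELECT directory FROM t")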

SELECT: 'SELECT';
@@ -815,6 +818,7 @@ WITH: 'WITH';
VALUES: 'VALUES';
CREATE: 'CREATE';
TABLE: 'TABLE';
DIRECTORY: 'DIRECTORY';
VIEW: 'VIEW';
REPLACE: 'REPLACE';
INSERT: 'INSERT';
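For context, a sketch of statements the two new insertInto alternatives above are meant to accept (the paths, formats, and the src table are illustrative, not taken from the PR):

    // #insertOverwriteHiveDir: Hive-serde output, optional row/file format clauses.
    spark.sql(
      """
        |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/hive_out'
        |STORED AS orc
        |SELECT * FROM src
      """.stripMargin)

    // #insertOverwriteDir: data-source output via USING, with optional OPTIONS.
    spark.sql(
      """
        |INSERT OVERWRITE DIRECTORY '/tmp/ds_out'
        |USING parquet
        |OPTIONS (compression 'snappy')
        |SELECT * FROM src
      """.stripMargin)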
@@ -146,6 +146,9 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")

case _: InsertIntoDir =>
throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")

// mapGroupsWithState and flatMapGroupsWithState
case m: FlatMapGroupsWithState if m.isStreaming =>

@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -178,11 +179,63 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}

/**
* Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan.
* Parameters used for writing query to a table:
* (tableIdentifier, partitionKeys, overwrite, exists).
*/
type InsertTableParams = (TableIdentifier, Map[String, Option[String]], Boolean, Boolean)

/**
* Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider).
*/
type InsertDirParams = (Boolean, CatalogStorageFormat, Option[String])

/**
* Add an
* {{{
* INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]?
* INSERT INTO [TABLE] tableIdentifier [partitionSpec]
* INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
* INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
* }}}
* operation to logical plan
*/
private def withInsertInto(
ctx: InsertIntoContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
ctx match {
case table: InsertIntoTableContext =>
val (tableIdent, partitionKeys, overwrite, exists) = visitInsertIntoTable(table)
InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, overwrite, exists)
case table: InsertOverwriteTableContext =>
val (tableIdent, partitionKeys, overwrite, exists) = visitInsertOverwriteTable(table)
InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, overwrite, exists)
case dir: InsertOverwriteDirContext =>
val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
case hiveDir: InsertOverwriteHiveDirContext =>
val (isLocal, storage, provider) = visitInsertOverwriteHiveDir(hiveDir)
InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
case _ =>
throw new ParseException("Invalid InsertIntoContext", ctx)
}
}

/**
* Add an INSERT INTO TABLE operation to the logical plan.
*/
override def visitInsertIntoTable(
ctx: InsertIntoTableContext): InsertTableParams = withOrigin(ctx) {
val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)

(tableIdent, partitionKeys, false, false)
}

/**
* Add an INSERT OVERWRITE TABLE operation to the logical plan.
*/
override def visitInsertOverwriteTable(
ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) {
val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)

@@ -192,12 +245,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
"partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
}

InsertIntoTable(
UnresolvedRelation(tableIdent),
partitionKeys,
query,
ctx.OVERWRITE != null,
ctx.EXISTS != null)
(tableIdent, partitionKeys, ctx.OVERWRITE() != null, ctx.EXISTS() != null)
Member: Under InsertOverwriteTableContext, I think ctx.OVERWRITE() should not be null at all.

Member: You have InsertOverwriteTableContext and InsertIntoTableContext actually; that said, you don't need an overwrite in InsertTableParams. You already know whether to overwrite before visiting them.

Contributor Author: updated.

}

/**
* Write to a file, returning a [[InsertIntoDir]] logical plan.
Member: Write to a dir?

Contributor Author: updated.

*/
override def visitInsertOverwriteDir(
ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
}

/**
* Write to a file, returning a [[InsertIntoDir]] logical plan.
Member: ditto.

Contributor Author: updated

*/
override def visitInsertOverwriteHiveDir(
ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
}

/**
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -359,6 +359,31 @@ case class InsertIntoTable(
override lazy val resolved: Boolean = false
}

/**
* Insert query result into a directory.
Member: Please add the comment like: "Note that this plan is unresolved and has to be replaced by the concrete implementations during analysis."

Contributor Author: added
*
* @param isLocal Indicates whether the specified directory is a local directory
* @param storage Info about the output file, row format, and serialization format
* @param provider Specifies which data source to use; only used for data source files.
* @param child The query to be executed
* @param overwrite If true, the existing directory will be overwritten
*
* Note that this plan is unresolved and has to be replaced by the concrete implementations
* during analysis.
*/
case class InsertIntoDir(
Member (@gatorsmile, Aug 21, 2017): This should be InsertOverwriteDir. Let us make it extensible. Keep the current name InsertIntoDir with another parameter overwrite: Boolean.

Contributor Author: ok. added.

Member: Could you add function descriptions and parameter descriptions?

Contributor Author: added

isLocal: Boolean,
storage: CatalogStorageFormat,
provider: Option[String],
child: LogicalPlan,
overwrite: Boolean = true)
extends LogicalPlan {

override def children: Seq[LogicalPlan] = child :: Nil
Member: Nit: We can simply extend UnaryNode and remove children.

Contributor Author: updated.

override def output: Seq[Attribute] = Seq.empty
Member: Set override lazy val resolved: Boolean = false

Contributor Author: updated

override lazy val resolved: Boolean = false
}
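As a rough sketch, the withInsertInto dispatch shown earlier would build this node along the following lines (the URI, provider value, and the query plan are illustrative; assumes the usual catalyst imports are in scope):

    // Hypothetical construction mirroring the InsertOverwriteHiveDirContext branch:
    val plan = InsertIntoDir(
      isLocal = true,
      storage = CatalogStorageFormat.empty.copy(
        locationUri = Some(new java.net.URI("/tmp/hive_out"))),
      provider = Some("hive"),
      child = query)  // overwrite defaults to true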

/**
* A container for holding the view description(CatalogTable), and the output of the view. The
* child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, _}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructType

@@ -1512,4 +1512,81 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
query: LogicalPlan): LogicalPlan = {
RepartitionByExpression(expressions, query, conf.numShufflePartitions)
}

/**
* Return the parameters for [[InsertIntoDir]] logical plan.
*
* Expected format:
* {{{
* INSERT OVERWRITE DIRECTORY
* [path]
* [OPTIONS table_property_list]
* select_statement;
* }}}
*/
override def visitInsertOverwriteDir(
ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
if (ctx.LOCAL != null) {
throw new ParseException(
"LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
Member: If we don't support LOCAL for data source, should we remove it from the parsing rule?

Contributor Author: Originally, LOCAL was not added. @gatorsmile had some comment that the parser might have some weird exception and he requested to add it.

}

val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
var storage = DataSource.buildStorageFormatFromOptions(options)

val path = Option(ctx.path).map(string).getOrElse("")

if (!(path.isEmpty ^ storage.locationUri.isEmpty)) {
throw new ParseException(
"Directory path and 'path' in OPTIONS should be specified one, but not both", ctx)
Member: When both are not specified?

Member: Users must specify one and only one.

Member: We go into this condition when both are not specified, but the message does not cover that case. How about: "Directory path and 'path' in OPTIONS should be specified one and only one, but not both or both not."

Contributor Author (@janewangfb, Sep 9, 2017): I think the original message is clear and simple. "Directory path and 'path' in OPTIONS should be specified one" means at least one should be specified; "but not both" means both cannot be specified. "Directory path and 'path' in OPTIONS should be specified one and only one, but not both or both not" feels redundant.

}

if (!path.isEmpty) {
val customLocation = Some(CatalogUtils.stringToURI(path))
storage = storage.copy(locationUri = customLocation)
}

val provider = ctx.tableProvider.qualifiedName.getText

(false, storage, Some(provider))
}
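To make the XOR guard above concrete, a comment-only sketch of the four cases it distinguishes:

    // !(path.isEmpty ^ storage.locationUri.isEmpty) rejects "both" and "neither":
    // path given,  OPTIONS path given  -> !(false ^ false) -> true  -> ParseException
    // path given,  no OPTIONS path     -> !(false ^ true)  -> false -> ok
    // no path,     OPTIONS path given  -> !(true  ^ false) -> false -> ok
    // no path,     no OPTIONS path     -> !(true  ^ true)  -> true  -> ParseException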

/**
* Return the parameters for [[InsertIntoDir]] logical plan.
*
* Expected format:
* {{{
* INSERT OVERWRITE [LOCAL] DIRECTORY
* path
Member: LOCAL?

Contributor Author: added [LOCAL]

* [ROW FORMAT row_format]
* [STORED AS file_format]
* select_statement;
* }}}
*/
override def visitInsertOverwriteHiveDir(
ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
.getOrElse(CatalogStorageFormat.empty)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
.getOrElse(CatalogStorageFormat.empty)

val path = string(ctx.path)
// The path field is required
if (path.isEmpty) {
operationNotAllowed("INSERT OVERWRITE DIRECTORY must be accompanied by path", ctx)
}

val defaultStorage = HiveSerDe.getDefaultStorage(conf)

val storage = CatalogStorageFormat(
locationUri = Some(CatalogUtils.stringToURI(path)),
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
compressed = false,
properties = rowStorage.properties ++ fileStorage.properties)

(ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
}
}
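A sketch of a statement exercising this Hive-serde path, including a row format clause (the path, delimiter, and table are illustrative):

    spark.sql(
      """
        |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/csv_out'
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |STORED AS textfile
        |SELECT id, name FROM people
      """.stripMargin)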
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources._

/**
* A command used to write the result of a query to a directory.
*
* The syntax of using this command in SQL is:
* {{{
* INSERT OVERWRITE DIRECTORY (path=STRING)?
* USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
* SELECT ...
* }}}
*
* @param storage storage format used to describe how the query result is stored.
* @param provider the data source type to be used
* @param query the logical plan representing the data to write
* @param overwrite whether to overwrite the existing directory
*/
case class InsertIntoDataSourceDirCommand(
storage: CatalogStorageFormat,
provider: String,
query: LogicalPlan,
overwrite: Boolean) extends RunnableCommand {

override def children: Seq[LogicalPlan] = Seq(query)

override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
assert(children.length == 1)
assert(storage.locationUri.nonEmpty, "Directory path is required")
assert(!provider.isEmpty, "Data source is required")
Member: nit: provider.nonEmpty for being consistent with above.

Contributor Author: updated.


// Create the relation based on the input logical plan: `query`.
val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))

val dataSource = DataSource(
Member: This test case only covers FileFormat. For the other external data sources that extend our data source API interfaces, e.g., CreatableRelationProvider, we can block it with a reasonable message.

Contributor Author: @gatorsmile I am not familiar with data source. Is it possible that you can give me some hints how to limit this to only "FileFormat"?

sparkSession,
className = provider,
options = storage.properties ++ pathOption,
catalogTable = None)

Member: You can add an extra check here.

    val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)

If isFileFormat is false, return an error. A simple example is like

      sql(
        s"""
          |INSERT OVERWRITE DIRECTORY '${path}'
          |USING JDBC
          |OPTIONS (uRl '$url1', DbTaBlE 'TEST.PEOPLE1', User 'testUser', PassWord 'testPass')
          |SELECT 1, 2
        """.stripMargin)

Currently, the above query can pass. We should get an exception instead.

Contributor Author: updated.

val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
if (!isFileFormat) {
throw new SparkException(
"Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
}

val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
try {
sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
dataSource.writeAndRead(saveMode, query)
Member: Since we do not need to return BaseRelation, we can call planForWriting and execute the plan.

Contributor Author: updated.

Member: The implementation here confused me; just want to leave a question here: why should we call both writeAndRead and planForWriting? @janewangfb @gatorsmile @cloud-fan

Member: Yes. We should get rid of dataSource.writeAndRead. @xuanyuanking Could you submit a PR to fix the issue?

Member: @gatorsmile Thanks for your reply, I'll try to fix this.

} catch {
case ex: AnalysisException =>
logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
throw ex
}

Seq.empty[Row]
}
}
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.HiveSerDe
@@ -869,4 +870,18 @@ object DDLUtils {
}
}
}

/**
* Throws an exception if outputPath tries to overwrite an input path.
*/
def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
val inputPaths = query.collect {
case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
}.flatten

if (inputPaths.contains(outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}
}
}
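A sketch of the misuse this guards against (the path is illustrative, and the exact trigger point depends on the command that calls verifyNotReadPath):

    // Writing to a directory while also reading from it should fail analysis:
    spark.sql(
      """
        |INSERT OVERWRITE DIRECTORY '/tmp/data'
        |USING parquet
        |SELECT * FROM parquet.`/tmp/data`
      """.stripMargin)
    // -> AnalysisException: Cannot overwrite a path that is also being read from.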