Commits (64)
6ca7771
add insert overwrite local directory
janewangfb Aug 17, 2017
a975536
Add Unittests
janewangfb Aug 17, 2017
a15bf4e
fix local path
janewangfb Aug 17, 2017
9f596fd
Merge branch 'master' into port_local_directory
janewangfb Aug 17, 2017
b9db02e
fix style
janewangfb Aug 17, 2017
e516bec
Merge branch 'master' into port_local_directory
janewangfb Aug 18, 2017
e05624f
condense storage
janewangfb Aug 18, 2017
7f5664d
change InsertInto to InsertIntoTable
janewangfb Aug 18, 2017
d50b3a2
add InsertIntoDirectory
janewangfb Aug 19, 2017
61a18a2
update insertInto
janewangfb Aug 19, 2017
4c19aaf
SQLQuerySuite passed
janewangfb Aug 19, 2017
47fde8a
fix comments
janewangfb Aug 19, 2017
068662a
Merge branch 'master' into port_local_directory
janewangfb Aug 21, 2017
da7065b
Add tableProdier
janewangfb Aug 21, 2017
7f4b488
Add InsertIntoDataSourceDirCommand
janewangfb Aug 21, 2017
051018e
Merge branch 'master' into port_local_directory
janewangfb Aug 21, 2017
73f605e
fix style
janewangfb Aug 21, 2017
8261b39
fix style
janewangfb Aug 21, 2017
163c124
Merge branch 'master' into port_local_directory
janewangfb Aug 22, 2017
0882dd1
Address gatorsmile's comments
janewangfb Aug 22, 2017
c813ad8
Use withTempDir
janewangfb Aug 22, 2017
bc5424c
line length
janewangfb Aug 22, 2017
675ffd7
merge master
janewangfb Aug 23, 2017
f36e933
Address gatorsmile's comment
janewangfb Aug 23, 2017
64f37f4
fix typo
janewangfb Aug 23, 2017
b2068ce
Merge branch 'master' into port_local_directory
janewangfb Aug 31, 2017
8ebe5e2
Address gatorsmile's comments
janewangfb Aug 31, 2017
51f9a0a
fix comments and style
janewangfb Aug 31, 2017
e2db5e1
add more comments
janewangfb Aug 31, 2017
7bb5c85
Merge branch 'master' into port_local_directory
janewangfb Sep 1, 2017
2e7a29b
Merge branch 'master' into port_local_directory
janewangfb Sep 1, 2017
7ccbde4
Address gatorsmile's comments
janewangfb Sep 2, 2017
52350e8
fix style
janewangfb Sep 2, 2017
2ec9947
address Tejas' comment
janewangfb Sep 3, 2017
05d9d20
Merge branch 'master' into port_local_directory
janewangfb Sep 5, 2017
392593b
check point
janewangfb Sep 5, 2017
511cfc3
checkpoint
janewangfb Sep 5, 2017
77948bb
Merge insertIntoTable and insertIntoDirectory
janewangfb Sep 5, 2017
fd9322c
refactoring temp dir
janewangfb Sep 6, 2017
e590847
add TODO for tmpPath
janewangfb Sep 6, 2017
d1b4ec8
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
dd6a418
use tmpPath
janewangfb Sep 6, 2017
c2c693c
fix style
janewangfb Sep 6, 2017
e9c88b5
add exists check
janewangfb Sep 6, 2017
62370fd
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
b00acdc
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
b64520b
fix build
janewangfb Sep 6, 2017
3aaf6e8
fix build failure
janewangfb Sep 6, 2017
b461e00
fix build failure
janewangfb Sep 6, 2017
e9c24de
address's gatorsmile's comment
janewangfb Sep 6, 2017
28fcb39
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
0c03a2b
address gatorsmile's comment
janewangfb Sep 6, 2017
6c24b1b
remove unused import
janewangfb Sep 6, 2017
c4ab411
merge master
janewangfb Sep 7, 2017
0ec103e
address gatorsmile's comments
janewangfb Sep 7, 2017
160c0ec
fix style
janewangfb Sep 7, 2017
95ebfd3
add more unittest
janewangfb Sep 7, 2017
4a5ff29
add multi insert
janewangfb Sep 7, 2017
c99872b
Merge branch 'master' into port_local_directory
janewangfb Sep 9, 2017
449249e
address gatorsmile's comments
janewangfb Sep 9, 2017
7919041
address viirya's comment
janewangfb Sep 9, 2017
aeb5d5e
address viirya's comment
janewangfb Sep 9, 2017
81382df
Merge branch 'master' into port_local_directory
janewangfb Sep 9, 2017
f93d57a
address viirya's comment
janewangfb Sep 9, 2017
Address gatorsmile's comments
janewangfb committed Sep 2, 2017
commit 7ccbde47fffba7bef8eceba4993bbd70eeb84845
@@ -247,8 +247,8 @@ insertIntoTable
;

insertOverwriteDirectory
: INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
: INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
;
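For context, a minimal sketch of the two statement shapes this rule now accepts (assuming a SparkSession named spark and an existing table src; paths are hypothetical):

// Hive-serde form: LOCAL is allowed and writes to the node-local filesystem.
spark.sql(
  """
    |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/hive_dir'
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |SELECT * FROM src
  """.stripMargin)

// Data-source form: the grammar now also accepts LOCAL here, but the AST builder
// below rejects that combination, so only the non-LOCAL form is usable.
spark.sql(
  """
    |INSERT OVERWRITE DIRECTORY '/tmp/parquet_dir'
    |USING parquet
    |SELECT * FROM src
  """.stripMargin)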

insertInto
@@ -1523,6 +1523,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
*/
override def visitInsertOverwriteDir(
ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
if (ctx.LOCAL != null) {
throw new ParseException(
"LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
Member
If we don't support LOCAL for data source, should we remove it from the parsing rule?

Contributor Author
Originally, LOCAL was not added.
@gatorsmile commented that without it the parser might throw a confusing exception, so he requested that it be added.
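As an illustration only, a hedged sketch of how this check would surface in a test (the path is hypothetical; intercept comes from ScalaTest):

import org.apache.spark.sql.catalyst.parser.ParseException

val e = intercept[ParseException] {
  spark.sql(
    """
      |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/dest_dir'
      |USING parquet
      |SELECT 1 AS a
    """.stripMargin)
}
assert(e.getMessage.contains(
  "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source"))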

}

val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
var storage = DataSource.buildStorageFormatFromOptions(options)

@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.command

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -52,12 +54,19 @@ case class InsertIntoDataSourceDirCommand(

// Create the relation based on the input logical plan: `query`.
val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))

val dataSource = DataSource(
Member
This test case only covers FileFormat. For the other external data sources that extend our data source API interfaces, e.g., CreatableRelationProvider, we can block it with a reasonable message.

Contributor Author
@gatorsmile I am not familiar with the data source code. Could you give me some hints on how to limit this to only "FileFormat"?

sparkSession,
className = provider,
options = storage.properties ++ pathOption,
catalogTable = None)

Member
You can add an extra check here.

val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)

If isFileFormat is false, return an error.

A simple example is like

      sql(
        s"""
          |INSERT OVERWRITE DIRECTORY '${path}'
          |USING JDBC
          |OPTIONS (uRl '$url1', DbTaBlE 'TEST.PEOPLE1', User 'testUser', PassWord 'testPass')
          |SELECT 1, 2
        """.stripMargin)

Currently, the above query can pass. We should get an exception instead.

Contributor Author
updated.

val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
if (!isFileFormat) {
throw new SparkException(
"Only Data Sources providing FileFormat are supported.")
Contributor
Nit: you could also include dataSource.providingClass in the message to make it easier to reason about and debug.

Contributor Author
added

}
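Following the nit above, a hedged sketch of the check with the providing class included in the message (names taken from the surrounding diff; not necessarily the exact final wording):

val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
if (!isFileFormat) {
  throw new SparkException(
    s"Only Data Sources providing FileFormat are supported: ${dataSource.providingClass}")
}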

val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
try {
sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
@@ -22,18 +22,17 @@ import java.util.Locale
import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

@@ -848,4 +847,19 @@ object DDLUtils {
}
}
}


/**
* Throws an exception if outputPath tries to overwrite an input path.
*/
def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
val inputPaths = query.collect {
case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
}.flatten

if (inputPaths.contains(outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}
}
}
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources
import java.util.Locale
import java.util.concurrent.Callable

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -141,8 +143,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
parts, query, overwrite, false) if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)

case InsertIntoDir(_, storage, provider, query, overwrite)
case InsertIntoDir(isLocal, storage, provider, query, overwrite)
Member
Nit: isLocal -> _

Contributor Author
updated.

if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER =>

val outputPath = new Path(storage.locationUri.get)
if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath)

InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
Member (@gatorsmile, Sep 1, 2017)
We need to block both InsertIntoDir and InsertIntoHiveDirCommand: we cannot overwrite a path that is also being read from. See the example:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L178-L187

Contributor Author
updated.
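For illustration, a hedged sketch of the pattern this guards against (hypothetical paths and view name; assumes a SparkSession named spark and ScalaTest's intercept; whether every equivalent spelling of the same path is caught depends on how the Path objects normalize):

// Reading from a directory and overwriting that same directory in one statement
// should now fail analysis instead of silently clobbering the input.
val df = spark.read.parquet("/tmp/data_dir")
df.createOrReplaceTempView("src_view")

val e = intercept[AnalysisException] {
  spark.sql(
    """
      |INSERT OVERWRITE DIRECTORY '/tmp/data_dir'
      |USING parquet
      |SELECT * FROM src_view
    """.stripMargin)
}
assert(e.getMessage.contains("Cannot overwrite a path that is also being read from"))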


case i @ InsertIntoTable(
@@ -181,15 +187,9 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
}

val outputPath = t.location.rootPaths.head
val inputPaths = actualQuery.collect {
case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
}.flatten
if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)

val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
if (overwrite && inputPaths.contains(outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}

val partitionSchema = actualQuery.resolve(
t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
@@ -22,17 +22,16 @@ import java.net.URI
import java.util.Locale

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchPartitionException,
NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -2379,6 +2378,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}

test("insert overwrite directory to data source not providing FileFormat") {
withTempDir { dir =>
val path = dir.toURI.getPath

val v1 =
s"""
| INSERT OVERWRITE DIRECTORY '${path}'
| USING JDBC
| OPTIONS (a 1, b 0.1, c TRUE)
| SELECT 1 as a, 'c' as b
""".stripMargin
val e = intercept[SparkException] {
spark.sql(v1)
}.getMessage

assert(e.contains("Only Data Sources providing FileFormat are supported"))
}
}
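For contrast, a hedged sketch of the supported case in the same style (a FileFormat-backed provider such as parquet; the exact assertion is illustrative):

test("insert overwrite directory to a FileFormat data source") {
  withTempDir { dir =>
    val path = dir.toURI.getPath
    spark.sql(
      s"""
        | INSERT OVERWRITE DIRECTORY '$path'
        | USING parquet
        | SELECT 1 as a, 'c' as b
      """.stripMargin)
    // The parquet files written under the directory should round-trip the row.
    checkAnswer(spark.read.parquet(path), Row(1, "c"))
  }
}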

Seq(true, false).foreach { caseSensitive =>
test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
@@ -159,6 +159,10 @@ object HiveAnalysis extends Rule[LogicalPlan] {

case InsertIntoDir(isLocal, storage, provider, child, overwrite)
if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER =>

val outputPath = new Path(storage.locationUri.get)
if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)

InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
}
}
@@ -20,18 +20,19 @@ package org.apache.spark.sql.hive.execution
import java.util.Properties

import scala.language.existentials

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

/**
@@ -63,28 +64,40 @@ case class InsertIntoHiveDirCommand(
assert(children.length == 1)
assert(storage.locationUri.nonEmpty)

val Array(cols, types) = children.head.output.foldLeft(Array("", "")) { case (r, a) =>
r(0) = r(0) + a.name + ","
r(1) = r(1) + a.dataType.catalogString + ":"
r
}

val properties = new Properties()
properties.put("columns", cols.dropRight(1))
properties.put("columns.types", types.dropRight(1))

val sqlContext = sparkSession.sqlContext

properties.put(serdeConstants.SERIALIZATION_LIB,
// val Array(cols, types) = children.head.output.foldLeft(Array("", "")) { case (r, a) =>
Contributor
Dead code?

Contributor Author
I was waiting for a comment from @gatorsmile about using CatalogTable with a table identifier and database. If he is OK with that, I will remove this code.

// r(0) = r(0) + a.name + ","
// r(1) = r(1) + a.dataType.catalogString + ":"
// r
// }
//
// val properties = new Properties()
// properties.put("columns", cols.dropRight(1))
// properties.put("columns.types", types.dropRight(1))
// properties.put(serdeConstants.SERIALIZATION_LIB,
// storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
//
// import scala.collection.JavaConverters._
// properties.putAll(storage.properties.asJava)
//
// val tableDesc = new TableDesc(
// Utils.classForName(storage.inputFormat.get).asInstanceOf[Class[_ <: InputFormat[_, _]]],
// Utils.classForName(storage.outputFormat.get),
// properties
// )
Member
Yes. The following dummy table looks fine to me. Could you remove the above lines? Thanks!

Contributor Author
Removed.


val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
storage = storage,
schema = query.schema
))
hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))

import scala.collection.JavaConverters._
properties.putAll(storage.properties.asJava)

var tableDesc = new TableDesc(
Utils.classForName(storage.inputFormat.get).asInstanceOf[Class[_ <: InputFormat[_, _]]],
Utils.classForName(storage.outputFormat.get),
properties
val tableDesc = new TableDesc(
hiveTable.getInputFormatClass,
hiveTable.getOutputFormatClass,
hiveTable.getMetadata
)
Member
I am not 100% sure the above logic works well for all Hive versions and all file formats. A safer way is to use our existing approach:

val hiveTable = HiveClientImpl.toHiveTable(dummyCatalogTableWithUserSpecifiedStorage)
val tableDesc = new TableDesc(
  hiveTable.getInputFormatClass,
  hiveTable.getOutputFormatClass,
  hiveTable.getMetadata
)

Member
If we use the schema of the query as the dummyTableSchema, do we still need to populate the properties by ourselves?

Contributor Author
If we use val hiveTable = HiveClientImpl.toHiveTable(dummyCatalogTableWithUserSpecifiedStorage), it requires a table name and a database name, but in our case we don't have them.

I commented out my original code and implemented the dummy Hive table. Let me know if that is OK too.


val hadoopConf = sparkSession.sessionState.newHadoopConf()
@@ -645,4 +645,21 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}
}

test("insert overwrite to dir to illegal path") {
withTempView("test_insert_table") {
spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")

val e = intercept[IllegalArgumentException] {
sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY 'abc://a'
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|SELECT * FROM test_insert_table
""".stripMargin)
}.getMessage

assert(e.contains("Wrong FS: abc://a, expected: file:///"))
}
}
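For contrast, a hedged sketch of the happy path in this suite's style (assumes withTempDir from SQLTestUtils is available here; the assertion is illustrative):

test("insert overwrite to local dir") {
  withTempView("test_insert_table") {
    spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")

    withTempDir { dir =>
      sql(
        s"""
          |INSERT OVERWRITE LOCAL DIRECTORY '${dir.toURI.getPath}'
          |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
          |SELECT * FROM test_insert_table
        """.stripMargin)

      // The local directory should now contain the written delimited text files.
      assert(dir.listFiles().nonEmpty)
    }
  }
}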
}