Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
6ca7771
add insert overwrite local directory
janewangfb Aug 17, 2017
a975536
Add Unittests
janewangfb Aug 17, 2017
a15bf4e
fix local path
janewangfb Aug 17, 2017
9f596fd
Merge branch 'master' into port_local_directory
janewangfb Aug 17, 2017
b9db02e
fix style
janewangfb Aug 17, 2017
e516bec
Merge branch 'master' into port_local_directory
janewangfb Aug 18, 2017
e05624f
condense storage
janewangfb Aug 18, 2017
7f5664d
change InsertInto to InsertIntoTable
janewangfb Aug 18, 2017
d50b3a2
add InsertIntoDirectory
janewangfb Aug 19, 2017
61a18a2
update insertInto
janewangfb Aug 19, 2017
4c19aaf
SQLQuerySuite passed
janewangfb Aug 19, 2017
47fde8a
fix comments
janewangfb Aug 19, 2017
068662a
Merge branch 'master' into port_local_directory
janewangfb Aug 21, 2017
da7065b
Add tableProdier
janewangfb Aug 21, 2017
7f4b488
Add InsertIntoDataSourceDirCommand
janewangfb Aug 21, 2017
051018e
Merge branch 'master' into port_local_directory
janewangfb Aug 21, 2017
73f605e
fix style
janewangfb Aug 21, 2017
8261b39
fix style
janewangfb Aug 21, 2017
163c124
Merge branch 'master' into port_local_directory
janewangfb Aug 22, 2017
0882dd1
Address gatorsmile's comments
janewangfb Aug 22, 2017
c813ad8
Use withTempDir
janewangfb Aug 22, 2017
bc5424c
line length
janewangfb Aug 22, 2017
675ffd7
merge master
janewangfb Aug 23, 2017
f36e933
Address gatorsmile's comment
janewangfb Aug 23, 2017
64f37f4
fix typo
janewangfb Aug 23, 2017
b2068ce
Merge branch 'master' into port_local_directory
janewangfb Aug 31, 2017
8ebe5e2
Address gatorsmile's comments
janewangfb Aug 31, 2017
51f9a0a
fix comments and style
janewangfb Aug 31, 2017
e2db5e1
add more comments
janewangfb Aug 31, 2017
7bb5c85
Merge branch 'master' into port_local_directory
janewangfb Sep 1, 2017
2e7a29b
Merge branch 'master' into port_local_directory
janewangfb Sep 1, 2017
7ccbde4
Address gatorsmile's comments
janewangfb Sep 2, 2017
52350e8
fix style
janewangfb Sep 2, 2017
2ec9947
address Tejas' comment
janewangfb Sep 3, 2017
05d9d20
Merge branch 'master' into port_local_directory
janewangfb Sep 5, 2017
392593b
check point
janewangfb Sep 5, 2017
511cfc3
checkpoint
janewangfb Sep 5, 2017
77948bb
Merge insertIntoTable and insertIntoDirectory
janewangfb Sep 5, 2017
fd9322c
refactoring temp dir
janewangfb Sep 6, 2017
e590847
add TODO for tmpPath
janewangfb Sep 6, 2017
d1b4ec8
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
dd6a418
use tmpPath
janewangfb Sep 6, 2017
c2c693c
fix style
janewangfb Sep 6, 2017
e9c88b5
add exists check
janewangfb Sep 6, 2017
62370fd
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
b00acdc
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
b64520b
fix build
janewangfb Sep 6, 2017
3aaf6e8
fix build failure
janewangfb Sep 6, 2017
b461e00
fix build failure
janewangfb Sep 6, 2017
e9c24de
address's gatorsmile's comment
janewangfb Sep 6, 2017
28fcb39
Merge branch 'master' into port_local_directory
janewangfb Sep 6, 2017
0c03a2b
address gatorsmile's comment
janewangfb Sep 6, 2017
6c24b1b
remove unused import
janewangfb Sep 6, 2017
c4ab411
merge master
janewangfb Sep 7, 2017
0ec103e
address gatorsmile's comments
janewangfb Sep 7, 2017
160c0ec
fix style
janewangfb Sep 7, 2017
95ebfd3
add more unittest
janewangfb Sep 7, 2017
4a5ff29
add multi insert
janewangfb Sep 7, 2017
c99872b
Merge branch 'master' into port_local_directory
janewangfb Sep 9, 2017
449249e
address gatorsmile's comments
janewangfb Sep 9, 2017
7919041
address viirya's comment
janewangfb Sep 9, 2017
aeb5d5e
address viirya's comment
janewangfb Sep 9, 2017
81382df
Merge branch 'master' into port_local_directory
janewangfb Sep 9, 2017
f93d57a
address viirya's comment
janewangfb Sep 9, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address gatorsmile's comments
  • Loading branch information
janewangfb committed Aug 22, 2017
commit 0882dd1f3c300f832d731b69a0d57ef461e55038
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ object UnsupportedOperationChecker {
"Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider " +
"using approx_count_distinct() instead.")


case _: Command =>
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
case hiveDir: InsertOverwriteHiveDirContext => visitInsertOverwriteHiveDir(hiveDir)
}

InsertIntoDir(isLocal, storage, provider, query)
InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ case class InsertIntoDir(
isLocal: Boolean,
storage: CatalogStorageFormat,
provider: Option[String],
child: LogicalPlan)
child: LogicalPlan,
overwrite: Boolean = true)
extends LogicalPlan {

override def children: Seq[LogicalPlan] = child :: Nil
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We can simply extend UnaryNode and remove children.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
*
* Expected format:
* {{{
* INSERT OVERWRITE DIRECTORY
* INSERT OVERWRITE [LOCAL] DIRECTORY
* path
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOCAL?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added [LOCAL]

* [ROW FORMAT row_format]
* [STORED AS file_format]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import org.apache.spark.sql.execution.datasources._
case class InsertIntoDataSourceDirCommand(
storage: CatalogStorageFormat,
provider: Option[String],
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add a parameter `overwrite: Boolean`.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us change it to provider: String

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

query: LogicalPlan) extends RunnableCommand {
query: LogicalPlan,
overwrite: Boolean) extends RunnableCommand {

override def innerChildren: Seq[LogicalPlan] = Seq(query)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

innerChildren -> children

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.


override def run(sparkSession: SparkSession): Seq[Row] = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run(sparkSession: SparkSession, children: Seq[SparkPlan])

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

assert(innerChildren.length == 1)
assert(!storage.locationUri.isEmpty)
assert(storage.locationUri.nonEmpty)
assert(provider.isDefined)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add the helper messages for the above three asserts?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


// Create the relation based on the input logical plan: `data`.
Expand All @@ -52,8 +53,9 @@ case class InsertIntoDataSourceDirCommand(
options = storage.properties ++ pathOption,
catalogTable = None)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add an extra check here.

val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)

If isFileFormat is false, return an error.

A simple example is like

      sql(
        s"""
          |INSERT OVERWRITE DIRECTORY '${path}'
          |USING JDBC
          |OPTIONS (uRl '$url1', DbTaBlE 'TEST.PEOPLE1', User 'testUser', PassWord 'testPass')
          |SELECT 1, 2
        """.stripMargin)

Currently, the above query can pass. We should get an exception instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
try {
dataSource.writeAndRead(SaveMode.Overwrite, query)
dataSource.writeAndRead(saveMode, query)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we do not need to return BaseRelation, we can call planForWriting and execute the plan

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation here confused me; I just want to leave a question here: why should we call both writeAndRead and planForWriting?
@janewangfb @gatorsmile @cloud-fan

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We should get rid of dataSource.writeAndRead. @xuanyuanking Could you submit a PR to fix the issue?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Thanks for your reply; I'll try to fix this.

} catch {
case ex: AnalysisException =>
logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
parts, query, overwrite, false) if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)

case InsertIntoDir(_, storage, provider, query) =>
InsertIntoDataSourceDirCommand(storage, provider, query)
case InsertIntoDir(_, storage, provider, query, overwrite) if provider.nonEmpty =>
Copy link
Copy Markdown
Member

@gatorsmile gatorsmile Aug 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provider.toLowerCase(Locale.ROOT) != HIVE_PROVIDER

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

InsertIntoDataSourceDirCommand(storage, provider, query, overwrite)

case i @ InsertIntoTable(
l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ class DDLCommandSuite extends PlanTest {
test("insert overwrite directory") {
val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as a"
parser.parsePlan(v1) match {
case InsertIntoDir(_, storage, provider, query) =>
case InsertIntoDir(_, storage, provider, query, overwrite) =>
assert(storage.locationUri != None && storage.locationUri.get.toString == "/tmp/file")
case other =>
fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" +
Expand All @@ -510,7 +510,7 @@ class DDLCommandSuite extends PlanTest {
| SELECT 1 as a
""".stripMargin
parser.parsePlan(v3) match {
case InsertIntoDir(_, storage, provider, query) =>
case InsertIntoDir(_, storage, provider, query, overwrite) =>
assert(storage.locationUri != None && provider == Some("json"))
case other =>
fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
CreateHiveTableAsSelectCommand(tableDesc, query, mode)

case InsertIntoDir(isLocal, storage, _, child) =>
InsertIntoHiveDirCommand(isLocal, storage, child)
case InsertIntoDir(isLocal, storage, _, child, overwrite) =>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provider.toLowerCase(Locale.ROOT) == HIVE_PROVIDER

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ import org.apache.spark.util.Utils
case class InsertIntoHiveDirCommand(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

isLocal: Boolean,
storage: CatalogStorageFormat,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also add a parameter `overwrite: Boolean`.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

query: LogicalPlan) extends SaveAsHiveFile {
query: LogicalPlan,
overwrite: Boolean) extends SaveAsHiveFile {

override def children: Seq[LogicalPlan] = query :: Nil

override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
assert(children.length == 1)
assert(!storage.locationUri.isEmpty)
assert(storage.locationUri.nonEmpty)

val Array(cols, types) = children.head.output.foldLeft(Array("", "")) { case (r, a) =>
r(0) = r(0) + a.name + ","
Expand Down Expand Up @@ -79,14 +80,22 @@ case class InsertIntoHiveDirCommand(
val localFileSystem = FileSystem.getLocal(jobConf)
val localPath = localFileSystem.makeQualified(targetPath)
if (localFileSystem.exists(localPath)) {
localFileSystem.delete(localPath, true)
if (overwrite) {
localFileSystem.delete(localPath, true)
} else {
throw new RuntimeException("Directory '" + localPath.toString + "' already exists")
}
}
localPath
} else {
val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
val dfs = qualifiedPath.getFileSystem(jobConf)
if (dfs.exists(qualifiedPath)) {
dfs.delete(qualifiedPath, true)
if (overwrite) {
dfs.delete(qualifiedPath, true)
} else {
throw new RuntimeException("Directory '" + qualifiedPath.toString + "' already exists")
}
} else {
dfs.mkdirs(qualifiedPath.getParent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
// Base trait from which all hive insert statement physical execution extends.
private[hive] trait SaveAsHiveFile extends DataWritingCommand {

protected def saveAsHiveFile(sparkSession: SparkSession,
plan: SparkPlan,
hadoopConf: Configuration,
fileSinkConf: FileSinkDesc,
outputLocation: String,
partitionAttributes: Seq[Attribute] = Nil,
bucketSpec: Option[BucketSpec] = None,
options: Map[String, String] = Map.empty): Unit = {
protected def saveAsHiveFile(
sparkSession: SparkSession,
plan: SparkPlan,
hadoopConf: Configuration,
fileSinkConf: FileSinkDesc,
outputLocation: String,
partitionAttributes: Seq[Attribute] = Nil,
bucketSpec: Option[BucketSpec] = None,
options: Map[String, String] = Map.empty): Unit = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove bucketSpec and options ? Add them back only when we need it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


val sessionState = sparkSession.sessionState
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2046,74 +2046,81 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

val path = Utils.createTempDir()
path.delete()
checkAnswer(
sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}' SELECT * FROM src where key < 10"),
Seq.empty[Row])
withTempDir { dir =>
val path = dir.toURI.getPath

checkAnswer(
sql(s"""INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}'
checkAnswer(
sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path}' SELECT * FROM src where key < 10"),
Seq.empty[Row])

checkAnswer(
sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${path}'
|STORED AS orc
|SELECT * FROM src where key < 10""".stripMargin),
Seq.empty[Row])
|SELECT * FROM src where key < 10
""".stripMargin),
Seq.empty[Row])

// use orc data source to check the data of path is right.
sql(
s"""CREATE TEMPORARY TABLE orc_source
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
| PATH '${path.getCanonicalPath}'
|)
""".stripMargin)
checkAnswer(
sql("select * from orc_source"),
sql("select * from src where key < 10").collect()
)
// use orc data source to check the data of path is right.
sql(
s"""
|CREATE TEMPORARY TABLE orc_source
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
| PATH '${dir.getCanonicalPath}'
|)
""".stripMargin)

Utils.deleteRecursively(path)
dropTempTable("orc_source")
checkAnswer(
sql("select * from orc_source"),
sql("select * from src where key < 10").collect())

dropTempTable("orc_source")
}
}

test("insert overwrite to dir from temp table") {
import org.apache.spark.util.Utils
withTempView("test_insert_table") {
spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")

sparkContext
.parallelize(1 to 10)
.map(i => TestData(i, i.toString))
.toDF()
.registerTempTable("test_insert_table")
withTempDir { dir =>
val path = dir.toURI.getPath

val path = Utils.createTempDir()
path.delete()
checkAnswer(
sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}'
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|SELECT * FROM test_insert_table
""".stripMargin),
Seq.empty[Row])
checkAnswer(
sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${path}'
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|SELECT * FROM test_insert_table
""".stripMargin),
Seq.empty[Row])

checkAnswer(
sql(s"""
INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}'
|STORED AS orc
|SELECT * FROM test_insert_table""".stripMargin),
Seq.empty[Row])
checkAnswer(
sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${path}'
|STORED AS orc
|SELECT * FROM test_insert_table
""".stripMargin),
Seq.empty[Row])

// use orc data source to check the data of path is right.
sql(
s"""CREATE TEMPORARY TABLE orc_source
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
| PATH '${path.getCanonicalPath}'
|)
""".stripMargin)
checkAnswer(
sql("select * from orc_source"),
sql("select * from test_insert_table").collect()
)
Utils.deleteRecursively(path)
dropTempTable("test_insert_table")
dropTempTable("orc_source")
// use orc data source to check the data of path is right.
sql(
s"""
|CREATE TEMPORARY TABLE orc_source
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
| PATH '${dir.getCanonicalPath}'
|)
""".stripMargin)

checkAnswer(
sql("select * from orc_source"),
sql("select * from test_insert_table").collect())

dropTempTable("orc_source")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also put it into withTempView

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

}
}
}
}