
Commit 299cebb: "fix"
1 parent: b19ee89

3 files changed: 10 additions, 5 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 5 additions & 1 deletion
@@ -801,7 +801,11 @@ object DDLUtils {
   val HIVE_PROVIDER = "hive"

   def isHiveTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
+    isHiveTable(table.provider)
+  }
+
+  def isHiveTable(provider: Option[String]): Boolean = {
+    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
+  }

   def isDatasourceTable(table: CatalogTable): Boolean = {
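
Note: the refactor keeps the CatalogTable entry point and delegates it to a new Option[String] overload, so callers that only hold a provider string (such as the InsertIntoDir match in HiveStrategies.scala below) can reuse the same case-insensitive comparison. A minimal standalone sketch; the object name is hypothetical, the logic is copied from the new overload:

import java.util.Locale

// Hypothetical standalone object; mirrors DDLUtils.isHiveTable after this commit.
object HiveProviderCheck {
  val HIVE_PROVIDER = "hive"

  def isHiveTable(provider: Option[String]): Boolean =
    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER

  def main(args: Array[String]): Unit = {
    println(isHiveTable(Some("HIVE")))    // true: comparison is case-insensitive
    println(isHiveTable(Some("parquet"))) // false: a data source provider
    println(isHiveTable(None))            // false: no provider recorded
  }
}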

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 2 deletions
@@ -160,8 +160,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)

     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
-        if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER =>
-
+        if DDLUtils.isHiveTable(provider) =>
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
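
Note: the new guard is behavior-preserving, since DDLUtils.isHiveTable(provider) performs the same isDefined check and case-insensitive comparison the inline condition did. A hypothetical standalone check of that equivalence (object name and inputs are made up; both predicates are copied from the old and new guard conditions):

import java.util.Locale

object GuardEquivalence {
  val HIVE_PROVIDER = "hive"

  // New-style guard, as in DDLUtils after this commit.
  def isHiveTable(provider: Option[String]): Boolean =
    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER

  def main(args: Array[String]): Unit = {
    val inputs = Seq(Some("hive"), Some("HIVE"), Some("parquet"), None)
    for (provider <- inputs) {
      // Old inline guard condition from HiveStrategies.scala.
      val oldGuard = provider.isDefined &&
        provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
      assert(oldGuard == isHiveTable(provider)) // same result on every input
    }
    println("old and new guards agree")
  }
}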

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 4 additions & 2 deletions
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration

 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
@@ -36,7 +37,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      partitionAttributes: Seq[Attribute] = Nil): Unit = {
+      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
+      partitionAttributes: Seq[Attribute] = Nil): Set[String] = {

     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
     if (isCompressed) {
@@ -62,7 +64,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       plan = plan,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty),
+      outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
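
Note: the new customPartitionLocations parameter is threaded into FileFormatWriter.OutputSpec, letting a caller pin individual partitions to explicit filesystem locations instead of the default layout under outputLocation, and the return type change from Unit to Set[String] suggests the method now surfaces the set of updated partition paths that FileFormatWriter.write computes. A minimal standalone sketch of the parameter's shape; in Spark, CatalogTypes.TablePartitionSpec is an alias for Map[String, String] (partition column to value), redeclared here so the block runs on its own. Object name, partition values, and paths are hypothetical:

object CustomPartitionLocationsSketch {
  type TablePartitionSpec = Map[String, String]

  def main(args: Array[String]): Unit = {
    // Hypothetical custom locations: each partition spec maps to an
    // explicit directory instead of the default outputLocation layout.
    val customPartitionLocations: Map[TablePartitionSpec, String] = Map(
      Map("ds" -> "2017-10-01") -> "hdfs://nn/custom/ds=2017-10-01",
      Map("ds" -> "2017-10-02") -> "hdfs://nn/custom/ds=2017-10-02")

    // Passing Map.empty (the default) preserves the old behavior: all
    // partitions land under outputLocation's standard directory layout.
    customPartitionLocations.foreach { case (spec, path) =>
      println(s"$spec -> $path")
    }
  }
}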
