Commit message: fix
gatorsmile committed Sep 13, 2017
commit 299cebb2be933c689f33c0cd96e1a0f7e0f91623
@@ -801,7 +801,11 @@ object DDLUtils {
   val HIVE_PROVIDER = "hive"
 
   def isHiveTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
+    isHiveTable(table.provider)
   }
+
+  def isHiveTable(provider: Option[String]): Boolean = {
+    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
+  }
 
   def isDatasourceTable(table: CatalogTable): Boolean = {
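For context, a minimal standalone sketch of how the new Option[String]-based overload behaves (the object name and main method are illustrative scaffolding, not part of the PR):

```scala
import java.util.Locale

object HiveProviderCheckSketch {
  val HIVE_PROVIDER = "hive"

  // Mirrors the new overload: true only when a provider is set and,
  // lower-cased with the root locale, equals "hive".
  def isHiveTable(provider: Option[String]): Boolean = {
    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
  }

  def main(args: Array[String]): Unit = {
    assert(isHiveTable(Some("hive")))     // exact match
    assert(isHiveTable(Some("HiVe")))     // case-insensitive via Locale.ROOT
    assert(!isHiveTable(Some("parquet"))) // non-Hive data source
    assert(!isHiveTable(None))            // provider unset
  }
}
```

The CatalogTable variant now simply delegates with table.provider, so every call site shares one definition of what counts as a Hive table.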
@@ -160,8 +160,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
 
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
-        if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER =>
-
+        if DDLUtils.isHiveTable(provider) =>
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
 
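The same helper then replaces the inline check in the InsertIntoDir match guard above. A hypothetical mini-example of the pattern (the Command/InsertIntoDir types here are invented stand-ins for the Catalyst classes, shown only to illustrate the guard style):

```scala
import java.util.Locale

// Stand-in types; only the named-predicate-as-guard pattern is the point.
sealed trait Command
case class InsertIntoDir(provider: Option[String], path: String) extends Command

object GuardSketch {
  def isHiveTable(provider: Option[String]): Boolean =
    provider.exists(_.toLowerCase(Locale.ROOT) == "hive")

  def describe(cmd: Command): String = cmd match {
    // The guard keeps the case clause readable and reuses one predicate
    // instead of repeating the Option handling inline.
    case InsertIntoDir(provider, path) if isHiveTable(provider) =>
      s"Hive insert into directory $path"
    case InsertIntoDir(_, path) =>
      s"Data source insert into directory $path"
  }

  def main(args: Array[String]): Unit = {
    println(describe(InsertIntoDir(Some("HIVE"), "/tmp/out"))) // Hive insert ...
    println(describe(InsertIntoDir(Some("json"), "/tmp/out"))) // Data source insert ...
  }
}
```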
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
@@ -36,7 +37,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      partitionAttributes: Seq[Attribute] = Nil): Unit = {
+      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
+      partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
     if (isCompressed) {
@@ -62,7 +64,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       plan = plan,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty),
+      outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
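To make the signature change concrete: saveAsHiveFile gains a customPartitionLocations parameter that flows into FileFormatWriter.OutputSpec, and its return type changes from Unit to Set[String], presumably so callers can observe which partition paths were written. A self-contained sketch with stand-in types (nothing below is the actual Spark implementation; only TablePartitionSpec = Map[String, String] matches CatalogTypes):

```scala
object SaveAsHiveFileSketch {
  // Matches CatalogTypes.TablePartitionSpec in Spark.
  type TablePartitionSpec = Map[String, String]

  // Stand-in for the trait method: routes each dynamic partition either
  // under outputLocation or to its custom location, and returns the set
  // of partition paths it "wrote".
  def saveAsHiveFile(
      outputLocation: String,
      partitions: Seq[TablePartitionSpec],
      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty): Set[String] = {
    partitions.map { spec =>
      val defaultPath =
        spec.map { case (k, v) => s"$k=$v" }.mkString(outputLocation + "/", "/", "")
      customPartitionLocations.getOrElse(spec, defaultPath)
    }.toSet
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Map("ds" -> "2017-09-12"), Map("ds" -> "2017-09-13"))
    val custom = Map(Map("ds" -> "2017-09-13") -> "/warehouse/custom/ds=2017-09-13")
    println(saveAsHiveFile("/warehouse/t", parts, custom))
    // Set(/warehouse/t/ds=2017-09-12, /warehouse/custom/ds=2017-09-13)
  }
}
```

Because customPartitionLocations defaults to Map.empty, existing callers that do not supply per-partition locations keep their current behavior: everything lands under outputLocation, exactly as the old Map.empty argument to OutputSpec did.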