Fix something in DataSourceStrategy.scala. Patch provided by @gatorsmile
Michael Allman committed Dec 5, 2016
commit 37fc5955b2a26caa5ef133d19e415172a26330a2
@@ -161,8 +161,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))
 
 
-    case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), part, query, overwrite, false)
+    case logical.InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), _, query, overwrite, false)
         if query.resolved && t.schema.sameType(query.schema) =>
 
       // Sanity checks
@@ -192,11 +192,19 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
       var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
 
+      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
+        overwrite.staticPartitionKeys.map { case (k, v) =>
+          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
+        }
+      } else {
+        Map.empty
+      }
+
       // When partitions are tracked by the catalog, compute all custom partition locations that
       // may be relevant to the insertion job.
       if (partitionsTrackedByCatalog) {
         val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
-          l.catalogTable.get.identifier, Some(overwrite.staticPartitionKeys))
+          l.catalogTable.get.identifier, Some(staticPartitionKeys))
         initialMatchingPartitions = matchingPartitions.map(_.spec)
         customPartitionLocations = getCustomPartitionLocations(
           t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
@@ -225,14 +233,6 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         t.location.refresh()
       }
 
-      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
-        overwrite.staticPartitionKeys.map { case (k, v) =>
-          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
-        }
-      } else {
-        Map.empty
-      }
-
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitionKeys,
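The change above moves the computation of staticPartitionKeys ahead of the catalog's listPartitions call, so the catalog is queried with partition key names already resolved (case-insensitively) against the partition schema rather than with the raw overwrite.staticPartitionKeys. The following is a minimal, self-contained Scala sketch of that normalization step only, not the Spark source itself; the names normalizeStaticPartitionKeys, partitionColumnNames, and userSpecifiedKeys are hypothetical simplifications of the code shown in the diff.

object StaticPartitionKeyNormalization {

  // Resolve user-supplied partition key names case-insensitively against the
  // table's partition column names, mirroring the `equalsIgnoreCase` lookup in
  // the patch. Unknown keys fail fast instead of calling `.get` on a None.
  def normalizeStaticPartitionKeys(
      partitionColumnNames: Seq[String],
      userSpecifiedKeys: Map[String, String]): Map[String, String] = {
    userSpecifiedKeys.map { case (k, v) =>
      val resolved = partitionColumnNames
        .find(_.equalsIgnoreCase(k))
        .getOrElse(throw new IllegalArgumentException(s"Unknown partition column: $k"))
      resolved -> v
    }
  }

  def main(args: Array[String]): Unit = {
    val partitionColumns = Seq("ds", "hr")
    // The user wrote the key with different casing; the normalized spec uses the
    // schema's casing, so a catalog lookup keyed by column name still matches.
    val normalized = normalizeStaticPartitionKeys(partitionColumns, Map("DS" -> "2016-12-05"))
    println(normalized) // Map(ds -> 2016-12-05)
  }
}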