Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Protect InsertIntoHive.
  • Loading branch information
yhuai committed Apr 2, 2015
commit 1e241af960c75f7477fe2c2e8631ffac8f2413bb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing: Calling a HiveContext sc is pretty confusing. Also since this is a convenience cast it should be a def so as to not take up extra space in the object.

@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val db = Hive.get(sc.hiveconf)
@transient private lazy val catalog = sc.catalog

private def newSerializer(tableDesc: TableDesc): Serializer = {
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
Expand Down Expand Up @@ -199,38 +199,45 @@ case class InsertIntoHiveTable(
orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
}
val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
db.validatePartitionNameCharacters(partVals)
catalog.synchronized {
catalog.client.validatePartitionNameCharacters(partVals)
}
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
if (numDynamicPartitions > 0) {
db.loadDynamicPartitions(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime,
isSkewedStoreAsSubdir
)
catalog.synchronized {
catalog.client.loadDynamicPartitions(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime,
isSkewedStoreAsSubdir
}
} else {
db.loadPartition(
catalog.synchronized {
catalog.client.loadPartition(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
}
}
} else {
catalog.synchronized {
catalog.client.loadTable(
outputPath,
qualifiedTableName,
orderedPartitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
holdDDLTime)
}
} else {
db.loadTable(
outputPath,
qualifiedTableName,
overwrite,
holdDDLTime)
}

// Invalidate the cache.
Expand Down