Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix test failed
  • Loading branch information
windpiger committed Mar 1, 2017
commit f4b4d29b4d75266411a8fc7366e5ade6facf516d
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ class HadoopTableReader(

// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
val localTableDesc = tableDesc
val broadcastedHadoopConf = _broadcastedHadoopConf

val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)

val locationPath = new Path(inputPathStr)
val fs = locationPath.getFileSystem(_broadcastedHadoopConf.value.value)
val fs = locationPath.getFileSystem(broadcastedHadoopConf.value.value)

// if the location of the table which is not created by 'stored by' does not exist,
// return an empty RDD
Expand All @@ -130,15 +131,15 @@ class HadoopTableReader(
// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)

val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))

val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHadoopConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
deserializer.initialize(hconf, localTableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}

Expand Down