use stricter type parameter to make it clear that parquet reader returns UnsafeRow

cloud-fan committed Aug 2, 2016
commit 67a4c17c31bb6e12a24b923b96793681ef528521
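
For context (not part of the commit): UnsafeRow is a subtype of InternalRow, so narrowing the reader's type parameter changes nothing at runtime; it only gives callers a compile-time guarantee that no cast is needed. Below is a minimal, self-contained Scala sketch of that effect using toy stand-in types (InternalRowLike, UnsafeRowLike, and RecordReader here are hypothetical names, not Spark's or parquet-mr's classes):

// Toy model of why the narrower type parameter helps: a reader typed to
// the subtype lets callers use subtype-only members without casting.
trait InternalRowLike
final class UnsafeRowLike extends InternalRowLike {
  def backingBytes: Array[Byte] = Array.emptyByteArray // subtype-only member
}

// Stand-in for a generic record reader like parquet-mr's RecordReader[T].
class RecordReader[T](produce: () => T) {
  def getCurrentValue: T = produce()
}

object TypedReaderDemo extends App {
  // With the stricter parameter the caller gets UnsafeRowLike statically:
  val strict = new RecordReader[UnsafeRowLike](() => new UnsafeRowLike)
  val bytes = strict.getCurrentValue.backingBytes // no cast needed

  // A reader typed to the supertype forces a runtime cast instead:
  val loose = new RecordReader[InternalRowLike](() => new UnsafeRowLike)
  val cast = loose.getCurrentValue.asInstanceOf[UnsafeRowLike]
  println(s"strict=${bytes.length} loose=${cast.backingBytes.length}")
}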
ParquetFileFormat.scala
@@ -370,11 +370,11 @@ private[sql] class ParquetFileFormat
       logDebug(s"Falling back to parquet-mr")
       val reader = pushed match {
         case Some(filter) =>
-          new ParquetRecordReader[InternalRow](
+          new ParquetRecordReader[UnsafeRow](
             new ParquetReadSupport,
             FilterCompat.get(filter, null))
         case _ =>
-          new ParquetRecordReader[InternalRow](new ParquetReadSupport)
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
       }
       reader.initialize(split, hadoopAttemptContext)
       reader
ParquetReadSupport.scala
@@ -29,12 +29,12 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.types._

 /**
  * A Parquet [[ReadSupport]] implementation for reading Parquet records as Catalyst
- * [[InternalRow]]s.
+ * [[UnsafeRow]]s.
  *
  * The API interface of [[ReadSupport]] is a little bit over complicated because of historical
  * reasons. In older versions of parquet-mr (say 1.6.0rc3 and prior), [[ReadSupport]] need to be
@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
+private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _

   /**
@@ -72,13 +72,13 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
   /**
    * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
    * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
-   * records to Catalyst [[InternalRow]]s.
+   * records to Catalyst [[UnsafeRow]]s.
    */
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+      readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
     val parquetRequestedSchema = readContext.getRequestedSchema

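
The Scaladoc in this hunk explains that the requested schema travels from init() (driver side) to prepareForRead() (executor side) through a private var rather than through the ReadContext. Here is a hedged sketch of that lifecycle with toy types (ToyReadSupport and its string schema are hypothetical; the real methods take a Hadoop Configuration and return a RecordMaterializer):

// Minimal model of the init()/prepareForRead() pattern described above:
// init() stashes the requested schema in a private var, and
// prepareForRead() reads it back instead of consulting a ReadContext.
final class ToyReadSupport {
  private var requestedSchema: String = _ // set in init(), read in prepareForRead()

  def init(confSchema: String): Unit = {
    requestedSchema = confSchema
  }

  def prepareForRead(): String = {
    require(requestedSchema != null, "init() must be called before prepareForRead()")
    s"materializer for schema: $requestedSchema"
  }
}

object ToyReadSupportDemo extends App {
  val rs = new ToyReadSupport
  rs.init("struct<a:int,b:string>")
  println(rs.prepareForRead()) // materializer for schema: struct<a:int,b:string>
}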
ParquetRecordMaterializer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType

-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.types.StructType

 /**
@@ -32,12 +32,12 @@ import org.apache.spark.sql.types.StructType
  */
 private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
-  extends RecordMaterializer[InternalRow] {
+  extends RecordMaterializer[UnsafeRow] {

   private val rootConverter =
     new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)

-  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
+  override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord

   override def getRootConverter: GroupConverter = rootConverter
 }
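
For readers unfamiliar with parquet-mr's contract: a RecordMaterializer exposes a root GroupConverter that the reader drives to assemble each record field by field, and getCurrentRecord, the method this commit retypes to UnsafeRow, returns the finished row. A rough sketch with hypothetical toy types (ToyConverter and ToyMaterializer, not the parquet-mr API):

// Minimal model of the RecordMaterializer contract: the reader pushes
// values through the root converter, then asks for the assembled record.
trait ToyConverter { def add(field: String, value: Any): Unit }

final class ToyMaterializer {
  private var fields = Map.empty[String, Any]

  val rootConverter: ToyConverter = new ToyConverter {
    def add(field: String, value: Any): Unit = fields += (field -> value)
  }

  def getCurrentRecord: Map[String, Any] = fields
}

object ToyMaterializerDemo extends App {
  val m = new ToyMaterializer
  m.rootConverter.add("id", 1L)      // the reader drives the converter...
  m.rootConverter.add("name", "ab")  // ...field by field,
  println(m.getCurrentRecord)        // ...then the caller reads the record
}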