Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,39 @@

package org.apache.spark.sql.execution

import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructType

object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
def productToRowRdd[A <: Product : TypeTag](data: RDD[A],
outputSchema: StructType): RDD[InternalRow] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, seems like this is never used actually... shall we remove it instead if this is the case?

val converters = ExpressionEncoder[A].resolveAndBind(outputSchema.toAttributes)
data.mapPartitions { iterator =>
val numColumns = outputTypes.length
val mutableRow = new GenericInternalRow(numColumns)
val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
iterator.map { r =>
var i = 0
while (i < numColumns) {
mutableRow(i) = converters(i)(r.productElement(i))
i += 1
}

mutableRow
converters.toRow(r)
}
}
}

/**
* Convert the objects inside Row into the types Catalyst expected.
*/
def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
def rowToRowRdd(data: RDD[Row], outputSchema: StructType): RDD[InternalRow] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove whole object. rowToRowRdd looks only being used at one place and the code here is quite small.

val converters = RowEncoder(outputSchema)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked each case. Every case looks fine except one case:

case chr: Char => UTF8String.fromString(chr.toString)

Looks we're going to drop Char as StringType. I think it's trivial and rather a mistake that we supported this. I don't feel strongly about documenting it in migration guide but if anyone feels so, we better do that.

data.mapPartitions { iterator =>
val numColumns = outputTypes.length
val mutableRow = new GenericInternalRow(numColumns)
val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
iterator.map { r =>
var i = 0
while (i < numColumns) {
mutableRow(i) = converters(i)(r(i))
i += 1
}

mutableRow
converters.toRow(r)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
output: Seq[Attribute],
rdd: RDD[Row]): RDD[InternalRow] = {
if (relation.relation.needConversion) {
execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
execution.RDDConversions.rowToRowRdd(rdd, StructType.fromAttributes(output))
} else {
rdd.asInstanceOf[RDD[InternalRow]]
}
Expand Down