From 036d5fd944a2d91c17ab324ff45ceb50c8301eb2 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 14 Jun 2016 15:25:35 +0900
Subject: [PATCH 1/4] Add ExistingRDD logical plan for input with RDD to have a chance to eliminate serialize/deserialize.

---
 .../org/apache/spark/sql/SparkSession.scala   | 22 ++----
 .../spark/sql/execution/ExistingRDD.scala     | 72 ++++++++++++++++---
 .../sql/execution/LocalTableScanExec.scala    |  8 ++-
 .../spark/sql/execution/SparkStrategies.scala |  1 +
 .../org/apache/spark/sql/QueryTest.scala      |  8 ++-
 5 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a3fd39d42eeb..f6fccce8ede6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -237,10 +237,8 @@ class SparkSession private(
   @Experimental
   def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
     SparkSession.setActiveSession(this)
-    val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
-    val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
-    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRDD)(self))
+    val encoder = Encoders.product[A]
+    Dataset.ofRows(self, ExistingRDD(rdd)(self)(encoder))
   }
 
   /**
@@ -328,14 +326,8 @@ class SparkSession private(
    * @since 2.0.0
    */
   def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
-    val attributeSeq: Seq[AttributeReference] = getSchema(beanClass)
-    val className = beanClass.getName
-    val rowRdd = rdd.mapPartitions { iter =>
-      // BeanInfo is not serializable so we must rediscover it remotely for each partition.
-      val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
-      SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
-    }
-    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
+    val encoder = Encoders.bean(beanClass).asInstanceOf[Encoder[AnyRef]]
+    Dataset.ofRows(self, ExistingRDD(rdd.asInstanceOf[RDD[AnyRef]])(self)(encoder))
   }
 
   /**
@@ -425,11 +417,7 @@ class SparkSession private(
    */
   @Experimental
   def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
-    val enc = encoderFor[T]
-    val attributes = enc.schema.toAttributes
-    val encoded = data.map(d => enc.toRow(d))
-    val plan = LogicalRDD(attributes, encoded)(self)
-    Dataset[T](self, plan)
+    Dataset[T](self, ExistingRDD(data)(self))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e2c23a4ba867..486ef05aba6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.execution
 import org.apache.commons.lang.StringUtils
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Encoder, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
@@ -74,13 +74,71 @@ object RDDConversions {
   }
 }
 
+private[sql] object ExistingRDD {
+
+  def apply[T: Encoder](rdd: RDD[T])(session: SparkSession): LogicalPlan = {
+    val existingRdd = ExistingRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
+    CatalystSerde.serialize[T](existingRdd)
+  }
+}
+
 /** Logical plan node for scanning data from an RDD. */
+private[sql] case class ExistingRDD[T](
+    outputObjAttr: Attribute,
+    rdd: RDD[T])(session: SparkSession)
+  extends LeafNode with ObjectProducer with MultiInstanceRelation {
+
+  override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
+
+  override def newInstance(): ExistingRDD.this.type =
+    ExistingRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]
+
+  override def sameResult(plan: LogicalPlan): Boolean = {
+    plan.canonicalized match {
+      case ExistingRDD(_, otherRDD) => rdd.id == otherRDD.id
+      case _ => false
+    }
+  }
+
+  override protected def stringArgs: Iterator[Any] = Iterator(output)
+
+  @transient override lazy val statistics: Statistics = Statistics(
+    // TODO: Instead of returning a default value here, find a way to return a meaningful size
+    // estimate for RDDs. See PR 1238 for more discussions.
+    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+  )
+}
+
+/** Physical plan node for scanning data from an RDD. */
+private[sql] case class ExistingRDDScanExec[T](
+    outputObjAttr: Attribute,
+    rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val outputDataType = outputObjAttr.dataType
+    rdd.mapPartitionsInternal { iter =>
+      val outputObject = ObjectOperator.wrapObjectToRow(outputDataType)
+      iter.map { value =>
+        numOutputRows += 1
+        outputObject(value)
+      }
+    }
+  }
+
+  override def simpleString: String = {
+    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+  }
+}
+
+/** Logical plan node for scanning data from an RDD of InternalRow. */
 private[sql] case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow])(session: SparkSession)
-  extends LogicalPlan with MultiInstanceRelation {
-
-  override def children: Seq[LogicalPlan] = Nil
+  extends LeafNode with MultiInstanceRelation {
 
   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
 
@@ -96,8 +154,6 @@ private[sql] case class LogicalRDD(
 
   override protected def stringArgs: Iterator[Any] = Iterator(output)
 
-  override def producedAttributes: AttributeSet = outputSet
-
   @transient override lazy val statistics: Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
@@ -105,7 +161,7 @@ private[sql] case class LogicalRDD(
   )
 }
 
-/** Physical plan node for scanning data from an RDD. */
+/** Physical plan node for scanning data from an RDD of InternalRow. */
 private[sql] case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index df2f238d8c2e..f86f42b1f80e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -34,8 +34,12 @@ private[sql] case class LocalTableScanExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   private val unsafeRows: Array[InternalRow] = {
-    val proj = UnsafeProjection.create(output, output)
-    rows.map(r => proj(r).copy()).toArray
+    if (rows.isEmpty) {
+      Array.empty
+    } else {
+      val proj = UnsafeProjection.create(output, output)
+      rows.map(r => proj(r).copy()).toArray
+    }
   }
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b619d4edc30d..26ec4794f8d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -409,6 +409,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
         exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
+      case ExistingRDD(outputObjAttr, rdd) => ExistingRDDScanExec(outputObjAttr, rdd) :: Nil
       case LogicalRDD(output, rdd) => RDDScanExec(output, rdd, "ExistingRDD") :: Nil
       case BroadcastHint(child) => planLater(child) :: Nil
       case _ => Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index b15f38c2a71e..899be6dd7926 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -241,6 +241,12 @@ abstract class QueryTest extends PlanTest {
       case _: LogicalRelation => return
       case p if p.getClass.getSimpleName == "MetastoreRelation" => return
       case _: MemoryPlan => return
+      case p: InMemoryRelation =>
+        p.child.transform {
+          case _: ObjectConsumerExec => return
+          case _: ObjectProducerExec => return
+        }
+        p
     }.transformAllExpressions {
       case a: ImperativeAggregate => return
       case _: TypedAggregateExpression => return
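A minimal usage sketch of what this first commit is after (not part of the diffs; the SparkSession value `spark` and the `Point` case class are assumptions for illustration): createDataset no longer converts every element to an InternalRow up front, so the serialize step it plans on top of the object-producing scan can be matched against a downstream typed operator's deserialize step and optimized away.

    import spark.implicits._

    case class Point(x: Int, y: Int)

    val rdd = spark.sparkContext.parallelize(Seq(Point(1, 2), Point(3, 4)))

    // Before this patch, createDataset eagerly ran enc.toRow over the whole RDD and
    // wrapped the result in LogicalRDD. With ExistingRDD the plan keeps the objects
    // and adds a serialize node, so the serialize/deserialize pair around the typed
    // map below becomes visible to the optimizer and can be eliminated.
    val ds = spark.createDataset(rdd)
    val moved = ds.map(p => Point(p.x + 1, p.y + 1))
    moved.explain(true)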
From 62aca29a0bb619cea57d6883cca6187ca9afa664 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 24 Jun 2016 21:13:53 +0900
Subject: [PATCH 2/4] Revert createDataFrame for Java Beans.

- because the Encoder for Java Beans needs not only getters but also setters.

---
 .../main/scala/org/apache/spark/sql/SparkSession.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f6fccce8ede6..ad825a6bd146 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -326,8 +326,14 @@ class SparkSession private(
    * @since 2.0.0
    */
   def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
-    val encoder = Encoders.bean(beanClass).asInstanceOf[Encoder[AnyRef]]
-    Dataset.ofRows(self, ExistingRDD(rdd.asInstanceOf[RDD[AnyRef]])(self)(encoder))
+    val attributeSeq: Seq[AttributeReference] = getSchema(beanClass)
+    val className = beanClass.getName
+    val rowRdd = rdd.mapPartitions { iter =>
+      // BeanInfo is not serializable so we must rediscover it remotely for each partition.
+      val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
+      SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
+    }
+    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
   }
 
   /**
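A small sketch of the constraint named in this revert (illustrative classes only, not taken from Spark): the BeanInfo/beansToRows path only has to read properties, while an Encoders.bean round trip also has to construct and populate instances, which needs a no-arg constructor and setters.

    // Readable by the Introspector-based row conversion, but an Encoders.bean
    // round trip could not rebuild it: no no-arg constructor, no setters.
    class ReadOnlyPoint(x: Int) {
      def getX: Int = x
    }

    // The shape Encoders.bean can both serialize and deserialize.
    class MutablePoint {
      private var x: Int = 0
      def getX: Int = x
      def setX(value: Int): Unit = { x = value }
    }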
From e218f5fbef996ca3e9606cc68bf433c83ebf224e Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 8 Jul 2016 11:15:27 +0900
Subject: [PATCH 3/4] Rename ExistingRDD to ExternalRDD.

---
 .../org/apache/spark/sql/SparkSession.scala      |  4 ++--
 .../apache/spark/sql/execution/ExistingRDD.scala | 16 ++++++++--------
 .../spark/sql/execution/SparkStrategies.scala    |  2 +-
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index ad825a6bd146..80370a2d2f81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -238,7 +238,7 @@ class SparkSession private(
   def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
     SparkSession.setActiveSession(this)
     val encoder = Encoders.product[A]
-    Dataset.ofRows(self, ExistingRDD(rdd)(self)(encoder))
+    Dataset.ofRows(self, ExternalRDD(rdd)(self)(encoder))
   }
 
   /**
@@ -423,7 +423,7 @@ class SparkSession private(
    */
   @Experimental
   def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
-    Dataset[T](self, ExistingRDD(data)(self))
+    Dataset[T](self, ExternalRDD(data)(self))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 486ef05aba6f..fbc0206374c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -74,28 +74,28 @@ object RDDConversions {
   }
 }
 
-private[sql] object ExistingRDD {
+private[sql] object ExternalRDD {
 
   def apply[T: Encoder](rdd: RDD[T])(session: SparkSession): LogicalPlan = {
-    val existingRdd = ExistingRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
-    CatalystSerde.serialize[T](existingRdd)
+    val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
+    CatalystSerde.serialize[T](externalRdd)
   }
 }
 
 /** Logical plan node for scanning data from an RDD. */
-private[sql] case class ExistingRDD[T](
+private[sql] case class ExternalRDD[T](
     outputObjAttr: Attribute,
     rdd: RDD[T])(session: SparkSession)
   extends LeafNode with ObjectProducer with MultiInstanceRelation {
 
   override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
 
-  override def newInstance(): ExistingRDD.this.type =
-    ExistingRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]
+  override def newInstance(): ExternalRDD.this.type =
+    ExternalRDD(outputObjAttr.newInstance(), rdd)(session).asInstanceOf[this.type]
 
   override def sameResult(plan: LogicalPlan): Boolean = {
     plan.canonicalized match {
-      case ExistingRDD(_, otherRDD) => rdd.id == otherRDD.id
+      case ExternalRDD(_, otherRDD) => rdd.id == otherRDD.id
       case _ => false
     }
   }
@@ -110,7 +110,7 @@ private[sql] case class ExistingRDD[T](
 }
 
 /** Physical plan node for scanning data from an RDD. */
-private[sql] case class ExistingRDDScanExec[T](
+private[sql] case class ExternalRDDScanExec[T](
     outputObjAttr: Attribute,
     rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 26ec4794f8d4..a887252b3e92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -409,7 +409,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
         exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
-      case ExistingRDD(outputObjAttr, rdd) => ExistingRDDScanExec(outputObjAttr, rdd) :: Nil
+      case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case LogicalRDD(output, rdd) => RDDScanExec(output, rdd, "ExistingRDD") :: Nil
       case BroadcastHint(child) => planLater(child) :: Nil
       case _ => Nil

From 61da04057866c16c1f55ae9ecc448042fecd57c9 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Sat, 9 Jul 2016 13:20:24 +0900
Subject: [PATCH 4/4] Uncurry `apply` method.

---
 .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 4 ++--
 .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 80370a2d2f81..1271d1c55bb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -238,7 +238,7 @@ class SparkSession private(
   def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
     SparkSession.setActiveSession(this)
     val encoder = Encoders.product[A]
-    Dataset.ofRows(self, ExternalRDD(rdd)(self)(encoder))
+    Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
   }
 
   /**
@@ -423,7 +423,7 @@ class SparkSession private(
    */
   @Experimental
   def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
-    Dataset[T](self, ExternalRDD(data)(self))
+    Dataset[T](self, ExternalRDD(data, self))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index fbc0206374c2..0aa81481a919 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -76,7 +76,7 @@ object RDDConversions {
 
 private[sql] object ExternalRDD {
 
-  def apply[T: Encoder](rdd: RDD[T])(session: SparkSession): LogicalPlan = {
+  def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
     val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
     CatalystSerde.serialize[T](externalRdd)
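For completeness, a sketch of the DataFrame path the series ends with (again assuming `spark` and the hypothetical `Point` case class from the earlier note): createDataFrame for an RDD of Products now derives its encoder via Encoders.product and plans ExternalRDD(rdd, self) plus a serialize node instead of eagerly running RDDConversions.productToRowRdd. The uncurried companion apply takes the session as a plain second argument, while the case class itself still carries it in a separate constructor list (exposed through otherCopyArgs) so it stays out of equality and canonicalization.

    val rdd = spark.sparkContext.parallelize(Seq(Point(1, 2), Point(3, 4)))

    // Plans a serialize node over the external-RDD scan rather than an eager row conversion.
    val df = spark.createDataFrame(rdd)
    df.printSchema()
    df.explain(true)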