@@ -357,7 +357,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
})
}

-override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
+override def innerChildren: Seq[QueryPlan[_]] = subqueries

/**
* Returns a plan where a best effort attempt has been made to transform `this` in a way
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
-trait Command extends LeafNode {
+trait Command extends LogicalPlan {
override def output: Seq[Attribute] = Seq.empty
+override def children: Seq[LogicalPlan] = Seq.empty
}
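
Since Command no longer extends LeafNode, an implementation is free to expose children. A minimal sketch of what the relaxed trait permits, using a hypothetical command that is not part of this commit:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Hypothetical example only: with Command extending LogicalPlan, a command can
// override `children` to expose a wrapped query plan (e.g. for display in the
// SQL UI), which the old `Command extends LeafNode` definition ruled out.
case class ExampleWriteCommand(query: LogicalPlan) extends Command {
  override def children: Seq[LogicalPlan] = Seq(query)
  // `output` stays Seq.empty, inherited from Command
}
```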
@@ -22,7 +22,8 @@ import java.math.{MathContext, RoundingMode}
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -243,9 +244,9 @@ object ColumnStat extends Logging {
}

col.dataType match {
-case _: IntegralType => fixedLenTypeStruct(LongType)
+case dt: IntegralType => fixedLenTypeStruct(dt)
case _: DecimalType => fixedLenTypeStruct(col.dataType)
-case DoubleType | FloatType => fixedLenTypeStruct(DoubleType)
+case dt @ (DoubleType | FloatType) => fixedLenTypeStruct(dt)
case BooleanType => fixedLenTypeStruct(col.dataType)
case DateType => fixedLenTypeStruct(col.dataType)
case TimestampType => fixedLenTypeStruct(col.dataType)
@@ -264,14 +265,12 @@
}

/** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
-def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
+def rowToColumnStat(row: InternalRow, attr: Attribute): ColumnStat = {
ColumnStat(
distinctCount = BigInt(row.getLong(0)),
// for string/binary min/max, get should return null
-min = Option(row.get(1))
-.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
-max = Option(row.get(2))
-.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
+min = Option(row.get(1, attr.dataType)),
+max = Option(row.get(2, attr.dataType)),
nullCount = BigInt(row.getLong(3)),
avgLen = row.getLong(4),
maxLen = row.getLong(5)
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.sources.BaseRelation
@@ -231,12 +232,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")

runCommand(df.sparkSession, "save") {
-SaveIntoDataSourceCommand(
-query = df.logicalPlan,
-provider = source,
+DataSource(
+sparkSession = df.sparkSession,
+className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
-options = extraOptions.toMap,
-mode = mode)
+options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
}
}

@@ -607,7 +607,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
try {
val start = System.nanoTime()
// call `QueryExecution.toRDD` to trigger the execution of commands.
-qe.toRdd
+SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
val end = System.nanoTime()
session.listenerManager.onSuccess(name, qe, end - start)
} catch {
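The change to runCommand above wraps command execution in SQLExecution.withNewExecutionId so the run gets its own execution id, and therefore an entry in the SQL UI. A minimal sketch of that pattern, assuming the (session, queryExecution)(body) signature used in this commit:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

// Sketch only: execute an arbitrary action under a fresh execution id so the
// physical plan and its metrics are reported to the UI for this one run.
def runWithExecutionId[T](session: SparkSession, qe: QueryExecution)(action: => T): T = {
  SQLExecution.withNewExecutionId(session, qe) {
    action
  }
}
```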
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (22 changes: 16 additions & 6 deletions)
@@ -179,9 +179,9 @@ class Dataset[T] private[sql](
// to happen right away to let these side effects take place eagerly.
queryExecution.analyzed match {
case c: Command =>
-LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
+LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
-LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
+LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
case _ =>
queryExecution.analyzed
}
@@ -2450,6 +2450,11 @@
*/
def take(n: Int): Array[T] = head(n)

+// An internal version of `take`, which won't set execution id and trigger listeners.
+private[sql] def takeInternal(n: Int): Array[T] = {
+queryExecution.executedPlan.executeTake(n).map(boundEnc.fromRow)
+}

/**
* Returns the first `n` rows in the Dataset as a list.
*
@@ -2515,6 +2520,11 @@
plan.executeCollect().head.getLong(0)
}

+// An internal version of `count`, which won't set execution id and trigger listeners.
+private[sql] def countInternal(): Long = {
+groupBy().count().queryExecution.executedPlan.executeCollect().head.getLong(0)
+}

/**
* Returns a new Dataset that has exactly `numPartitions` partitions.
*
@@ -2744,7 +2754,7 @@
createTempViewCommand(viewName, replace = false, global = true)
}

-private def createTempViewCommand(
+private[sql] def createTempViewCommand(
viewName: String,
replace: Boolean,
global: Boolean): CreateViewCommand = {
@@ -2937,17 +2947,17 @@
}

/** A convenient function to wrap a logical plan and produce a DataFrame. */
-@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
+@inline private def withPlan(logicalPlan: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, logicalPlan)
}

/** A convenient function to wrap a logical plan and produce a Dataset. */
-@inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+@inline private def withTypedPlan[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
Dataset(sparkSession, logicalPlan)
}

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
-@inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
+@inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
// Set operators widen types (change the schema), so we cannot reuse the row encoder.
Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
@@ -113,21 +113,20 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {


/**
-* Returns the result as a hive compatible sequence of strings. This is for testing only.
+* Returns the result as a hive compatible sequence of strings. This is used in tests and
+* `SparkSQLDriver` for CLI applications.
*/
def hiveResultString(): Seq[String] = executedPlan match {
-case ExecutedCommandExec(desc: DescribeTableCommand) =>
+case ExecutedCommandExec(desc: DescribeTableCommand, _) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
desc.run(sparkSession).map {
case Row(name: String, dataType: String, comment) =>
-Seq(name, dataType,
-Option(comment.asInstanceOf[String]).getOrElse(""))
-.map(s => String.format(s"%-20s", s))
-.mkString("\t")
+Seq(name, dataType, Option(comment.asInstanceOf[String]).getOrElse(""))
+.map(s => String.format(s"%-20s", s)).mkString("\t")
}
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
-case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+case command @ ExecutedCommandExec(s: ShowTablesCommand, _) if !s.isExtended =>
command.executeCollect().map(_.getString(1))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
@@ -346,7 +346,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
+case r: RunnableCommand => ExecutedCommandExec(r, r.children.map(planLater)) :: Nil

case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
@@ -64,7 +64,7 @@ case class InMemoryRelation(
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
extends logical.LeafNode with MultiInstanceRelation {

-override protected def innerChildren: Seq[SparkPlan] = Seq(child)
+override def innerChildren: Seq[SparkPlan] = Seq(child)

override def producedAttributes: AttributeSet = outputSet

@@ -34,7 +34,7 @@ case class InMemoryTableScanExec(
@transient relation: InMemoryRelation)
extends LeafExecNode {

-override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -96,11 +96,12 @@ case class AnalyzeColumnCommand(
attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))

val namedExpressions = expressions.map(e => Alias(e, e.toString)())
-val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
+val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation))
+.queryExecution.executedPlan.executeTake(1).head
> Review comment (Contributor), suggesting an alternative for the lines above:
> new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation)).executedPlan.executeTake(1).head


val rowCount = statsRow.getLong(0)
val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>
-(attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1), attr))
+(attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 6), attr))
}.toMap
(rowCount, columnStats)
}
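
For readers puzzled by the positional accesses above: the aggregate produces a single row holding the overall row count followed by one struct per analyzed column, and rowToColumnStat reads six fields out of each struct, hence getStruct(i + 1, 6). An illustrative sketch (not code from this commit):

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Illustration only: statsRow is shaped as
//   [rowCount, struct(stats of column 1), struct(stats of column 2), ...]
// and each struct carries distinctCount, min, max, nullCount, avgLen, maxLen,
// in the order that rowToColumnStat reads them back.
def readStatsForColumn(statsRow: InternalRow, columnIndex: Int): (Long, Long) = {
  val colStats = statsRow.getStruct(columnIndex + 1, 6)
  val distinctCount = colStats.getLong(0)
  val nullCount = colStats.getLong(3)
  (distinctCount, nullCount)
}
```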
@@ -56,7 +56,7 @@ case class AnalyzeTableCommand(
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (!noscan) {
-val newRowCount = sparkSession.table(tableIdentWithDB).count()
+val newRowCount = sparkSession.table(tableIdentWithDB).countInternal()
if (newRowCount >= 0 && newRowCount != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
@@ -30,19 +30,26 @@ case class CacheTableCommand(
require(plan.isEmpty || tableIdent.database.isEmpty,
"Database name is not allowed in CACHE TABLE AS SELECT")

-override protected def innerChildren: Seq[QueryPlan[_]] = {
-plan.toSeq
-}
+override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq

override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
-Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
+CreateViewCommand(
+name = tableIdent,
+userSpecifiedColumns = Nil,
+comment = None,
+properties = Map.empty,
+originalText = None,
+child = logicalPlan,
+allowExisting = false,
+replace = false,
+viewType = LocalTempView).run(sparkSession)
}
sparkSession.catalog.cacheTable(tableIdent.quotedString)

if (!isLazy) {
// Performs eager caching
-sparkSession.table(tableIdent).count()
+sparkSession.table(tableIdent).countInternal()
}

Seq.empty[Row]
@@ -22,8 +22,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.debug._
@@ -36,14 +35,20 @@ import org.apache.spark.sql.types._
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends logical.Command {
-def run(sparkSession: SparkSession): Seq[Row]
+def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+throw new NotImplementedError
+}
+
+def run(sparkSession: SparkSession): Seq[Row] = {
+throw new NotImplementedError
+}
}

/**
* A physical operator that executes the run method of a `RunnableCommand` and
* saves the result to prevent multiple executions.
*/
-case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
+case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
@@ -55,14 +60,19 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
*/
protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+val rows = if (children.isEmpty) {
+cmd.run(sqlContext.sparkSession)
+} else {
+cmd.run(sqlContext.sparkSession, children)
+}
+rows.map(converter(_).asInstanceOf[InternalRow])
}

-override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+override def innerChildren: Seq[QueryPlan[_]] = cmd.innerChildren

override def output: Seq[Attribute] = cmd.output

-override def children: Seq[SparkPlan] = Nil
+override def nodeName: String = cmd.nodeName

override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
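
Putting the pieces together: BasicOperators (see the SparkStrategies hunk above) now plans a command's logical children and passes the resulting SparkPlans to ExecutedCommandExec, which dispatches to the two-argument run. A sketch of a hypothetical command written against the new API, for illustration only:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.RunnableCommand

// Hypothetical write-style command: it exposes the query it wraps as a logical
// child, so the planner plans it (and the SQL UI can show it), and it consumes
// the already-planned child through the two-argument run overload added above.
case class ExampleWriteOutCommand(query: LogicalPlan) extends RunnableCommand {
  override def children: Seq[LogicalPlan] = Seq(query)

  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
    val plannedChild = children.head
    // e.g. plannedChild.execute().foreachPartition(...) to write the rows out
    Seq.empty[Row]
  }
}
```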

@@ -122,7 +122,7 @@ case class CreateDataSourceTableAsSelectCommand(
query: LogicalPlan)
extends RunnableCommand {

-override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+override def innerChildren: Seq[LogicalPlan] = Seq(query)

override def run(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
@@ -195,7 +195,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = if (tableExists) Some(table) else None)

try {
-dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
+dataSource.writeAndRead(mode, query)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
@@ -97,7 +97,7 @@ case class CreateViewCommand(

import ViewHelper._

-override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+override def innerChildren: Seq[QueryPlan[_]] = Seq(child)

if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -264,7 +264,7 @@ case class AlterViewAsCommand(

import ViewHelper._

-override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

override def run(session: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.