Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ package object dsl {
def maxDistinct(e: Expression): Expression = Max(e).toAggregateExpression(isDistinct = true)
def upper(e: Expression): Expression = Upper(e)
def lower(e: Expression): Expression = Lower(e)
def coalesce(args: Expression*): Expression = Coalesce(args)
def sqrt(e: Expression): Expression = Sqrt(e)
def abs(e: Expression): Expression = Abs(e)
def star(names: String*): Expression = names match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate {
override lazy val aggBufferAttributes = sum :: count :: Nil

override lazy val initialValues = Seq(
/* sum = */ Cast(Literal(0), sumDataType),
/* sum = */ Literal(0).cast(sumDataType),
/* count = */ Literal(0L)
)

Expand All @@ -58,18 +58,16 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate {
// If all input are nulls, count will be 0 and we will get null after the division.
override lazy val evaluateExpression = child.dataType match {
case _: DecimalType =>
Cast(
DecimalPrecision.decimalAndDecimal(sum / Cast(count, DecimalType.LongDecimal)),
resultType)
DecimalPrecision.decimalAndDecimal(sum / count.cast(DecimalType.LongDecimal)).cast(resultType)
case _ =>
Cast(sum, resultType) / Cast(count, resultType)
sum.cast(resultType) / count.cast(resultType)
}

protected def updateExpressionsDef: Seq[Expression] = Seq(
/* sum = */
Add(
sum,
Coalesce(Cast(child, sumDataType) :: Cast(Literal(0), sumDataType) :: Nil)),
coalesce(child.cast(sumDataType), Literal(0).cast(sumDataType))),
/* count = */ If(IsNull(child), count, count + 1L)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,23 +924,3 @@ case class Deduplicate(

override def output: Seq[Attribute] = child.output
}

/**
* A logical plan for setting a barrier of analysis.
*
* The SQL Analyzer goes through a whole query plan even most part of it is analyzed. This
* increases the time spent on query analysis for long pipelines in ML, especially.
*
* This logical plan wraps an analyzed logical plan to prevent it from analysis again. The barrier
* is applied to the analyzed logical plan in Dataset. It won't change the output of wrapped
* logical plan and just acts as a wrapper to hide it from analyzer. New operations on the dataset
* will be put on the barrier, so only the new nodes created will be analyzed.
*
* This analysis barrier will be removed at the end of analysis stage.
*/
case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
override protected def innerChildren: Seq[LogicalPlan] = Seq(child)
override def output: Seq[Attribute] = child.output
override def isStreaming: Boolean = child.isStreaming
override def doCanonicalize(): LogicalPlan = child.canonicalized
}