Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
59daa48
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 5, 2016
41d5f64
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 7, 2016
472a6e3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 10, 2016
0fba10a
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 12, 2016
797aabb
When doing the test, issue an exception if hitting the max iteration.
gatorsmile Mar 15, 2016
98cab44
Merge remote-tracking branch 'upstream/master' into maxIterationExcep…
gatorsmile Mar 15, 2016
f351362
address comments.
gatorsmile Mar 15, 2016
1281f36
code clean
gatorsmile Mar 15, 2016
23663fe
update comments.
gatorsmile Mar 15, 2016
cbf73b3
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 21, 2016
c08f561
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
474df88
Merge remote-tracking branch 'upstream/master'
gatorsmile Mar 22, 2016
23a6cfc
Merge branch 'maxIterationException' into maxIterationExceptionNew
gatorsmile Mar 22, 2016
48b2c32
address comments.
gatorsmile Mar 23, 2016
1a59fcf
clean code.
gatorsmile Mar 23, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import scala.collection.JavaConverters._

import com.google.common.util.concurrent.AtomicLongMap

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.util.Utils

object RuleExecutor {
protected val timeMap = AtomicLongMap.create[String]()
Expand All @@ -46,15 +47,23 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
/**
* An execution strategy for rules that indicates the maximum number of executions. If the
* execution reaches fix point (i.e. converge) before maxIterations, it will stop.
* If throws is equal to true, it will issue an exception AnalysisExceptions
*/
abstract class Strategy { def maxIterations: Int }
abstract class Strategy {
def maxIterations: Int
def throws: Boolean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throws -> throwsExceptionUponMaxIterations ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. will do

}

/** A strategy that only runs once. */
case object Once extends Strategy { val maxIterations = 1 }
/** A strategy that executes its batch a single time; `throws` is switched off. */
case object Once extends Strategy {
  override val throws: Boolean = false
  override val maxIterations: Int = 1
}

/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy

/**
 * A strategy that runs until fix point or maxIterations times, whichever comes first.
 *
 * `throws` takes its value directly from `Utils.isTesting`; when it is true the executor
 * throws on reaching maxIterations instead of only logging the message.
 *
 * @param maxIterations upper bound on the number of times the batch is executed
 */
case class FixedPoint(maxIterations: Int) extends Strategy {
  // Utils.isTesting is used as the Boolean value itself; no if/else wrapper is needed.
  override val throws: Boolean = Utils.isTesting
}
/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

Expand Down Expand Up @@ -98,7 +107,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
if (iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
val msg = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
if (batch.strategy.throws) throw new SparkException(msg) else logTrace(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnalysisException?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also indent it properly, i.e.

if (batch.strategy.throwsExceptionUponMaxIterations) {
  throw new ...
} else {
  logTrace(msg)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the user's fault and will only happen in tests, so I wouldn't throw AnalysisException. I might even assert or something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe TreeNodeException so we have the plan that caused the problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on TreeNodeException.

I was suggesting AnalysisException because his documentation said AnalysisException earlier.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. will do

}
continue = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
class AggregateOptimizeSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
System.setProperty("spark.testing", "true")
val batches = Batch("Aggregate", FixedPoint(100),
RemoveLiteralFromGroupExpressions) :: Nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
class BooleanSimplificationSuite extends PlanTest with PredicateHelper {

object Optimize extends RuleExecutor[LogicalPlan] {
System.setProperty("spark.testing", "true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just change PlanTest?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, a good idea!

val batches =
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.sql.types.StringType
class ColumnPruningSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
System.setProperty("spark.testing", "true")
val batches = Batch("Column pruning", FixedPoint(100),
ColumnPruning,
CollapseProject) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules._
class CombiningLimitsSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
System.setProperty("spark.testing", "true")
val batches =
Batch("Filter Pushdown", FixedPoint(100),
ColumnPruning) ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.DateTimeUtils

class ComputeCurrentTimeSuite extends PlanTest {
System.setProperty("spark.testing", "true")
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Seq(Batch("ComputeCurrentTime", Once, ComputeCurrentTime))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import org.apache.spark.sql.types.IntegerType
class SimplifyConditionalSuite extends PlanTest with PredicateHelper {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
System.setProperty("spark.testing", "true")
val batches =
Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
}

protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.trees

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

Expand Down Expand Up @@ -46,9 +46,13 @@ class RuleExecutorSuite extends SparkFunSuite {

test("to maxIterations") {
object ToFixedPoint extends RuleExecutor[Expression] {
System.setProperty("spark.testing", "true")
val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
}

assert(ToFixedPoint.execute(Literal(100)) === Literal(90))
val message = intercept[SparkException] {
ToFixedPoint.execute(Literal(100))
}.getMessage
assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private[sql] trait SQLTestUtils

protected override def beforeAll(): Unit = {
super.beforeAll()
System.setProperty("spark.testing", "true")
if (loadTestDataBeforeTests) {
loadTestData()
}
Expand Down