Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix Hive test cases
  • Loading branch information
rxin committed Nov 21, 2018
commit 34f8bfe69b70ff702324ec7f38d78ae920410ef7
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object QueryPlanningTracker {
}

/** Returns the current tracker in scope, based on the thread local variable. */
def get: QueryPlanningTracker = localTracker.get()
def get: Option[QueryPlanningTracker] = Option(localTracker.get())

/** Sets the current tracker for the execution of function f. We assume f is single-threaded. */
def withTracker[T](tracker: QueryPlanningTracker)(f: => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger()
val tracker = QueryPlanningTracker.get
val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get

batches.foreach { batch =>
val batchStartPlan = curPlan
Expand All @@ -111,9 +111,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)

if (tracker ne null) {
tracker.recordRuleInvocation(rule.ruleName, runTime, effective)
}
// Record timing information using QueryPlanningTracker
tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this make the query-local and the global metrics inconsistent when tracker is None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! (Not great -- but I'd probably remove the global tracker at some point.)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the global tracker would be great!


// Run the structural integrity checker against the plan after each rule.
if (!isPlanIntegral(result)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
Expand Down Expand Up @@ -220,6 +219,16 @@ private[hive] class TestHiveSparkSession(
sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.newSession()
}

/**
 * Temporary hack: overrides SparkSession.sql so that queries keep going through the
 * Dataset.ofRows variant that builds a TestHiveQueryExecution. A normal QueryExecution
 * wouldn't load all the test tables.
 *
 * @param sqlText the SQL text handed to the session's SQL parser
 * @return the DataFrame that Dataset.ofRows builds from the parsed plan
 */
override def sql(sqlText: String): DataFrame =
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))

/**
 * Creates another TestHiveSparkSession that reuses this session's `sc`, `sharedState`
 * (wrapped in Some) and `loadTestTables`; the third constructor argument is passed as None.
 */
override def newSession(): TestHiveSparkSession =
  new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
Expand Down Expand Up @@ -587,7 +596,7 @@ private[hive] class TestHiveQueryExecution(
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(sparkSession.loadTestTable)
// Proceed with analysis.
sparkSession.sessionState.analyzer.executeAndCheck(logical, new QueryPlanningTracker)
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}
}

Expand Down