Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix
  • Loading branch information
maropu committed Feb 13, 2019
commit a6e4e40ad039fa3dcc522c628ace2968e62ade4c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class Analyzer(

lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
new ResolveHints.ResolveBroadcastHints(conf, catalog),
ResolveHints.ResolveCoalesceHints,
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.IdentifierWithDatabase
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -48,16 +49,25 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accordingly, we can use String instead of SessionCatalog.

-  class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] {
+  class ResolveBroadcastHints(conf: SQLConf, currentDatabase: String) extends Rule[LogicalPlan] {

Copy link
Member Author

@maropu maropu Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can't use String there because currentDatabase might be updatable by others?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can instead use getCurrentDatabase: () => String?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya. Right, please ignore this. We need catalog to lookup global_temp, too.

private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

def resolver: Resolver = conf.resolver

private def namePartsWithDatabase(nameParts: Seq[String]): Seq[String] = {
if (nameParts.size == 1) {
catalog.getCurrentDatabase +: nameParts
} else {
nameParts
}
}

private def matchedTableIdentifier(
nameParts: Seq[String],
tableIdent: IdentifierWithDatabase): Boolean = {
val identifierList = tableIdent.database.map(_ :: Nil).getOrElse(Nil) :+ tableIdent.identifier
nameParts.corresponds(identifierList)(resolver)
val identifierList =
tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: tableIdent.identifier :: Nil
namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver)
Copy link
Member

@dongjoon-hyun dongjoon-hyun Aug 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic will make a regression (plan1 in the below) in case of global temporary view. Please add the following test case into GlobalTempViewSuite and revise the logic to handle both cases correctly.

  test("broadcast hint on global temp view") {
    import org.apache.spark.sql.catalyst.plans.logical.{ResolvedHint, Join}

    withGlobalTempView("v1") {
      spark.range(10).createGlobalTempView("v1")
      withTempView("v2") {
        spark.range(10).createTempView("v2")

        Seq(
          "SELECT /*+ MAPJOIN(v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id",
          "SELECT /*+ MAPJOIN(global_temp.v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id"
        ).foreach { statement =>
          val plan = sql(statement).queryExecution.optimizedPlan
          assert(plan.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
          assert(!plan.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
        }
      }
    }
  }

Copy link
Member Author

@maropu maropu Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun a little confused about the name resolution here;

"SELECT /*+ MAPJOIN(v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id",

MAPJOIN(v1) implicitly means global_temp.v1?
For example;

"SELECT /*+ MAPJOIN(v1) */ * FROM default.v1, global_temp.v1 WHERE default.v1.id = global_temp.v1.id",

In this case, what's the MAPJOIN(v1) behaviour?

  1. Apply no hint (current behaviour)
  2. Apply a hint into default.v1 only
  3. Apply a hint into both

WDYT?

Copy link
Member

@dongjoon-hyun dongjoon-hyun Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. First of all, the above two test cases in test("broadcast hint on global temp view") should work as before. In general, global_temp.v1 should be used with the prefix global_temp.. However, before this PR, we cannot put database name on Hint. So, we allowed exceptional cases; hints on global temporary view (without global_temp. prefix).

  2. For the case you mentioned, I'd like to interpret MAPJOIN(v1) to default.v1 only because it's the Spark's behavior outside this Hint syntax. And, please add a test case for this, too.

@cloud-fan and @gatorsmile . Could you give us some advice, too? Is it okay to you?

Copy link
Member

@dongjoon-hyun dongjoon-hyun Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, @maropu . In addition,

  • The current behavior of master branch (Spark 2.4) is Apply a hint into both.
  • The legacy behavior of Spark 2.3.1 is raising an AnalysisException for that query.

So, I think it's a good change to become consistent in Spark 2.4.

scala> sql("set spark.sql.autoBroadcastJoinThreshold=-1")
scala> sql("set spark.sql.crossJoin.enabled=true")
scala> sql("drop view v1")
scala> sql("create view v1 as select 'view' id").show
scala> sql("create global temporary view v1 as select 'global_temp_view' id").show
scala> sql("SELECT /*+ MAPJOIN(v1) */ * FROM v1, global_temp.v1 WHERE default.v1.id = global_temp.v1.id").explain(true)
org.apache.spark.sql.AnalysisException: cannot resolve '`default.v1.id`' given input columns: [v1.id, v1.id]; line 1 pos 58;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, yes. I'll refine the pr. thanks.

}

private def applyBroadcastHint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,33 +196,42 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-25121 Supports multi-part names for broadcast hint resolution") {
val tableName = "t"
val (table1Name, table2Name) = ("t1", "t2")
withTempDatabase { dbName =>
withTable(tableName) {
withTable(table1Name, table2Name) {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
spark.range(100).write.saveAsTable(s"$dbName.$tableName")
spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
// First, makes sure a join is not broadcastable
val plan1 = spark.range(3)
.join(spark.table(s"$dbName.$tableName"), "id")
val plan = sql(s"SELECT * FROM $dbName.$table1Name, $dbName.$table2Name " +
s"WHERE $table1Name.id = $table2Name.id")
.queryExecution.executedPlan
assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size == 0)
assert(plan.collect { case p: BroadcastHashJoinExec => p }.size == 0)

// Uses multi-part table names for broadcast hints
val plan2 = spark.range(3)
.join(spark.table(s"$dbName.$tableName"), "id")
.hint("broadcast", s"$dbName.$tableName")
.queryExecution.executedPlan
val broadcastHashJoin = plan2.collect { case p: BroadcastHashJoinExec => p }
assert(broadcastHashJoin.size == 1)
val broadcastExchange = broadcastHashJoin.head.collect {
case p: BroadcastExchangeExec => p
}
assert(broadcastExchange.size == 1)
val table = broadcastExchange.head.collect {
case FileSourceScanExec(_, _, _, _, _, _, Some(tableIdent)) => tableIdent
def checkIfHintApplied(tableName: String, hintTableName: String): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hintTableName is never used in this func?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, I'll fix.

val p = sql(s"SELECT /*+ BROADCASTJOIN($tableName) */ * " +
s"FROM $tableName, $dbName.$table2Name " +
s"WHERE $tableName.id = $table2Name.id")
.queryExecution.executedPlan
val broadcastHashJoin = p.collect { case p: BroadcastHashJoinExec => p }
assert(broadcastHashJoin.size == 1)
val broadcastExchange = broadcastHashJoin.head.collect {
case p: BroadcastExchangeExec => p
}
assert(broadcastExchange.size == 1)
val table = broadcastExchange.head.collect {
case FileSourceScanExec(_, _, _, _, _, _, Some(tableIdent)) => tableIdent
}
assert(table.size == 1)
assert(table.head === TableIdentifier(table1Name, Some(dbName)))
}
assert(table.size == 1)
assert(table.head === TableIdentifier(tableName, Some(dbName)))

sql(s"USE $dbName")
checkIfHintApplied(table1Name, table1Name)
checkIfHintApplied(s"$dbName.$table1Name", s"$dbName.$table1Name")
checkIfHintApplied(table1Name, s"$dbName.$table1Name")
checkIfHintApplied(s"$dbName.$table1Name", table1Name)
}
}
}
Expand Down