Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.kyuubi.SparkUtilsHelper
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -115,10 +116,10 @@ class PlanOnlyStatement(
SQLConf.get.maxToStringFields,
printOperatorId = false))))
case PhysicalMode =>
val physical = spark.sql(statement).queryExecution.sparkPlan
val physical = spark.sessionState.executePlan(plan, CommandExecutionMode.SKIP).sparkPlan
iter = new IterableFetchIterator(Seq(Row(physical.toString())))
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
val executed = spark.sessionState.executePlan(plan, CommandExecutionMode.SKIP).executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toString())))
case LineageMode =>
val result = parseLineage(spark, plan)
Expand All @@ -142,10 +143,10 @@ class PlanOnlyStatement(
val optimized = spark.sessionState.optimizer.execute(analyzed)
iter = new IterableFetchIterator(Seq(Row(optimized.toJSON)))
case PhysicalMode =>
val physical = spark.sql(statement).queryExecution.sparkPlan
val physical = spark.sessionState.executePlan(plan, CommandExecutionMode.SKIP).sparkPlan
iter = new IterableFetchIterator(Seq(Row(physical.toJSON)))
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
val executed = spark.sessionState.executePlan(plan, CommandExecutionMode.SKIP).executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toJSON)))
case LineageMode =>
val result = parseLineage(spark, plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,34 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
}
}

test("KYUUBI #6574: Skip eagerly execute command in physical/execution plan only mode") {
    withJdbcStatement() { statement =>
      val table = "test_plan_only"
      val createTableCommand = s"create table $table(i int) using parquet"

      // In physical plan-only mode the command must be planned, never executed.
      statement.execute(s"SET ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=${PhysicalMode.name}")
      val physicalPlan = getOperationPlanWithStatement(statement, createTableCommand)
      assert(physicalPlan.startsWith("Execute CreateDataSourceTableCommand")
        && physicalPlan.contains(table))

      statement.execute(s"SET ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=${ExecutionMode.name}")
      val executionPlan = getOperationPlanWithStatement(statement, createTableCommand)
      // Fixed copy-paste bug: this assertion previously re-checked physicalPlan,
      // leaving the execution plan's table name unverified.
      assert(executionPlan.startsWith("Execute CreateDataSourceTableCommand")
        && executionPlan.contains(table))

      // The CREATE TABLE was only planned above, so the table must not exist
      // once plan-only mode is switched off.
      statement.execute(s"SET ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=${NoneMode.name}")
      val e = intercept[KyuubiSQLException](statement.executeQuery(s"select * from $table"))
      assert(e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND")
        || e.getMessage.contains("Table or view not found"))
    }
  }

/**
   * Returns the rendered operation plan for a trivial probe query.
   * Fixed: previously ran the probe query once, discarded the ResultSet,
   * then delegated to the two-arg overload which ran it a second time.
   */
  private def getOperationPlanWithStatement(statement: Statement): String = {
    getOperationPlanWithStatement(statement, "select 1 where true")
  }

/**
   * Executes `sql` over the given JDBC statement and returns the first
   * column of the first row — the plan text produced in plan-only mode.
   * Fails the test if the query yields no rows.
   */
  private def getOperationPlanWithStatement(statement: Statement, sql: String): String = {
    val rs = statement.executeQuery(sql)
    assert(rs.next())
    rs.getString(1)
  }
Expand Down