Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.engine.spark.operation

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -112,11 +112,15 @@ class PlanOnlyStatement(
SQLConf.get.maxToStringFields,
printOperatorId = false))))
case PhysicalMode =>
val physical = spark.sql(statement).queryExecution.sparkPlan
val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
val physical = spark.sessionState.planner.plan(ReturnAnswer(optimized)).next()
iter = new IterableFetchIterator(Seq(Row(physical.toString())))
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toString())))
val executed = spark.sql(s"explain formatted $statement").collect.map(x =>
Row(x.getString(0).replaceFirst("== Physical Plan ==\n", "")))
iter = new IterableFetchIterator(executed)
case UnknownMode => throw unknownModeError(mode)
case _ => throw notSupportedModeError(mode, "Spark SQL")
}
Expand All @@ -136,11 +140,14 @@ class PlanOnlyStatement(
val optimized = spark.sessionState.optimizer.execute(analyzed)
iter = new IterableFetchIterator(Seq(Row(optimized.toJSON)))
case PhysicalMode =>
val physical = spark.sql(statement).queryExecution.sparkPlan
val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
val physical = spark.sessionState.planner.plan(ReturnAnswer(optimized)).next()
iter = new IterableFetchIterator(Seq(Row(physical.toJSON)))
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toJSON)))
throw KyuubiSQLException(s"The operation mode $mode" +
" with json style doesn't support in Spark SQL engine.")
case UnknownMode => throw unknownModeError(mode)
case _ =>
throw KyuubiSQLException(s"The operation mode $mode" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
Map.empty) {
withJdbcStatement() { statement =>
val operationPlan = getOperationPlanWithStatement(statement)
assert(operationPlan.startsWith("*(1) Project") &&
operationPlan.contains("*(1) Scan OneRowRelation"))
assert(operationPlan.startsWith("* Project (2)") &&
operationPlan.contains("* Scan OneRowRelation (1)"))
}
}
}
Expand Down Expand Up @@ -188,7 +188,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
}

test("kyuubi #3214: Plan only mode with an incorrect value") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> "parse"))(Map.empty) {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> ParseMode.name))(Map.empty) {
withJdbcStatement() { statement =>
statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=parser")
val e = intercept[KyuubiSQLException](statement.executeQuery("select 1"))
Expand All @@ -201,6 +201,40 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
}
}

test("kyuubi #3435: Command should not execute when plan only mode is set to PHYSICAL") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))(Map.empty) {
withJdbcStatement("tmp_test") { statement =>
statement.execute("create database if not exists db1")
statement.execute("use db1")
statement.executeQuery("create table tmp_test(test_col string) using parquet")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}" +
s"=${PhysicalMode.name}")
statement.executeQuery("drop table tmp_test")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=${NoneMode.name}")
val result = statement.executeQuery("desc table tmp_test")
assert(result.next())
assert(result.getString(1).contains("test_col"))
}
}
}

test("kyuubi #3435: Command should not execute when plan only mode is set to EXECUTION") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name))(Map.empty) {
withJdbcStatement("tmp_test") { statement =>
statement.execute("create database if not exists db1")
statement.execute("use db1")
statement.executeQuery("create table tmp_test(test_col string) using parquet")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}" +
s"=${ExecutionMode.name}")
statement.executeQuery("drop table tmp_test")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=${NoneMode.name}")
val result = statement.executeQuery("desc table tmp_test")
assert(result.next())
assert(result.getString(1).contains("test_col"))
}
}
}

private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1 where true")
assert(resultSet.next())
Expand Down