5 changes: 0 additions & 5 deletions .github/workflows/master.yml
@@ -56,11 +56,6 @@ jobs:
exclude-tags: [""]
comment: ["normal"]
include:
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.1-binary'
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
1 change: 1 addition & 0 deletions docs/deployment/migration-guide.md
@@ -20,6 +20,7 @@
## Upgrading from Kyuubi 1.9 to 1.10

* Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
* Since Kyuubi 1.10, support for Spark 3.1 in the Spark engine is removed.
* Since Kyuubi 1.10, support for Flink 1.16 in the Flink engine is removed.

## Upgrading from Kyuubi 1.8 to 1.9
2 changes: 1 addition & 1 deletion docs/quick_start/quick_start.rst
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
**Kyuubi** Gateway \ |release| \ - Kyuubi Server
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
**Spark** Engine 3.1 to 3.5 A Spark distribution
**Spark** Engine 3.2 to 3.5 A Spark distribution
**Flink** Engine 1.17 to 1.19 A Flink distribution
**Trino** Engine N/A A Trino cluster allows to access via trino-client v411
**Doris** Engine N/A A Doris cluster
@@ -381,9 +381,6 @@ object SparkSQLEngine extends Logging {
}

def main(args: Array[String]): Unit = {
if (KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION === "3.1") {
warn("The support for Spark 3.1 is deprecated, and will be removed in the next version.")
}
val startedTime = System.currentTimeMillis()
val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
case Some(t) => t.toLong
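For context, the lines removed above were a version-gated deprecation warning emitted at engine start-up. A minimal sketch of that pattern is shown below, assuming a plain SLF4J logger and string version values; the object and method names are illustrative, not Kyuubi APIs.

```scala
// Sketch only: a version-gated deprecation warning in the spirit of the removed block.
// The logger, object name, and version strings are assumptions for illustration.
import org.slf4j.LoggerFactory

object DeprecationGateSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Warn once at start-up if the detected runtime version is on the deprecation list.
  def warnIfDeprecated(runtimeVersion: String, deprecated: Set[String]): Unit = {
    if (deprecated.contains(runtimeVersion)) {
      log.warn(s"The support for Spark $runtimeVersion is deprecated, " +
        "and will be removed in the next version.")
    }
  }

  def main(args: Array[String]): Unit = {
    warnIfDeprecated(runtimeVersion = "3.2", deprecated = Set("3.2"))
  }
}
```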
@@ -185,14 +185,13 @@ class ExecuteStatement(
// Rename all col name to avoid duplicate columns
val colName = range(0, result.schema.size).map(x => "col" + x)

val codec = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") "zstd" else "zlib"
// df.write will introduce an extra shuffle for the outermost limit, and hurt performance
if (resultMaxRows > 0) {
result.toDF(colName: _*).limit(resultMaxRows).write
.option("compression", codec).format("orc").save(saveFileName.get)
.option("compression", "zstd").format("orc").save(saveFileName.get)
} else {
result.toDF(colName: _*).write
.option("compression", codec).format("orc").save(saveFileName.get)
.option("compression", "zstd").format("orc").save(saveFileName.get)
}
info(s"Save result to ${saveFileName.get}")
fetchOrcStatement = Some(new FetchOrcStatement(spark))
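The hunk above drops the Spark 3.1-only zlib fallback and always writes saved results as zstd-compressed ORC (zstd for ORC is available on Spark 3.2+). A self-contained sketch of that write path, assuming a local SparkSession, sample data, and an illustrative output path:

```scala
// Sketch of the result-saving path shown above: rename columns to col0, col1, ...
// to avoid duplicate-name failures, apply the optional row cap, and write
// zstd-compressed ORC. The SparkSession, output path, and sample data are
// assumptions for illustration.
import org.apache.spark.sql.{DataFrame, SparkSession}

object SaveResultAsOrcSketch {
  def save(result: DataFrame, saveFileName: String, resultMaxRows: Int): Unit = {
    val colNames = result.schema.indices.map(i => s"col$i")
    val renamed = result.toDF(colNames: _*)
    // As the diff notes, df.write introduces an extra shuffle for the outermost limit.
    val capped = if (resultMaxRows > 0) renamed.limit(resultMaxRows) else renamed
    capped.write.option("compression", "zstd").format("orc").save(saveFileName)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("orc-save-sketch").getOrCreate()
    import spark.implicits._
    save(Seq((1, "a"), (2, "b")).toDF("id", "id"), "/tmp/kyuubi-orc-save-sketch", resultMaxRows = 1)
    spark.stop()
  }
}
```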
@@ -35,10 +35,7 @@ import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
import org.apache.kyuubi.util.reflect.DynConstructors

class FetchOrcStatement(spark: SparkSession) {

@@ -62,7 +59,7 @@ class FetchOrcStatement(spark: SparkSession) {
val fullSchema = orcSchema.map(f =>
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
val deserializer = getOrcDeserializer(orcSchema, colId)
val deserializer = new OrcDeserializer(orcSchema, colId)
orcIter = new OrcFileIterator(list)
val iterRow = orcIter.map(value =>
unsafeProjection(deserializer.deserialize(value)))
@@ -73,35 +70,6 @@
def close(): Unit = {
orcIter.close()
}

private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
try {
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
// SPARK-34535 changed the constructor signature of OrcDeserializer
DynConstructors.builder()
.impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
.build[OrcDeserializer]()
.newInstance(
orcSchema,
colId)
} else {
DynConstructors.builder()
.impl(
classOf[OrcDeserializer],
classOf[StructType],
classOf[StructType],
classOf[Array[Int]])
.build[OrcDeserializer]()
.newInstance(
new StructType,
orcSchema,
colId)
}
} catch {
case e: Throwable =>
throw new KyuubiException("Failed to create OrcDeserializer", e)
}
}
}

class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
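For context on the hunk above: the reflection existed only because SPARK-34535 changed the `OrcDeserializer` constructor between Spark 3.1 and 3.2, so with 3.1 dropped the deserializer can be constructed directly. The toy sketch below illustrates the general try-one-signature-then-fall-back pattern that the removed code implemented with Kyuubi's `DynConstructors`; the `Target` class and helper are hypothetical, not Kyuubi or Spark code.

```scala
// Toy illustration of the removed pattern: prefer the newer constructor signature,
// fall back to the older one if it is not present. `Target` is a stand-in class.
class Target(val schema: String, val ids: Array[Int]) {
  // Older-style three-argument signature, kept to demonstrate the fallback path.
  def this(prefix: String, schema: String, ids: Array[Int]) = this(prefix + schema, ids)
}

object ConstructorFallbackSketch {
  def newTarget(schema: String, ids: Array[Int]): Target = {
    try {
      // Newer two-argument signature (analogous to OrcDeserializer on Spark 3.2+).
      classOf[Target].getConstructor(classOf[String], classOf[Array[Int]])
        .newInstance(schema, ids)
    } catch {
      case _: NoSuchMethodException =>
        // Older three-argument signature (analogous to OrcDeserializer on Spark 3.1).
        classOf[Target].getConstructor(classOf[String], classOf[String], classOf[Array[Int]])
          .newInstance("", schema, ids)
    }
  }

  def main(args: Array[String]): Unit = {
    println(newTarget("col0:int", Array(0)).schema)
  }
}
```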
@@ -30,7 +30,6 @@ import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.CollectLimitExec
@@ -158,9 +157,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
val partsToScan =
partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))

// TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
// drop Spark 3.1 support.
val sc = SparkSession.active.sparkContext
val sc = collectLimitExec.session.sparkContext
val res = sc.runJob(
childRDD,
(it: Iterator[InternalRow]) => {
@@ -347,6 +344,6 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
largeVarTypes)
}

// IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.1/3.2
// IpcOption.DEFAULT was introduced in ARROW-11081 (Arrow 4.0.0); keep this to adapt to Spark 3.2
final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption()
}
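The replacement of `SparkSession.active` with `collectLimitExec.session` above relies on `SparkPlan.session` (available since SPARK-35798, Spark 3.2). Below is a minimal sketch of that pattern, reading the Arrow-related settings from the plan's own session rather than the thread-local active session; the object and method names are illustrative.

```scala
// Sketch: derive per-query settings from the plan's owning session (SparkPlan.session,
// SPARK-35798) instead of SparkSession.active. Names here are illustrative.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan

object PlanSessionSketch {
  // Returns (session time zone, Arrow max records per batch) for the given plan.
  def arrowBatchSettings(plan: SparkPlan): (String, Int) = {
    val conf = plan.session.sessionState.conf
    (conf.sessionLocalTimeZone, conf.arrowMaxRecordsPerBatch)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("plan-session-sketch").getOrCreate()
    val plan = spark.range(10).queryExecution.executedPlan
    println(arrowBatchSettings(plan))
    spark.stop()
  }
}
```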
@@ -23,11 +23,10 @@ import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -38,7 +37,6 @@ import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.schema.RowSet
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
import org.apache.kyuubi.util.reflect.DynMethods
import org.apache.kyuubi.util.reflect.ReflectUtils._

object SparkDatasetHelper extends Logging {

@@ -48,7 +46,7 @@

def executeArrowBatchCollect: SparkPlan => Array[Array[Byte]] = {
case adaptiveSparkPlan: AdaptiveSparkPlanExec =>
executeArrowBatchCollect(finalPhysicalPlan(adaptiveSparkPlan))
executeArrowBatchCollect(adaptiveSparkPlan.finalPhysicalPlan)
// TODO: avoid extra shuffle if `offset` > 0
case collectLimit: CollectLimitExec if offset(collectLimit) > 0 =>
logWarning("unsupported offset > 0, an extra shuffle will be introduced.")
@@ -57,9 +55,8 @@
doCollectLimit(collectLimit)
case collectLimit: CollectLimitExec if collectLimit.limit < 0 =>
executeArrowBatchCollect(collectLimit.child)
// TODO: replace with pattern match once we drop Spark 3.1 support.
case command: SparkPlan if isCommandResultExec(command) =>
doCommandResultExec(command)
case commandResult: CommandResultExec =>
doCommandResultExec(commandResult)
case localTableScan: LocalTableScanExec =>
doLocalTableScan(localTableScan)
case plan: SparkPlan =>
@@ -76,10 +73,8 @@
*/
def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
val schemaCaptured = plan.schema
// TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
// drop Spark 3.1 support.
val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
val maxRecordsPerBatch = plan.session.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = plan.session.sessionState.conf.sessionLocalTimeZone
// note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input
// arguments are serialized and sent to the executor side for execution.
val maxBatchSizePerBatch = maxBatchSize
@@ -160,10 +155,8 @@ object SparkDatasetHelper extends Logging {
}

private def doCollectLimit(collectLimit: CollectLimitExec): Array[Array[Byte]] = {
// TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
// drop Spark 3.1 support.
val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = collectLimit.session.sessionState.conf.sessionLocalTimeZone
val maxRecordsPerBatch = collectLimit.session.sessionState.conf.arrowMaxRecordsPerBatch

val batches = KyuubiArrowConverters.takeAsArrowBatches(
collectLimit,
@@ -191,27 +184,21 @@
result.toArray
}

private lazy val commandResultExecRowsMethod = DynMethods.builder("rows")
.impl("org.apache.spark.sql.execution.CommandResultExec")
.build()

private def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
val spark = SparkSession.active
// TODO: replace with `command.rows` once we drop Spark 3.1 support.
val rows = commandResultExecRowsMethod.invoke[Seq[InternalRow]](command)
command.longMetric("numOutputRows").add(rows.size)
sendDriverMetrics(spark.sparkContext, command.metrics)
private def doCommandResultExec(commandResult: CommandResultExec): Array[Array[Byte]] = {
val spark = commandResult.session
commandResult.longMetric("numOutputRows").add(commandResult.rows.size)
sendDriverMetrics(spark.sparkContext, commandResult.metrics)
KyuubiArrowConverters.toBatchIterator(
rows.iterator,
command.schema,
commandResult.rows.iterator,
commandResult.schema,
spark.sessionState.conf.arrowMaxRecordsPerBatch,
maxBatchSize,
-1,
spark.sessionState.conf.sessionLocalTimeZone).toArray
}

private def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = {
val spark = SparkSession.active
val spark = localTableScan.session
localTableScan.longMetric("numOutputRows").add(localTableScan.rows.size)
sendDriverMetrics(spark.sparkContext, localTableScan.metrics)
KyuubiArrowConverters.toBatchIterator(
Expand All @@ -224,31 +211,7 @@ object SparkDatasetHelper extends Logging {
}

/**
* This method provides a reflection-based implementation of
* [[AdaptiveSparkPlanExec.finalPhysicalPlan]] that enables us to adapt to the Spark runtime
* without patching SPARK-41914.
*
* TODO: Once we drop support for Spark 3.1.x, we can directly call
* [[AdaptiveSparkPlanExec.finalPhysicalPlan]].
*/
def finalPhysicalPlan(adaptiveSparkPlanExec: AdaptiveSparkPlanExec): SparkPlan = {
withFinalPlanUpdate(adaptiveSparkPlanExec, identity)
}

/**
* A reflection-based implementation of [[AdaptiveSparkPlanExec.withFinalPlanUpdate]].
*/
private def withFinalPlanUpdate[T](
adaptiveSparkPlanExec: AdaptiveSparkPlanExec,
fun: SparkPlan => T): T = {
val plan = invokeAs[SparkPlan](adaptiveSparkPlanExec, "getFinalPhysicalPlan")
val result = fun(plan)
invokeAs[Unit](adaptiveSparkPlanExec, "finalPlanUpdate")
result
}

/**
* offset support was add since Spark-3.4(set SPARK-28330), to ensure backward compatibility with
* offset support was added in SPARK-28330 (Spark 3.4.0); to ensure backward compatibility with
* earlier versions of Spark, this function uses reflective calls to the "offset".
*/
private def offset(collectLimitExec: CollectLimitExec): Int = {
@@ -261,24 +224,6 @@
.getOrElse(0)
}

private def isCommandResultExec(sparkPlan: SparkPlan): Boolean = {
// scalastyle:off line.size.limit
// the CommandResultExec was introduced in SPARK-35378 (Spark 3.2), after SPARK-35378 the
// physical plan of runnable command is CommandResultExec.
// for instance:
// ```
// scala> spark.sql("show tables").queryExecution.executedPlan
// res0: org.apache.spark.sql.execution.SparkPlan =
// CommandResult <empty>, [namespace#0, tableName#1, isTemporary#2]
// +- ShowTables [namespace#0, tableName#1, isTemporary#2], V2SessionCatalog(spark_catalog), [default]
//
// scala > spark.sql("show tables").queryExecution.executedPlan.getClass
// res1: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.CommandResultExec
// ```
// scalastyle:on line.size.limit
sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec"
}

/**
* refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based
* operation, so that we can track the arrow-based queries on the UI tab.
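The body of `offset` above is collapsed in this view. As a rough, hypothetical illustration of the reflective pattern its Scaladoc describes (`CollectLimitExec` gained an `offset` field only in SPARK-28330, Spark 3.4), one could probe for the accessor and fall back to 0 when it is absent; this is not the actual Kyuubi implementation.

```scala
// Hypothetical sketch of a reflective `offset` lookup on CollectLimitExec.
// On Spark < 3.4 the accessor does not exist, so the Try fails and 0 is returned.
import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.CollectLimitExec

object OffsetCompatSketch {
  def offsetOf(collectLimitExec: CollectLimitExec): Int =
    Try(classOf[CollectLimitExec].getMethod("offset"))
      .map(_.invoke(collectLimitExec).asInstanceOf[Int])
      .getOrElse(0)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("offset-sketch").getOrCreate()
    val child = spark.range(3).queryExecution.executedPlan
    println(offsetOf(CollectLimitExec(2, child)))
    spark.stop()
  }
}
```

The retained `DynMethods` import in the hunk above suggests the real helper goes through Kyuubi's reflection utilities rather than raw `java.lang.reflect`, so treat this purely as an illustration of the idea.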
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession

import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION

trait WithSparkSQLEngine extends KyuubiFunSuite {
protected var spark: SparkSession = _
@@ -35,7 +34,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
// Affected by such configuration's default value
// engine.initialize.sql='SHOW DATABASES'
// SPARK-35378
protected lazy val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") 1 else 0
protected val initJobId: Int = 1

override def beforeAll(): Unit = {
startSparkEngine()