From e479043f659b39b38c447a7c04cde1b8e7a1a153 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 26 Dec 2023 21:23:02 +0300 Subject: [PATCH 1/7] Revert withOrigin from Dataset --- .../scala/org/apache/spark/sql/Dataset.scala | 359 ++++++++---------- 1 file changed, 154 insertions(+), 205 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 31e1495db7e3..a3a3f38e8d69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -511,11 +511,9 @@ class Dataset[T] private[sql]( * @group basic * @since 3.4.0 */ - def to(schema: StructType): DataFrame = withOrigin { - withPlan { - val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] - Project.matchSchema(logicalPlan, replaced, sparkSession.sessionState.conf) - } + def to(schema: StructType): DataFrame = withPlan { + val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType] + Project.matchSchema(logicalPlan, replaced, sparkSession.sessionState.conf) } /** @@ -775,13 +773,12 @@ class Dataset[T] private[sql]( */ // We only accept an existing column name, not a derived column here as a watermark that is // defined on a derived column cannot referenced elsewhere in the plan. - def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withOrigin { - withTypedPlan { - val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold) - require(!IntervalUtils.isNegative(parsedDelay), - s"delay threshold ($delayThreshold) should not be negative.") - EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) - } + def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan { + val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold) + require(!IntervalUtils.isNegative(parsedDelay), + s"delay threshold ($delayThreshold) should not be negative.") + EliminateEventTimeWatermark( + EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)) } /** @@ -953,10 +950,8 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: Dataset[_]): DataFrame = withOrigin { - withPlan { - Join(logicalPlan, right.logicalPlan, joinType = Inner, None, JoinHint.NONE) - } + def join(right: Dataset[_]): DataFrame = withPlan { + Join(logicalPlan, right.logicalPlan, joinType = Inner, None, JoinHint.NONE) } /** @@ -1089,23 +1084,22 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = - withOrigin { - // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right - // by creating a new instance for one of the branch. - val joined = sparkSession.sessionState.executePlan( - Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None, JoinHint.NONE)) - .analyzed.asInstanceOf[Join] - - withPlan { - Join( - joined.left, - joined.right, - UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq), - None, - JoinHint.NONE) - } + def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = { + // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right + // by creating a new instance for one of the branch. 
+ val joined = sparkSession.sessionState.executePlan( + Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None, JoinHint.NONE)) + .analyzed.asInstanceOf[Join] + + withPlan { + Join( + joined.left, + joined.right, + UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq), + None, + JoinHint.NONE) } + } /** * Inner join with another `DataFrame`, using the given join expression. @@ -1186,7 +1180,7 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = withOrigin { + def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = { withPlan { resolveSelfJoinCondition(right, Some(joinExprs), joinType) } @@ -1202,10 +1196,8 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.1.0 */ - def crossJoin(right: Dataset[_]): DataFrame = withOrigin { - withPlan { - Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE) - } + def crossJoin(right: Dataset[_]): DataFrame = withPlan { + Join(logicalPlan, right.logicalPlan, joinType = Cross, None, JoinHint.NONE) } /** @@ -1229,28 +1221,27 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = - withOrigin { - // Creates a Join node and resolve it first, to get join condition resolved, self-join - // resolved, etc. - val joined = sparkSession.sessionState.executePlan( - Join( - this.logicalPlan, - other.logicalPlan, - JoinType(joinType), - Some(condition.expr), - JoinHint.NONE)).analyzed.asInstanceOf[Join] - - implicit val tuple2Encoder: Encoder[(T, U)] = - ExpressionEncoder.tuple(this.exprEnc, other.exprEnc) - - withTypedPlan(JoinWith.typedJoinWith( - joined, - sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity, - sparkSession.sessionState.analyzer.resolver, - this.exprEnc.isSerializedAsStructForTopLevel, - other.exprEnc.isSerializedAsStructForTopLevel)) - } + def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = { + // Creates a Join node and resolve it first, to get join condition resolved, self-join resolved, + // etc. 
+ val joined = sparkSession.sessionState.executePlan( + Join( + this.logicalPlan, + other.logicalPlan, + JoinType(joinType), + Some(condition.expr), + JoinHint.NONE)).analyzed.asInstanceOf[Join] + + implicit val tuple2Encoder: Encoder[(T, U)] = + ExpressionEncoder.tuple(this.exprEnc, other.exprEnc) + + withTypedPlan(JoinWith.typedJoinWith( + joined, + sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity, + sparkSession.sessionState.analyzer.resolver, + this.exprEnc.isSerializedAsStructForTopLevel, + other.exprEnc.isSerializedAsStructForTopLevel)) + } /** * Using inner equi-join to join this Dataset returning a `Tuple2` for each pair @@ -1428,16 +1419,14 @@ class Dataset[T] private[sql]( * @since 2.2.0 */ @scala.annotation.varargs - def hint(name: String, parameters: Any*): Dataset[T] = withOrigin { - withTypedPlan { - val exprs = parameters.map { - case c: Column => c.expr - case s: Symbol => Column(s.name).expr - case e: Expression => e - case literal => Literal(literal) - } - UnresolvedHint(name, exprs, logicalPlan) - } + def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan { + val exprs = parameters.map { + case c: Column => c.expr + case s: Symbol => Column(s.name).expr + case e: Expression => e + case literal => Literal(literal) + }.toSeq + UnresolvedHint(name, exprs, logicalPlan) } /** @@ -1513,10 +1502,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def as(alias: String): Dataset[T] = withOrigin { - withTypedPlan { - SubqueryAlias(alias, logicalPlan) - } + def as(alias: String): Dataset[T] = withTypedPlan { + SubqueryAlias(alias, logicalPlan) } /** @@ -1553,28 +1540,25 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @scala.annotation.varargs - def select(cols: Column*): DataFrame = withOrigin { - withPlan { - val untypedCols = cols.map { - case typedCol: TypedColumn[_, _] => - // Checks if a `TypedColumn` has been inserted with - // specific input type and schema by `withInputType`. - val needInputType = typedCol.expr.exists { - case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => true - case _ => false - } + def select(cols: Column*): DataFrame = withPlan { + val untypedCols = cols.map { + case typedCol: TypedColumn[_, _] => + // Checks if a `TypedColumn` has been inserted with + // specific input type and schema by `withInputType`. + val needInputType = typedCol.expr.exists { + case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => true + case _ => false + } - if (!needInputType) { - typedCol - } else { - throw - QueryCompilationErrors.cannotPassTypedColumnInUntypedSelectError(typedCol.toString) - } + if (!needInputType) { + typedCol + } else { + throw QueryCompilationErrors.cannotPassTypedColumnInUntypedSelectError(typedCol.toString) + } - case other => other - } - Project(untypedCols.map(_.named), logicalPlan) + case other => other } + Project(untypedCols.map(_.named), logicalPlan) } /** @@ -1591,9 +1575,7 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @scala.annotation.varargs - def select(col: String, cols: String*): DataFrame = withOrigin { - select((col +: cols).map(Column(_)) : _*) - } + def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*) /** * Selects a set of SQL expressions. 
This is a variant of `select` that accepts @@ -1609,12 +1591,10 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @scala.annotation.varargs - def selectExpr(exprs: String*): DataFrame = withOrigin { - sparkSession.withActive { - select(exprs.map { expr => - Column(sparkSession.sessionState.sqlParser.parseExpression(expr)) - }: _*) - } + def selectExpr(exprs: String*): DataFrame = sparkSession.withActive { + select(exprs.map { expr => + Column(sparkSession.sessionState.sqlParser.parseExpression(expr)) + }: _*) } /** @@ -1628,7 +1608,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = withOrigin { + def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = { implicit val encoder: ExpressionEncoder[U1] = c1.encoder val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil, logicalPlan) @@ -1712,10 +1692,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def filter(condition: Column): Dataset[T] = withOrigin { - withTypedPlan { - Filter(condition.expr, logicalPlan) - } + def filter(condition: Column): Dataset[T] = withTypedPlan { + Filter(condition.expr, logicalPlan) } /** @@ -2101,17 +2079,15 @@ class Dataset[T] private[sql]( ids: Array[Column], values: Array[Column], variableColumnName: String, - valueColumnName: String): DataFrame = withOrigin { - withPlan { - Unpivot( - Some(ids.map(_.named).toImmutableArraySeq), - Some(values.map(v => Seq(v.named)).toImmutableArraySeq), - None, - variableColumnName, - Seq(valueColumnName), - logicalPlan - ) - } + valueColumnName: String): DataFrame = withPlan { + Unpivot( + Some(ids.map(_.named).toImmutableArraySeq), + Some(values.map(v => Seq(v.named)).toImmutableArraySeq), + None, + variableColumnName, + Seq(valueColumnName), + logicalPlan + ) } /** @@ -2134,17 +2110,15 @@ class Dataset[T] private[sql]( def unpivot( ids: Array[Column], variableColumnName: String, - valueColumnName: String): DataFrame = withOrigin { - withPlan { - Unpivot( - Some(ids.map(_.named).toImmutableArraySeq), - None, - None, - variableColumnName, - Seq(valueColumnName), - logicalPlan - ) - } + valueColumnName: String): DataFrame = withPlan { + Unpivot( + Some(ids.map(_.named).toImmutableArraySeq), + None, + None, + variableColumnName, + Seq(valueColumnName), + logicalPlan + ) } /** @@ -2261,10 +2235,8 @@ class Dataset[T] private[sql]( * @since 3.0.0 */ @varargs - def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = withOrigin { - withTypedPlan { - CollectMetrics(name, (expr +: exprs).map(_.named), logicalPlan, id) - } + def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = withTypedPlan { + CollectMetrics(name, (expr +: exprs).map(_.named), logicalPlan, id) } /** @@ -2301,10 +2273,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.0.0 */ - def limit(n: Int): Dataset[T] = withOrigin { - withTypedPlan { - Limit(Literal(n), logicalPlan) - } + def limit(n: Int): Dataset[T] = withTypedPlan { + Limit(Literal(n), logicalPlan) } /** @@ -2313,10 +2283,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 3.4.0 */ - def offset(n: Int): Dataset[T] = withOrigin { - withTypedPlan { - Offset(Literal(n), logicalPlan) - } + def offset(n: Int): Dataset[T] = withTypedPlan { + Offset(Literal(n), logicalPlan) } // This breaks caching, but it's usually ok because it addresses a very specific use case: @@ -2726,20 +2694,20 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @deprecated("use flatMap() or select() with 
functions.explode() instead", "2.0.0") - def explode[A <: Product : TypeTag](input: Column*)(f: Row => IterableOnce[A]): DataFrame = - withOrigin { - val elementSchema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] - val convert = CatalystTypeConverters.createToCatalystConverter(elementSchema) - - val rowFunction = - f.andThen(_.map(convert(_).asInstanceOf[InternalRow])) - val generator = UserDefinedGenerator(elementSchema, rowFunction, input.map(_.expr)) - - withPlan { - Generate(generator, unrequiredChildIndex = Nil, outer = false, - qualifier = None, generatorOutput = Nil, logicalPlan) - } + def explode[A <: Product : TypeTag](input: Column*)(f: Row => IterableOnce[A]): DataFrame = { + val elementSchema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] + + val convert = CatalystTypeConverters.createToCatalystConverter(elementSchema) + + val rowFunction = + f.andThen(_.map(convert(_).asInstanceOf[InternalRow])) + val generator = UserDefinedGenerator(elementSchema, rowFunction, input.map(_.expr)) + + withPlan { + Generate(generator, unrequiredChildIndex = Nil, outer = false, + qualifier = None, generatorOutput = Nil, logicalPlan) } + } /** * (Scala-specific) Returns a new Dataset where a single column has been expanded to zero @@ -2764,7 +2732,7 @@ class Dataset[T] private[sql]( */ @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0") def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => IterableOnce[B]) - : DataFrame = withOrigin { + : DataFrame = { val dataType = ScalaReflection.schemaFor[B].dataType val attributes = AttributeReference(outputColumn, dataType)() :: Nil // TODO handle the metadata? @@ -2921,14 +2889,14 @@ class Dataset[T] private[sql]( * @since 3.4.0 */ @throws[AnalysisException] - def withColumnsRenamed(colsMap: Map[String, String]): DataFrame = withOrigin { + def withColumnsRenamed(colsMap: Map[String, String]): DataFrame = { val (colNames, newColNames) = colsMap.toSeq.unzip withColumnsRenamed(colNames, newColNames) } private[spark] def withColumnsRenamed( colNames: Seq[String], - newColNames: Seq[String]): DataFrame = withOrigin { + newColNames: Seq[String]): DataFrame = { require(colNames.size == newColNames.size, s"The size of existing column names: ${colNames.size} isn't equal to " + s"the size of new column names: ${newColNames.size}") @@ -3103,10 +3071,8 @@ class Dataset[T] private[sql]( * @since 3.4.0 */ @scala.annotation.varargs - def drop(col: Column, cols: Column*): DataFrame = withOrigin { - withPlan { - DataFrameDropColumns((col +: cols).map(_.expr), logicalPlan) - } + def drop(col: Column, cols: Column*): DataFrame = withPlan { + DataFrameDropColumns((col +: cols).map(_.expr), logicalPlan) } /** @@ -3137,11 +3103,9 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.0.0 */ - def dropDuplicates(colNames: Seq[String]): Dataset[T] = withOrigin { - withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - Deduplicate(groupCols, logicalPlan) - } + def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + Deduplicate(groupCols, logicalPlan) } /** @@ -3218,12 +3182,10 @@ class Dataset[T] private[sql]( * @group typedrel * @since 3.5.0 */ - def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withOrigin { - withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - // UnsupportedOperationChecker will fail the query if this is called with batch 
Dataset. - DeduplicateWithinWatermark(groupCols, logicalPlan) - } + def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + // UnsupportedOperationChecker will fail the query if this is called with batch Dataset. + DeduplicateWithinWatermark(groupCols, logicalPlan) } /** @@ -3447,7 +3409,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def filter(func: T => Boolean): Dataset[T] = withOrigin { + def filter(func: T => Boolean): Dataset[T] = { withTypedPlan(TypedFilter(func, logicalPlan)) } @@ -3458,7 +3420,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def filter(func: FilterFunction[T]): Dataset[T] = withOrigin { + def filter(func: FilterFunction[T]): Dataset[T] = { withTypedPlan(TypedFilter(func, logicalPlan)) } @@ -3469,7 +3431,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def map[U : Encoder](func: T => U): Dataset[U] = withOrigin { + def map[U : Encoder](func: T => U): Dataset[U] = { withTypedPlan { MapElements[T, U](func, logicalPlan) } @@ -3482,7 +3444,7 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = withOrigin { + def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = { implicit val uEnc: Encoder[U] = encoder withTypedPlan(MapElements[T, U](func, logicalPlan)) } @@ -3645,9 +3607,8 @@ class Dataset[T] private[sql]( * @group action * @since 3.0.0 */ - def tail(n: Int): Array[T] = withOrigin { - withAction("tail", withTypedPlan(Tail(Literal(n), logicalPlan)).queryExecution)(collectFromPlan) - } + def tail(n: Int): Array[T] = withAction( + "tail", withTypedPlan(Tail(Literal(n), logicalPlan)).queryExecution)(collectFromPlan) /** * Returns the first `n` rows in the Dataset as a list. @@ -3711,10 +3672,8 @@ class Dataset[T] private[sql]( * @group action * @since 1.6.0 */ - def count(): Long = withOrigin { - withAction("count", groupBy().count().queryExecution) { plan => - plan.executeCollect().head.getLong(0) - } + def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => + plan.executeCollect().head.getLong(0) } /** @@ -3723,15 +3682,13 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def repartition(numPartitions: Int): Dataset[T] = withOrigin { - withTypedPlan { - Repartition(numPartitions, shuffle = true, logicalPlan) - } + def repartition(numPartitions: Int): Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = true, logicalPlan) } private def repartitionByExpression( numPartitions: Option[Int], - partitionExprs: Seq[Column]): Dataset[T] = withOrigin { + partitionExprs: Seq[Column]): Dataset[T] = { // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments. // However, we don't want to complicate the semantics of this API method. // Instead, let's give users a friendly error message, pointing them to the new method. 
@@ -3776,7 +3733,7 @@ class Dataset[T] private[sql]( private def repartitionByRange( numPartitions: Option[Int], - partitionExprs: Seq[Column]): Dataset[T] = withOrigin { + partitionExprs: Seq[Column]): Dataset[T] = { require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { case expr: SortOrder => expr @@ -3848,10 +3805,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def coalesce(numPartitions: Int): Dataset[T] = withOrigin { - withTypedPlan { - Repartition(numPartitions, shuffle = false, logicalPlan) - } + def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan { + Repartition(numPartitions, shuffle = false, logicalPlan) } /** @@ -3995,10 +3950,8 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @throws[AnalysisException] - def createTempView(viewName: String): Unit = withOrigin { - withPlan { - createTempViewCommand(viewName, replace = false, global = false) - } + def createTempView(viewName: String): Unit = withPlan { + createTempViewCommand(viewName, replace = false, global = false) } @@ -4010,10 +3963,8 @@ class Dataset[T] private[sql]( * @group basic * @since 2.0.0 */ - def createOrReplaceTempView(viewName: String): Unit = withOrigin { - withPlan { - createTempViewCommand(viewName, replace = true, global = false) - } + def createOrReplaceTempView(viewName: String): Unit = withPlan { + createTempViewCommand(viewName, replace = true, global = false) } /** @@ -4031,10 +3982,8 @@ class Dataset[T] private[sql]( * @since 2.1.0 */ @throws[AnalysisException] - def createGlobalTempView(viewName: String): Unit = withOrigin { - withPlan { - createTempViewCommand(viewName, replace = false, global = true) - } + def createGlobalTempView(viewName: String): Unit = withPlan { + createTempViewCommand(viewName, replace = false, global = true) } /** @@ -4474,7 +4423,7 @@ class Dataset[T] private[sql]( plan.executeCollect().map(fromRow) } - private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = withOrigin { + private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = { val sortOrder: Seq[SortOrder] = sortExprs.map { col => col.expr match { case expr: SortOrder => From 5cd3a11185d3adb6cae96c3a6491edbe4912a0d1 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 26 Dec 2023 22:21:15 +0300 Subject: [PATCH 2/7] Update tests --- .../apache/spark/sql/DataFrameSetOperationsSuite.scala | 6 +----- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 4 ++-- .../execution/datasources/FileMetadataStructSuite.scala | 9 +++++---- .../parquet/ParquetFileMetadataStructRowIndexSuite.scala | 4 +++- .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 5 +++-- .../org/apache/spark/sql/streaming/StreamSuite.scala | 4 +--- 6 files changed, 15 insertions(+), 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index e2dd029d4b10..bbb1561bb695 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -374,11 +374,7 @@ class DataFrameSetOperationsSuite extends QueryTest errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", parameters = Map( "colName" -> "`m`", - "dataType" -> "\"MAP\""), - context = ExpectedContext( - fragment = "distinct", - callSitePattern = 
getCurrentClassCallSitePattern) - ) + "dataType" -> "\"MAP\"")) withTempView("v") { df.createOrReplaceTempView("v") checkError( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index cd28c60d83c7..716e5cd1bd29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -2304,7 +2304,7 @@ class DatasetSuite extends QueryTest parameters = Map( "objectName" -> s"`${colName.replace(".", "`.`")}`", "proposal" -> "`field.1`, `field 2`"), - context = ExpectedContext(fragment = "select", getCurrentClassCallSitePattern)) + context = ExpectedContext(fragment = "anonfun$select$4", getCurrentClassCallSitePattern)) } } } @@ -2319,7 +2319,7 @@ class DatasetSuite extends QueryTest parameters = Map( "objectName" -> "`the`.`id`", "proposal" -> "`the.id`"), - context = ExpectedContext(fragment = "select", getCurrentClassCallSitePattern)) + context = ExpectedContext(fragment = "anonfun$select$4", getCurrentClassCallSitePattern)) } test("SPARK-39783: backticks in error message for map candidate key with dots") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala index 6bf72b82564e..091d49a5053c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala @@ -245,8 +245,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { }, errorClass = "FIELD_NOT_FOUND", parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"), - context = - ExpectedContext(fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) + context = ExpectedContext( + fragment = "anonfun$select$4", + callSitePattern = getCurrentClassCallSitePattern)) } metadataColumnsTest("SPARK-42683: df metadataColumn - schema conflict", @@ -526,7 +527,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { errorClass = "FIELD_NOT_FOUND", parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"), context = ExpectedContext( - fragment = "select", + fragment = "anonfun$select$4", callSitePattern = getCurrentClassCallSitePattern)) checkError( @@ -536,7 +537,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { errorClass = "FIELD_NOT_FOUND", parameters = Map("fieldName" -> "`file_NAME`", "fields" -> "`id`, `university`"), context = ExpectedContext( - fragment = "select", + fragment = "anonfun$select$4", callSitePattern = getCurrentClassCallSitePattern)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala index 5e59418f8f92..141ac6eaeae5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala @@ -135,7 +135,9 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS "fieldName" -> "`row_index`", "fields" -> ("`file_path`, 
`file_name`, `file_size`, " + "`file_block_start`, `file_block_length`, `file_modification_time`")), - context = ExpectedContext(fragment = "select", getCurrentClassCallSitePattern)) + context = ExpectedContext( + fragment = "anonfun$select$4", + getCurrentClassCallSitePattern)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 76073a108a3c..1fae08d36a64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -1919,8 +1919,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { }, errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", parameters = Map("objectName" -> "`default`", "proposal" -> "`value`"), - context = - ExpectedContext(fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) + context = ExpectedContext( + fragment = "anonfun$select$4", + callSitePattern = getCurrentClassCallSitePattern)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index cd0bbfd47b2b..b0e54737d104 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -713,9 +713,7 @@ class StreamSuite extends StreamTest { "columnName" -> "`rn_col`", "windowSpec" -> ("(PARTITION BY COL1 ORDER BY COL2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING " + - "AND CURRENT ROW)")), - queryContext = Array( - ExpectedContext(fragment = "withColumn", callSitePattern = getCurrentClassCallSitePattern))) + "AND CURRENT ROW)"))) } From 384f8001d228ce3f7d15c1837592bf24dc5439ab Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 26 Dec 2023 22:58:09 +0300 Subject: [PATCH 3/7] Trigger build From 232366f8bd8c4c978ee0817bac943f871766701c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Tue, 9 Jan 2024 23:38:00 +0300 Subject: [PATCH 4/7] Skip scala sollection --- sql/core/src/main/scala/org/apache/spark/sql/package.scala | 7 ++++--- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 4 ++-- .../execution/datasources/FileMetadataStructSuite.scala | 6 +++--- .../parquet/ParquetFileMetadataStructRowIndexSuite.scala | 2 +- .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 2 +- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 877d9906a1cf..9831ce62801a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -111,9 +111,10 @@ package object sql { } } - private val sparkCodePattern = Pattern.compile("org\\.apache\\.spark\\.sql\\." + - "(?:functions|Column|ColumnName|SQLImplicits|Dataset|DataFrameStatFunctions)" + - "(?:|\\..*|\\$.*)") + private val sparkCodePattern = Pattern.compile("(org\\.apache\\.spark\\.sql\\." 
+ + "(?:functions|Column|ColumnName|SQLImplicits|Dataset|DataFrameStatFunctions|DatasetHolder)" + + "(?:|\\..*|\\$.*))" + + "|(scala\\.collection\\..*)") private def sparkCode(ste: StackTraceElement): Boolean = { sparkCodePattern.matcher(ste.getClassName).matches() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 716e5cd1bd29..cd28c60d83c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -2304,7 +2304,7 @@ class DatasetSuite extends QueryTest parameters = Map( "objectName" -> s"`${colName.replace(".", "`.`")}`", "proposal" -> "`field.1`, `field 2`"), - context = ExpectedContext(fragment = "anonfun$select$4", getCurrentClassCallSitePattern)) + context = ExpectedContext(fragment = "select", getCurrentClassCallSitePattern)) } } } @@ -2319,7 +2319,7 @@ class DatasetSuite extends QueryTest parameters = Map( "objectName" -> "`the`.`id`", "proposal" -> "`the.id`"), - context = ExpectedContext(fragment = "anonfun$select$4", getCurrentClassCallSitePattern)) + context = ExpectedContext(fragment = "select", getCurrentClassCallSitePattern)) } test("SPARK-39783: backticks in error message for map candidate key with dots") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala index 091d49a5053c..5288f6b9d61e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala @@ -246,7 +246,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { errorClass = "FIELD_NOT_FOUND", parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"), context = ExpectedContext( - fragment = "anonfun$select$4", + fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) } @@ -527,7 +527,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { errorClass = "FIELD_NOT_FOUND", parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"), context = ExpectedContext( - fragment = "anonfun$select$4", + fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) checkError( @@ -537,7 +537,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { errorClass = "FIELD_NOT_FOUND", parameters = Map("fieldName" -> "`file_NAME`", "fields" -> "`id`, `university`"), context = ExpectedContext( - fragment = "anonfun$select$4", + fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala index 141ac6eaeae5..6d32d59314b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala @@ -136,7 +136,7 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS "fields" -> ("`file_path`, `file_name`, `file_size`, " + "`file_block_start`, 
`file_block_length`, `file_modification_time`")), context = ExpectedContext( - fragment = "anonfun$select$4", + fragment = "select", getCurrentClassCallSitePattern)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 1fae08d36a64..f819d0fb61d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -1920,7 +1920,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", parameters = Map("objectName" -> "`default`", "proposal" -> "`value`"), context = ExpectedContext( - fragment = "anonfun$select$4", + fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) } } From fd9fa82860e7594050d55ef366af64fa512b4abc Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 10 Jan 2024 17:31:27 +0300 Subject: [PATCH 5/7] Restore some tests --- .../sql/execution/datasources/FileMetadataStructSuite.scala | 5 ++--- .../parquet/ParquetFileMetadataStructRowIndexSuite.scala | 4 +--- .../scala/org/apache/spark/sql/sources/InsertSuite.scala | 5 ++--- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala index 5288f6b9d61e..6bf72b82564e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala @@ -245,9 +245,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { }, errorClass = "FIELD_NOT_FOUND", parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"), - context = ExpectedContext( - fragment = "select", - callSitePattern = getCurrentClassCallSitePattern)) + context = + ExpectedContext(fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) } metadataColumnsTest("SPARK-42683: df metadataColumn - schema conflict", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala index 6d32d59314b5..5e59418f8f92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala @@ -135,9 +135,7 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS "fieldName" -> "`row_index`", "fields" -> ("`file_path`, `file_name`, `file_size`, " + "`file_block_start`, `file_block_length`, `file_modification_time`")), - context = ExpectedContext( - fragment = "select", - getCurrentClassCallSitePattern)) + context = ExpectedContext(fragment = "select", getCurrentClassCallSitePattern)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index f819d0fb61d1..76073a108a3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -1919,9 +1919,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { }, errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", parameters = Map("objectName" -> "`default`", "proposal" -> "`value`"), - context = ExpectedContext( - fragment = "select", - callSitePattern = getCurrentClassCallSitePattern)) + context = + ExpectedContext(fragment = "select", callSitePattern = getCurrentClassCallSitePattern)) } } From 98503459143d39f3a95ead1b8b045f6fcb35497c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 10 Jan 2024 17:44:15 +0300 Subject: [PATCH 6/7] Revert changes drop duplicates --- .../scala/org/apache/spark/sql/Dataset.scala | 18 +++++++++++------- .../sql/DataFrameSetOperationsSuite.scala | 6 +++++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index a3a3f38e8d69..9a7b81bf8124 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3103,9 +3103,11 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.0.0 */ - def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - Deduplicate(groupCols, logicalPlan) + def dropDuplicates(colNames: Seq[String]): Dataset[T] = withOrigin { + withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + Deduplicate(groupCols, logicalPlan) + } } /** @@ -3182,10 +3184,12 @@ class Dataset[T] private[sql]( * @group typedrel * @since 3.5.0 */ - def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - // UnsupportedOperationChecker will fail the query if this is called with batch Dataset. - DeduplicateWithinWatermark(groupCols, logicalPlan) + def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withOrigin { + withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + // UnsupportedOperationChecker will fail the query if this is called with batch Dataset. + DeduplicateWithinWatermark(groupCols, logicalPlan) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index bbb1561bb695..e2dd029d4b10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -374,7 +374,11 @@ class DataFrameSetOperationsSuite extends QueryTest errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", parameters = Map( "colName" -> "`m`", - "dataType" -> "\"MAP\"")) + "dataType" -> "\"MAP\""), + context = ExpectedContext( + fragment = "distinct", + callSitePattern = getCurrentClassCallSitePattern) + ) withTempView("v") { df.createOrReplaceTempView("v") checkError( From 60dda6cde440660bf278a823ac2de4397be09e01 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 11 Jan 2024 11:52:45 +0300 Subject: [PATCH 7/7] Revert "Revert changes drop duplicates" This reverts commit 98503459143d39f3a95ead1b8b045f6fcb35497c. 
--- .../scala/org/apache/spark/sql/Dataset.scala | 18 +++++++----------- .../sql/DataFrameSetOperationsSuite.scala | 6 +----- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 9a7b81bf8124..a3a3f38e8d69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3103,11 +3103,9 @@ class Dataset[T] private[sql]( * @group typedrel * @since 2.0.0 */ - def dropDuplicates(colNames: Seq[String]): Dataset[T] = withOrigin { - withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - Deduplicate(groupCols, logicalPlan) - } + def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + Deduplicate(groupCols, logicalPlan) } /** @@ -3184,12 +3182,10 @@ class Dataset[T] private[sql]( * @group typedrel * @since 3.5.0 */ - def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withOrigin { - withTypedPlan { - val groupCols = groupColsFromDropDuplicates(colNames) - // UnsupportedOperationChecker will fail the query if this is called with batch Dataset. - DeduplicateWithinWatermark(groupCols, logicalPlan) - } + def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan { + val groupCols = groupColsFromDropDuplicates(colNames) + // UnsupportedOperationChecker will fail the query if this is called with batch Dataset. + DeduplicateWithinWatermark(groupCols, logicalPlan) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index e2dd029d4b10..bbb1561bb695 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -374,11 +374,7 @@ class DataFrameSetOperationsSuite extends QueryTest errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", parameters = Map( "colName" -> "`m`", - "dataType" -> "\"MAP\""), - context = ExpectedContext( - fragment = "distinct", - callSitePattern = getCurrentClassCallSitePattern) - ) + "dataType" -> "\"MAP\"")) withTempView("v") { df.createOrReplaceTempView("v") checkError(
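Patches 2, 4, and 5 revolve around how the SQL error-context machinery picks the user-facing call site: sparkCodePattern in sql/package.scala decides which stack frames count as internal and are skipped, and widening it to also skip scala.collection.* and DatasetHolder frames (patch 4) is what lets the test expectations revert from "anonfun$select$4" back to the plain "select" fragment (patch 5). The sketch below illustrates only that frame-skipping idea in isolation; FakeLibrary, UserCode, and userCallSite are invented names for the example and are not Spark's actual withOrigin implementation or API.

import java.util.regex.Pattern

// Stand-in for library code whose stack frames should be hidden from users.
object FakeLibrary {
  // Mirrors the idea of sparkCodePattern: frames whose class names match are internal.
  private val internalCodePattern: Pattern =
    Pattern.compile("(org\\.apache\\.spark\\.sql\\..*)|(scala\\.collection\\..*)")

  private def isInternal(ste: StackTraceElement): Boolean =
    ste.getClassName.startsWith("java.lang.Thread") ||
      ste.getClassName.startsWith("FakeLibrary") ||
      internalCodePattern.matcher(ste.getClassName).matches()

  // Walk the current stack and return the first frame that is not internal.
  def userCallSite(): Option[StackTraceElement] =
    Thread.currentThread().getStackTrace.dropWhile(isInternal).headOption
}

object UserCode {
  def main(args: Array[String]): Unit = {
    // Prints a frame pointing at this line in UserCode, because frames from
    // FakeLibrary and from the packages in internalCodePattern are skipped.
    FakeLibrary.userCallSite().foreach(ste => println(s"call site: $ste"))
  }
}

In Spark itself this filtering backs withOrigin, which patch 1 removes from the individual Dataset methods and which patch 6 briefly reinstates for dropDuplicates and dropDuplicatesWithinWatermark before patch 7 reverts it again.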