2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -18,6 +18,8 @@ displayTitle: Spark SQL Upgrading Guide
- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.

- In Spark version 2.4 and earlier, users could create map values with a map type key via built-in functions such as `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, creating map values with a map type key using these built-in functions is no longer allowed (see the first sketch after this list). Users can still read map values with a map type key from data sources or Java/Scala collections, though they are not very useful.

- In Spark version 2.4 and earlier, `Dataset.groupByKey` results in a grouped dataset whose key attribute is wrongly named "value" if the key is of a non-struct type, e.g. int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries confusing. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute "key" (see the second sketch after this list). The old behaviour is preserved under the newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue`, which defaults to `false`.

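A minimal sketch of the first change, assuming a Spark shell where `spark` is an active `SparkSession`; the exact error raised in 3.0 is illustrative, not quoted:

```scala
// Spark 2.4 and earlier: a map is accepted as a map key.
spark.sql("SELECT map(map(1, 2), 'a')").show()

// Spark 3.0: the same query is rejected, because the built-in map-building
// functions no longer accept a map type key.
// spark.sql("SELECT map(map(1, 2), 'a')")   // throws an exception in 3.0
```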
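And a sketch of the second change, again assuming an active `SparkSession` with its implicits available; the schemas in the comments use the same `(value, count)` shorthand as the note above:

```scala
import spark.implicits._

val counted = Seq(1, 2, 3).toDS().groupByKey(x => x).count()
counted.printSchema()

// Spark 2.4 and earlier: the grouping column is confusingly named "value",
// so the schema reads as (value, count).
// Spark 3.0 (default): the grouping column is named "key", i.e. (key, count).
```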
## Upgrading From Spark SQL 2.3 to 2.4

12 changes: 12 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1595,6 +1595,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
.internal()
.doc("When set to true, the key attribute resulted from running `Dataset.groupByKey` " +
"for non-struct key type, will be named as `value`, following the behavior of Spark " +
"version 2.4 and earlier.")
.booleanConf
.createWithDefault(false)

val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields")
.doc("Maximum number of fields of sequence-like entries can be converted to strings " +
"in debug output. Any elements beyond the limit will be dropped and replaced by a" +
@@ -2016,6 +2025,9 @@ class SQLConf extends Serializable with Logging {

def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)

def nameNonStructGroupingKeyAsValue: Boolean =
getConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE)

def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)

/** ********************** SQLConf functionality methods ************ */
7 changes: 6 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateStruct}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.expressions.ReduceAggregator
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

/**
@@ -459,7 +460,11 @@ class KeyValueGroupedDataset[K, V] private[sql](
columns.map(_.withInputType(vExprEnc, dataAttributes).named)
val keyColumn = if (!kExprEnc.isSerializedAsStruct) {
assert(groupingAttributes.length == 1)
groupingAttributes.head
if (SQLConf.get.nameNonStructGroupingKeyAsValue) {
groupingAttributes.head
} else {
Alias(groupingAttributes.head, "key")()
}
} else {
Alias(CreateStruct(groupingAttributes), "key")()
}
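To make the branching above concrete, a small sketch of the two key shapes it distinguishes, assuming a `SparkSession` with its implicits imported (the schema comments are illustrative):

```scala
import spark.implicits._

// Non-struct key (Int): takes the first branch. With the legacy flag off (the
// default), the single grouping attribute is aliased to "key"; with the flag
// on, it keeps its old analyzer-assigned name "value".
val byInt = Seq(1, 2, 3).toDS().groupByKey(identity).count()
// byInt.schema.head.name == "key"

// Struct key (a tuple): takes the else branch, which already wrapped the
// grouping attributes in a struct aliased to "key" before this change.
val byTuple = Seq((1, "a"), (2, "b")).toDS().groupByKey(x => x).count()
// byTuple.schema.head.name == "key"
```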
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1572,6 +1572,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
checkDatasetUnorderly(agg, ((1, 2), 1L, 3L), ((2, 3), 2L, 4L), ((3, 4), 3L, 5L))
}

test("SPARK-26085: fix key attribute name for atomic type for typed aggregation") {
val ds = Seq(1, 2, 3).toDS()
assert(ds.groupByKey(x => x).count().schema.head.name == "key")

// Enable legacy flag to follow previous Spark behavior
withSQLConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE.key -> "true") {
assert(ds.groupByKey(x => x).count().schema.head.name == "value")
}
}

test("SPARK-8288: class with only a companion object constructor") {
val data = Seq(ScroogeLikeExample(1), ScroogeLikeExample(2))
val ds = data.toDS