
Commit 61ec3bd

Hisoka-X authored and sunchao committed
[SPARK-41471][SQL] Reduce Spark shuffle when only one side of a join is KeyGroupedPartitioning
### What changes were proposed in this pull request?
When only one side of an SPJ (Storage-Partitioned Join) is KeyGroupedPartitioning, Spark currently shuffles both sides using HashPartitioning. However, it may be enough to shuffle only the other side, according to the partition transforms defined by the KeyGroupedPartitioning side. This is especially useful when the other side is relatively small.
1. Add a new config `spark.sql.sources.v2.bucketing.shuffle.enabled` to control whether this feature is enabled.
2. Add `KeyGroupedPartitioner`, used to partition one side when the transform values of the other side (currently KeyGroupedPartitioning) are known. Spark already knows, in `EnsureRequirements`, which partition value maps to which partition id on the KeyGroupedPartitioning side; that mapping is saved in the `KeyGroupedPartitioner` and used to shuffle the other side, so records with the same key land in the same partition.
3. Only the `identity` transform works for now. The same transform can currently report different values between a DS V2 connector implementation and its catalog function; until that problem is solved, only `identity` is supported. E.g., in the test package, `YearFunction` in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala#L47 and https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala#L143.

### Why are the changes needed?
Reduce data shuffle in specific SPJ scenarios.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new tests.

Closes apache#42194 from Hisoka-X/SPARK-41471_one_side_keygroup.

Authored-by: Jia Fan <fanjiaeminem@qq.com>
Signed-off-by: Chao Sun <sunchao@apple.com>
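The sketch below is not part of this commit; it only illustrates the intended usage under stated assumptions. The catalog and table names (`testcat.ns.items`, `testcat.ns.purchases`) are hypothetical, mirroring the tables used in the new test suite, and it assumes the `items` scan reports a `KeyGroupedPartitioning` over an identity-transformed `id` column while `purchases` reports no partitioning. It is written for `spark-shell`, where `spark` is predefined.

```scala
// Hypothetical usage sketch (not from this PR): enable the new flag together with
// the prerequisite V2 bucketing flag, then join a key-grouped table with an
// unpartitioned one. With the flag on, only the `purchases` side should be shuffled.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.shuffle.enabled", "true")

val df = spark.sql(
  """SELECT i.id, i.name, p.price
    |FROM testcat.ns.items i JOIN testcat.ns.purchases p
    |ON i.id = p.item_id""".stripMargin)

// Expect a single ShuffleExchangeExec (on the purchases side) in the physical plan
// when the flag is enabled, instead of one exchange per side.
df.explain()
```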
1 parent 0e494cd commit 61ec3bd

File tree

8 files changed, +263 -5 lines changed


core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 16 additions & 0 deletions
@@ -137,6 +137,22 @@ private[spark] class PartitionIdPassthrough(override val numPartitions: Int) ext
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
 
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records using partition value map.
+ * The `valueMap` is a map that contains tuples of (partition value, partition id). It is generated
+ * by [[org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning]], used to partition
+ * the other side of a join to make sure records with same partition value are in the same
+ * partition.
+ */
+private[spark] class KeyGroupedPartitioner(
+    valueMap: mutable.Map[Seq[Any], Int],
+    override val numPartitions: Int) extends Partitioner {
+  override def getPartition(key: Any): Int = {
+    val keys = key.asInstanceOf[Seq[Any]]
+    valueMap.getOrElseUpdate(keys, Utils.nonNegativeMod(keys.hashCode, numPartitions))
+  }
+}
+
 /**
  * A [[org.apache.spark.Partitioner]] that partitions all records into a single partition.
  */
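To make the routing behaviour of the new partitioner concrete, here is a small standalone sketch of the same lookup logic outside Spark. It is an illustration, not the class above: the value map and key shapes are made up, and `Utils.nonNegativeMod` is replaced by an inline equivalent.

```scala
import scala.collection.mutable

// Standalone sketch of the KeyGroupedPartitioner lookup: known partition values keep
// the partition id recorded for the key-grouped side; unseen values fall back to a
// non-negative hash modulo numPartitions (inlined stand-in for Utils.nonNegativeMod).
object KeyGroupedPartitionerSketch extends App {
  val numPartitions = 4
  val valueMap = mutable.Map[Seq[Any], Int](Seq(1) -> 0, Seq(3) -> 1, Seq(4) -> 2)

  def getPartition(key: Seq[Any]): Int =
    valueMap.getOrElseUpdate(key, ((key.hashCode % numPartitions) + numPartitions) % numPartitions)

  assert(getPartition(Seq(1)) == 0)   // value seen on the key-grouped side keeps its id
  val p = getPartition(Seq(5))        // unseen value is hashed into [0, numPartitions)
  assert(p >= 0 && p < numPartitions)
}
```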

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 7 additions & 1 deletion
@@ -780,7 +780,13 @@ case class KeyGroupedShuffleSpec(
     case _ => false
   }
 
-  override def canCreatePartitioning: Boolean = false
+  override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
+    // Only support partition expressions are AttributeReference for now
+    partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+    KeyGroupedPartitioning(clustering, partitioning.numPartitions, partitioning.partitionValues)
+  }
 }
 
 case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec {
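As a rough mental model (a simplified sketch, not the actual `EnsureRequirements` code), the two overrides above let the planner ask a key-grouped side whether it can "export" its partitioning and, if so, build that partitioning from the other side's clustering keys so only that side is exchanged:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, ShuffleSpec}

// Simplified sketch of the decision, assuming spark-catalyst on the classpath. The
// real logic lives in EnsureRequirements; this only shows how the two hooks compose.
def partitioningForOtherSide(
    keyGroupedSpec: ShuffleSpec,
    otherSideClustering: Seq[Expression]): Option[Partitioning] = {
  if (keyGroupedSpec.canCreatePartitioning) {
    // shuffle only the other side into the key-grouped layout
    Some(keyGroupedSpec.createPartitioning(otherSideClustering))
  } else {
    None // fall back: both sides get hash-partitioned exchanges
  }
}
```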

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala

Lines changed: 6 additions & 1 deletion
@@ -79,6 +79,11 @@ object InternalRowComparableWrapper {
     rightPartitioning.partitionValues
       .map(new InternalRowComparableWrapper(_, partitionDataTypes))
       .foreach(partition => partitionsSet.add(partition))
-    partitionsSet.map(_.row).toSeq
+    // SPARK-41471: We keep to order of partitions to make sure the order of
+    // partitions is deterministic in different case.
+    val partitionOrdering: Ordering[InternalRow] = {
+      RowOrdering.createNaturalAscendingOrdering(partitionDataTypes)
+    }
+    partitionsSet.map(_.row).toSeq.sorted(partitionOrdering)
   }
 }
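A tiny sketch (plain Scala, not Spark source) of why the added sort matters: the merged partition values come out of a hash set, whose iteration order is not stable, so sorting them gives every caller the same partition-value sequence and hence the same value-to-partition-id assignment.

```scala
import scala.collection.mutable

// Without sorting, the iteration order of the set (and thus the derived partition ids)
// could differ between runs; a natural ascending ordering makes it deterministic.
object DeterministicPartitionOrderSketch extends App {
  val mergedPartitionValues = mutable.Set(Seq(4), Seq(1), Seq(3))
  val ordered = mergedPartitionValues.toSeq.sortBy(_.head)
  println(ordered.zipWithIndex.toMap) // Map(List(1) -> 0, List(3) -> 1, List(4) -> 2)
}
```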

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 0 deletions
@@ -1482,6 +1482,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val V2_BUCKETING_SHUFFLE_ENABLED =
+    buildConf("spark.sql.sources.v2.bucketing.shuffle.enabled")
+      .doc("During a storage-partitioned join, whether to allow to shuffle only one side." +
+        "When only one side is KeyGroupedPartitioning, if the conditions are met, spark will " +
+        "only shuffle the other side. This optimization will reduce the amount of data that " +
+        s"needs to be shuffle. This config requires ${V2_BUCKETING_ENABLED.key} to be enabled")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
     .doc("The maximum number of buckets allowed.")
     .version("2.4.0")
@@ -4770,6 +4780,9 @@ class SQLConf extends Serializable with Logging {
   def v2BucketingPartiallyClusteredDistributionEnabled: Boolean =
     getConf(SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED)
 
+  def v2BucketingShuffleEnabled: Boolean =
+    getConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED)
+
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
     getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ case class BatchScanExec(
         if (spjParams.commonPartitionValues.isDefined &&
             spjParams.applyPartialClustering) {
           // A mapping from the common partition values to how many splits the partition
-          // should contain. Note this no longer maintain the partition key ordering.
+          // should contain.
           val commonPartValuesMap = spjParams.commonPartitionValues
             .get
             .map(t => (InternalRowComparableWrapper(t._1, p.expressions), t._2))

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 9 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.exchange
 
 import java.util.function.Supplier
 
+import scala.collection.mutable
 import scala.concurrent.Future
 
 import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProces
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -299,6 +301,11 @@ object ShuffleExchangeExec {
         ascending = true,
         samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
       case SinglePartition => new ConstantPartitioner
+      case k @ KeyGroupedPartitioning(expressions, n, _) =>
+        val valueMap = k.uniquePartitionValues.zipWithIndex.map {
+          case (partition, index) => (partition.toSeq(expressions.map(_.dataType)), index)
+        }.toMap
+        new KeyGroupedPartitioner(mutable.Map(valueMap.toSeq: _*), n)
       case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
@@ -325,6 +332,8 @@ object ShuffleExchangeExec {
         val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
         row => projection(row)
       case SinglePartition => identity
+      case KeyGroupedPartitioning(expressions, _, _) =>
+        row => bindReferences(expressions, outputAttributes).map(_.eval(row))
       case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
     }
 
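Putting the two new cases together, the path a row takes can be sketched as follows (hypothetical standalone code, with catalyst rows and expressions narrowed to plain Scala values): the extractor plays the role of `bindReferences(...).map(_.eval(row))` and the map lookup plays the role of `KeyGroupedPartitioner.getPartition`.

```scala
// Standalone sketch of the new shuffle path: evaluate the partition expressions on a
// row to get its key, then route the row to the partition id that the key-grouped
// side already assigned to that partition value.
object KeyGroupedShufflePathSketch extends App {
  // partition values of the key-grouped side, in partition-id order
  val partitionValues: Seq[Seq[Any]] = Seq(Seq(1), Seq(3), Seq(4))
  val valueToPartition: Map[Seq[Any], Int] = partitionValues.zipWithIndex.toMap

  // stand-in for bindReferences(expressions, outputAttributes).map(_.eval(row))
  def extractKey(row: Map[String, Any], partitionColumns: Seq[String]): Seq[Any] =
    partitionColumns.map(row)

  val row = Map[String, Any]("item_id" -> 3, "price" -> 19.5)
  val key = extractKey(row, Seq("item_id"))
  println(valueToPartition(key)) // 1: routed to the same partition as item_id = 3
}
```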

sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala

Lines changed: 181 additions & 0 deletions
@@ -1038,6 +1038,187 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("SPARK-41471: shuffle one side: only one side reports partitioning") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, items_schema, items_partitions)
+
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    createTable(purchases, purchases_schema, Array.empty)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+    Seq(true, false).foreach { shuffle =>
+      withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (shuffle) {
+          assert(shuffles.size == 1, "only shuffle one side not report partitioning")
+        } else {
+          assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" +
+            " is not enabled")
+        }
+
+        checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
+      }
+    }
+  }
+
+  test("SPARK-41471: shuffle one side: shuffle side has more partition value") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, items_schema, items_partitions)
+
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    createTable(purchases, purchases_schema, Array.empty)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 19.5, cast('2020-02-01' as timestamp)), " +
+      "(5, 26.0, cast('2023-01-01' as timestamp)), " +
+      "(6, 50.0, cast('2023-02-01' as timestamp))")
+
+    Seq(true, false).foreach { shuffle =>
+      withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
+        Seq("JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN", "FULL OUTER JOIN").foreach { joinType =>
+          val df = sql(s"SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+            s"FROM testcat.ns.$items i $joinType testcat.ns.$purchases p " +
+            "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          if (shuffle) {
+            assert(shuffles.size == 1, "only shuffle one side not report partitioning")
+          } else {
+            assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one " +
+              "side is not enabled")
+          }
+          joinType match {
+            case "JOIN" =>
+              checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
+            case "LEFT OUTER JOIN" =>
+              checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5),
+                Row(4, "cc", 15.5, null)))
+            case "RIGHT OUTER JOIN" =>
+              checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0),
+                Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
+            case "FULL OUTER JOIN" =>
+              checkAnswer(df, Seq(Row(null, null, null, 26.0), Row(null, null, null, 50.0),
+                Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5),
+                Row(4, "cc", 15.5, null)))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-41471: shuffle one side: only one side reports partitioning with two identity") {
+    val items_partitions = Array(identity("id"), identity("arrive_time"))
+    createTable(items, items_schema, items_partitions)
+
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    createTable(purchases, purchases_schema, Array.empty)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 19.5, cast('2020-02-01' as timestamp))")
+
+    Seq(true, false).foreach { shuffle =>
+      withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id and i.arrive_time = p.time ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (shuffle) {
+          assert(shuffles.size == 1, "only shuffle one side not report partitioning")
+        } else {
+          assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" +
+            " is not enabled")
+        }
+
+        checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0)))
+      }
+    }
+  }
+
+  test("SPARK-41471: shuffle one side: partitioning with transform") {
+    val items_partitions = Array(years("arrive_time"))
+    createTable(items, items_schema, items_partitions)
+
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(4, 'cc', 15.5, cast('2021-02-01' as timestamp))")
+
+    createTable(purchases, purchases_schema, Array.empty)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 19.5, cast('2021-02-01' as timestamp))")
+
+    Seq(true, false).foreach { shuffle =>
+      withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString) {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.arrive_time = p.time ORDER BY id, purchase_price, sale_price")
+
+        val shuffles = collectShuffles(df.queryExecution.executedPlan)
+        if (shuffle) {
+          assert(shuffles.size == 2, "partitioning with transform not work now")
+        } else {
+          assert(shuffles.size == 2, "should add two side shuffle when bucketing shuffle one side" +
+            " is not enabled")
+        }
+
+        checkAnswer(df, Seq(
+          Row(1, "aa", 40.0, 42.0),
+          Row(3, "bb", 10.0, 42.0),
+          Row(4, "cc", 15.5, 19.5)))
+      }
+    }
+  }
+
+  test("SPARK-41471: shuffle one side: work with group partition split") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, items_schema, items_partitions)
+
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+      "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+      "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    createTable(purchases, purchases_schema, Array.empty)
+    sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+      "(1, 42.0, cast('2020-01-01' as timestamp)), " +
+      "(3, 19.5, cast('2020-02-01' as timestamp)), " +
+      "(5, 26.0, cast('2023-01-01' as timestamp)), " +
+      "(6, 50.0, cast('2023-02-01' as timestamp))")
+
+    Seq(true, false).foreach { shuffle =>
+      withSQLConf(
+        SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> shuffle.toString,
+        SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+        SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "true") {
+        val df = sql("SELECT id, name, i.price as purchase_price, p.price as sale_price " +
+          s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
+          "ON i.id = p.item_id ORDER BY id, purchase_price, sale_price")
+
+        checkAnswer(df, Seq(Row(1, "aa", 40.0, 42.0), Row(3, "bb", 10.0, 19.5)))
+      }
+    }
+  }
+
   test("SPARK-44641: duplicated records when SPJ is not triggered") {
     val items_partitions = Array(bucket(8, "id"))
     createTable(items, items_schema, items_partitions)

sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala

Lines changed: 30 additions & 2 deletions
@@ -18,15 +18,17 @@
 package org.apache.spark.sql.execution.exchange
 
 import org.apache.spark.api.python.PythonEvalType
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
+import org.apache.spark.sql.catalyst.optimizer.BuildRight
 import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.plans.physical.{SinglePartition, _}
 import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan
 import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.execution.{DummySparkPlan, SortExec}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.python.FlatMapCoGroupsInPandasExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
@@ -1109,6 +1111,32 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-41471: shuffle right side when" +
+    " spark.sql.sources.v2.bucketing.shuffle.enabled is true") {
+    withSQLConf(SQLConf.V2_BUCKETING_SHUFFLE_ENABLED.key -> "true") {
+
+      val a1 = AttributeReference("a1", IntegerType)()
+
+      val partitionValue = Seq(50, 51, 52).map(v => InternalRow.fromSeq(Seq(v)))
+      val plan1 = DummySparkPlan(outputPartitioning = KeyGroupedPartitioning(
+        identity(a1) :: Nil, 4, partitionValue))
+      val plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
+
+      val smjExec = ShuffledHashJoinExec(
+        a1 :: Nil, a1 :: Nil, Inner, BuildRight, None, plan1, plan2)
+      EnsureRequirements.apply(smjExec) match {
+        case ShuffledHashJoinExec(_, _, _, _, _,
+          DummySparkPlan(_, _, left: KeyGroupedPartitioning, _, _),
+          ShuffleExchangeExec(KeyGroupedPartitioning(attrs, 4, pv),
+            DummySparkPlan(_, _, SinglePartition, _, _), _, _), _) =>
+          assert(left.expressions == a1 :: Nil)
+          assert(attrs == a1 :: Nil)
+          assert(partitionValue == pv)
+        case other => fail(other.toString)
+      }
+    }
+  }
+
   test("SPARK-42168: FlatMapCoGroupInPandas and Window function with differing key order") {
     val lKey = AttributeReference("key", IntegerType)()
     val lKey2 = AttributeReference("key2", IntegerType)()
