Skip to content
Prev Previous commit
Remove cogroupSorted tests
  • Loading branch information
EnricoMi committed Jun 30, 2023
commit cc5773cdd2f91c05ca5129a627f8ffed92cb845f
36 changes: 0 additions & 36 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -930,42 +930,6 @@ class DatasetSuite extends QueryTest
assert(actual === Seq((0, 0), (1, 1), (2, 2)))
}

test("SPARK-42132: cogroup with sorted and same plan on both sides") {
val df = spark.range(3).join(spark.range(2)).as[(Long, Long)]

val left_grouped_df = df.groupBy("id").as[Long, (Long, Long)]
val right_grouped_df = df.groupBy("id").as[Long, (Long, Long)]

val cogroup_df = left_grouped_df.cogroupSorted(right_grouped_df)($"id")($"id".desc) {
case (key, left, right) => left.zip(right)
}

val actual = cogroup_df.sort().collect()
assert(actual === Seq(
((0, 0), (0, 1)), ((0, 1), (0, 0)),
((1, 0), (1, 1)), ((1, 1), (1, 0)),
((2, 0), (2, 1)), ((2, 1), (2, 0))
))
}

test("SPARK-42132: cogroup groupby function with sorted and same plan on both sides") {
val df = spark.range(3).join(spark.range(2)).as[(Long, Long)]

val left_grouped_df = df.groupByKey(_._1)
val right_grouped_df = df.groupByKey(_._1)

val cogroup_df = left_grouped_df.cogroupSorted(right_grouped_df)($"id")($"id".desc) {
case (key, left, right) => left.zip(right)
}

val actual = cogroup_df.sort().collect()
assert(actual === Seq(
((0, 0), (0, 1)), ((0, 1), (0, 0)),
((1, 0), (1, 1)), ((1, 1), (1, 0)),
((2, 0), (2, 1)), ((2, 1), (2, 0))
))
}

test("SPARK-34806: observation on datasets") {
val namedObservation = Observation("named")
val unnamedObservation = Observation()
Expand Down