add more
LuciferYang committed Oct 27, 2023
commit e4460d2b18dbf424444df733a57febc7c96a9efc
@@ -83,6 +83,6 @@ object WholeTextFileInputFormatSuite {
private val fileLengths = Array(10, 100, 1000)

private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
}.toMap
}
@@ -145,6 +145,6 @@ object WholeTextFileRecordReaderSuite {
private val fileLengths = Array(10, 100, 1000)

private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
filename -> LazyList.continually(testWords.toList.to(LazyList)).flatten.take(upperBound).toArray
}.toMap
}
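Both fixtures above make the same mechanical swap: `Stream.continually(testWords.toList.toStream)` becomes `LazyList.continually(testWords.toList.to(LazyList))`. A standalone sketch of what that expression produces; the word list and length here are stand-ins, not the suites' actual `testWords`:

```scala
// Standalone sketch, not Spark code: LazyList.continually cycles the word list
// indefinitely, and take(upperBound) cuts it to the requested file length.
object ContinuallyDemo {
  def main(args: Array[String]): Unit = {
    val testWords = Seq("the", "quick", "fox") // assumed sample words
    val upperBound = 7
    val contents = LazyList.continually(testWords.to(LazyList)).flatten
      .take(upperBound)
      .toArray
    println(contents.mkString(" ")) // the quick fox the quick fox the
  }
}
```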
@@ -36,7 +36,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(sconf)
val expand_size = 100
val data = sc.parallelize((1 to 5).toSeq).
flatMap( x => Stream.range(0, expand_size))
flatMap( x => LazyList.range(0, expand_size))
val persisted = data.persist(StorageLevel.DISK_ONLY)
assert(persisted.count()===500)
assert(persisted.filter(_==1).count()===5)
@@ -47,7 +47,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(sconf)
val expand_size = 100
val data = sc.parallelize((1 to 5).toSeq).
flatMap(x => Stream.range(0, expand_size))
flatMap(x => LazyList.range(0, expand_size))
val persisted = data.persist(StorageLevel.MEMORY_ONLY)
assert(persisted.count()===500)
assert(persisted.filter(_==1).count()===5)
@@ -59,7 +59,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(sconf)
val expand_size = 500
val data = sc.parallelize(Seq(1, 2)).
flatMap(x => Stream.range(1, expand_size).
flatMap(x => LazyList.range(1, expand_size).
map(y => "%d: string test %d".format(y, x)))
val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
assert(persisted.filter(_.startsWith("1:")).count()===2)
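These three tests only replace `Stream.range` with `LazyList.range` inside `flatMap`. A standalone sketch of the property they exercise, using plain collections rather than a `SparkContext`:

```scala
// Standalone sketch, not Spark code: LazyList.range yields elements on demand,
// so a flatMap over it can be consumed incrementally instead of materializing
// the whole expansion up front.
object LazyRangeDemo {
  def main(args: Array[String]): Unit = {
    val expandSize = 100
    val expanded = (1 to 5).to(LazyList).flatMap(_ => LazyList.range(0, expandSize))
    println(expanded.take(3).toList) // List(0, 1, 2)
  }
}
```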
@@ -2650,7 +2650,7 @@ case class Concat(children: Seq[Expression]) extends ComplexTypeMergingExpressio
}
case ArrayType(elementType, _) =>
input => {
val inputs = children.toStream.map(_.eval(input))
val inputs = children.to(LazyList).map(_.eval(input))
if (inputs.contains(null)) {
null
} else {
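The hunk is truncated here, but the change is just `children.toStream` becoming `children.to(LazyList)`. A standalone sketch of the behaviour the null check relies on, with plain integers standing in for evaluated child expressions:

```scala
// Standalone sketch, not Spark code: .to(LazyList) replaces .toStream and the
// subsequent map stays deferred, so contains() only evaluates elements until
// it finds a match.
object LazyContainsDemo {
  def main(args: Array[String]): Unit = {
    var evaluated = 0
    val inputs = Seq(10, 20, 30).to(LazyList).map { x => evaluated += 1; x }
    println(evaluated)           // 0 -- nothing evaluated yet
    println(inputs.contains(20)) // true
    println(evaluated)           // 2 -- evaluation stopped at the match
  }
}
```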
@@ -103,7 +103,7 @@ trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
// but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
orderingExpressions.iterator.map { sortOrder =>
val orderingSet = mutable.Set.empty[Expression]
val sameOrderings = sortOrder.children.toStream
val sameOrderings = sortOrder.children.to(LazyList)
.flatMap(projectExpression)
.filter(e => orderingSet.add(e.canonicalized))
.take(aliasCandidateLimit)
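A standalone sketch of why the ordering candidates are kept in a `LazyList`: `take(aliasCandidateLimit)` bounds how many projected alternatives are ever computed. The limit and inputs below are made up for illustration:

```scala
// Standalone sketch, not Spark code: with a LazyList, take(limit) bounds how
// many candidates are ever computed.
object CandidateLimitDemo {
  def main(args: Array[String]): Unit = {
    var computed = 0
    val aliasCandidateLimit = 3 // assumed limit
    val candidates = (1 to 100).to(LazyList)
      .map { x => computed += 1; s"candidate-$x" }
      .take(aliasCandidateLimit)
      .toList
    println(candidates) // List(candidate-1, candidate-2, candidate-3)
    println(computed)   // 3 -- the remaining 97 candidates were never computed
  }
}
```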
@@ -220,7 +220,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
case Some(value) => Some(recursiveTransform(value))
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
case stream: Stream[_] => stream.map(recursiveTransform).force
case stream: LazyList[_] => stream.map(recursiveTransform).force
case seq: Iterable[_] => seq.map(recursiveTransform)
case other: AnyRef => other
case null => null
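A standalone sketch of why the `LazyList` branch ends in `.force`: `map` on a `LazyList` defers the transform, and forcing it guarantees every child is actually visited (plain integers stand in for plan nodes):

```scala
// Standalone sketch, not Spark code: without force, the mapped LazyList would
// hold unevaluated transforms; force walks the whole list.
object ForceDemo {
  def main(args: Array[String]): Unit = {
    var transformed = 0
    val mapped = LazyList(1, 2, 3).map { x => transformed += 1; x * 2 }
    println(transformed) // 0 -- map alone did nothing yet
    mapped.force         // evaluates every element of the LazyList
    println(transformed) // 3 -- all elements have been transformed
  }
}
```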
@@ -84,15 +84,15 @@ class LogicalPlanSuite extends SparkFunSuite {
test("transformExpressions works with a Stream") {
val id1 = NamedExpression.newExprId
val id2 = NamedExpression.newExprId
val plan = Project(Stream(
val plan = Project(LazyList(
Alias(Literal(1), "a")(exprId = id1),
Alias(Literal(2), "b")(exprId = id2)),
OneRowRelation())
val result = plan.transformExpressions {
case Literal(v: Int, IntegerType) if v != 1 =>
Literal(v + 1, IntegerType)
}
val expected = Project(Stream(
val expected = Project(LazyList(
Alias(Literal(1), "a")(exprId = id1),
Alias(Literal(3), "b")(exprId = id2)),
OneRowRelation())
@@ -693,7 +693,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
}

test("transform works on stream of children") {
val before = Coalesce(Stream(Literal(1), Literal(2)))
val before = Coalesce(LazyList(Literal(1), Literal(2)))
// Note it is a bit tricky to exhibit the broken behavior. Basically we want to create the
// situation in which the TreeNode.mapChildren function's change detection is not triggered. A
// stream's first element is typically materialized, so in order to not trip the TreeNode change
@@ -702,14 +702,14 @@
case Literal(v: Int, IntegerType) if v != 1 =>
Literal(v + 1, IntegerType)
}
val expected = Coalesce(Stream(Literal(1), Literal(3)))
val expected = Coalesce(LazyList(Literal(1), Literal(3)))
assert(result === expected)
}

test("withNewChildren on stream of children") {
val before = Coalesce(Stream(Literal(1), Literal(2)))
val result = before.withNewChildren(Stream(Literal(1), Literal(3)))
val expected = Coalesce(Stream(Literal(1), Literal(3)))
val before = Coalesce(LazyList(Literal(1), Literal(2)))
val result = before.withNewChildren(LazyList(Literal(1), Literal(3)))
val expected = Coalesce(LazyList(Literal(1), Literal(3)))
assert(result === expected)
}

@@ -1075,10 +1075,10 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
a_or_b.getOrElse(
// Besides returning the alternatives for the first encounter, also set up a mechanism to
// update the cache when the new alternatives are requested.
Stream(Literal(1), Literal(2)).map { x =>
LazyList(Literal(1), Literal(2)).map { x =>
a_or_b = Some(Seq(x))
x
}.append {
}.lazyAppendedAll {
a_or_b = None
Seq.empty
})
@@ -1094,19 +1094,19 @@
val transformed2 = e.multiTransformDown {
case StringLiteral("a") | StringLiteral("b") =>
a_or_b.getOrElse(
Stream(Literal(1), Literal(2)).map { x =>
LazyList(Literal(1), Literal(2)).map { x =>
a_or_b = Some(Seq(x))
x
}.append {
}.lazyAppendedAll {
a_or_b = None
Seq.empty
})
case StringLiteral("c") | StringLiteral("d") =>
c_or_d.getOrElse(
Stream(Literal(10), Literal(20)).map { x =>
LazyList(Literal(10), Literal(20)).map { x =>
c_or_d = Some(Seq(x))
x
}.append {
}.lazyAppendedAll {
c_or_d = None
Seq.empty
})
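The only non-mechanical rename in these hunks is `Stream#append` becoming `LazyList#lazyAppendedAll`. A standalone sketch of the equivalence the tests rely on; the flag is purely illustrative:

```scala
// Standalone sketch, not Spark code: lazyAppendedAll is the LazyList
// counterpart of Stream#append -- its by-name suffix runs only when reached.
object LazyAppendDemo {
  def main(args: Array[String]): Unit = {
    var suffixEvaluated = false
    val xs = LazyList(1, 2).lazyAppendedAll {
      suffixEvaluated = true
      Seq(3)
    }
    println(suffixEvaluated) // false -- the suffix has not been forced yet
    println(xs.toList)       // List(1, 2, 3)
    println(suffixEvaluated) // true -- materializing the list reached the suffix
  }
}
```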
@@ -60,7 +60,7 @@ class RelationalGroupedDataset protected[sql](
val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
groupingExprs match {
// call `toList` because `Stream` can't serialize in scala 2.13
case s: Stream[Expression] => s.toList ++ aggExprs
case s: LazyList[Expression] => s.toList ++ aggExprs
case other => other ++ aggExprs
}
} else {
@@ -42,7 +42,7 @@ trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
projectExpression(e)
.filter(e => partitioningSet.add(e.canonicalized))
.take(aliasCandidateLimit)
.asInstanceOf[Stream[Partitioning]]
.asInstanceOf[LazyList[Partitioning]]
case o => Seq(o)
}
} else {
@@ -164,7 +164,7 @@ trait CodegenSupport extends SparkPlan {
}

val inputVars = inputVarsCandidate match {
case stream: Stream[ExprCode] => stream.force
case stream: LazyList[ExprCode] => stream.force
case other => other
}

@@ -115,7 +115,7 @@ case class BroadcastHashJoinExec(
PartitioningCollection(partitioning.multiTransformDown {
case e: Expression if streamedKeyToBuildKeyMapping.contains(e.canonicalized) =>
e +: streamedKeyToBuildKeyMapping(e.canonicalized)
}.asInstanceOf[Stream[HashPartitioning]]
}.asInstanceOf[LazyList[HashPartitioning]]
.take(conf.broadcastHashJoinOutputPartitioningExpandLimit))
}

@@ -1617,7 +1617,7 @@ class DataFrameAggregateSuite extends QueryTest
}

test("SPARK-38221: group by stream of complex expressions should not fail") {
val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
val df = Seq(1).toDF("id").groupBy(LazyList($"id" + 1, $"id" + 2): _*).sum("id")
checkAnswer(df, Row(2, 3, 1))
}

@@ -1232,7 +1232,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
).toDF("a", "b", "c")

val w = Window.partitionBy("a").orderBy("b")
val selectExprs = Stream(
val selectExprs = LazyList(
sum("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("sumc"),
avg("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("avgc")
)
13 changes: 6 additions & 7 deletions sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql

import java.util.concurrent.LinkedBlockingQueue

import scala.collection.immutable.Stream
import scala.sys.process._
import scala.util.Try

@@ -46,7 +45,7 @@ object BlockingLineStream {
private final class BlockingStreamed[T](
val process: T => Unit,
val done: Int => Unit,
val stream: () => Stream[T])
val stream: () => LazyList[T])

// See scala.sys.process.Streamed
private object BlockingStreamed {
@@ -57,11 +56,11 @@
def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)

def next(): Stream[T] = q.take match {
case Left(0) => Stream.empty
def next(): LazyList[T] = q.take match {
case Left(0) => LazyList.empty
case Left(code) =>
if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
case Right(s) => Stream.cons(s, next())
if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else LazyList.empty
case Right(s) => LazyList.cons(s, next())
}

new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
@@ -79,7 +78,7 @@
}
}

def apply(command: Seq[String]): Stream[String] = {
def apply(command: Seq[String]): LazyList[String] = {
val streamed = BlockingStreamed[String](true)
val process = command.run(BasicIO(false, streamed.process, None))
Spawn(streamed.done(process.exitValue()))
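`BlockingLineStream` now builds its result with `LazyList.cons`. A standalone sketch of that recursion pattern against a toy pre-filled queue, not the real process plumbing:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Standalone sketch, not Spark code: LazyList.cons takes its tail by name, so
// next() recurses lazily and the queue is drained only as lines are consumed.
object BlockingLazyListDemo {
  def main(args: Array[String]): Unit = {
    val q = new LinkedBlockingQueue[Either[Int, String]]()
    Seq[Either[Int, String]](Right("line 1"), Right("line 2"), Left(0)).foreach(e => q.put(e))

    def next(): LazyList[String] = q.take() match {
      case Left(_)  => LazyList.empty           // exit code ends the stream
      case Right(s) => LazyList.cons(s, next()) // tail is built on demand
    }

    println(next().toList) // List(line 1, line 2)
  }
}
```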
@@ -457,7 +457,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
test("SPARK-38528: generator in stream of aggregate expressions") {
val df = Seq(1, 2, 3).toDF("v")
checkAnswer(
df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
df.select(LazyList(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
Row(1, 6) :: Row(3, 6) :: Nil)
}

4 changes: 2 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1714,7 +1714,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val dsA = Seq((1, "a")).toDF("id", "c1")
val dsB = Seq((2, "b")).toDF("id", "c2")
val dsC = Seq((3, "c")).toDF("id", "c3")
val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, Stream("id"), "full_outer")
val joined = dsA.join(dsB, LazyList("id"), "full_outer").join(dsC, LazyList("id"), "full_outer")

val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), Row(3, null, null, "c"))

@@ -1723,7 +1723,7 @@

test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with invalid access") {
val ds = Seq((1, "a")).toDF("id", "c1")
val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, Stream("id"), "full_outer")
val joined = ds.join(ds, LazyList("id"), "full_outer").join(ds, LazyList("id"), "full_outer")

val expected = Seq(Row(1, "a", "a", "a"))

@@ -743,7 +743,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}

test("SPARK-24500: create union with stream of children") {
val df = Union(Stream(
val df = Union(LazyList(
Range(1, 1, 1, 1),
Range(1, 2, 1, 1)))
df.queryExecution.executedPlan.execute()
@@ -108,7 +108,7 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
)
checkAnswer(
input.toDF("a", "b", "c"),
(child: SparkPlan) => SortExec(Stream($"a".asc, $"b".asc, $"c".asc),
(child: SparkPlan) => SortExec(LazyList($"a".asc, $"b".asc, $"c".asc),
global = true, child = child),
input.sortBy(t => (t._1, t._2, t._3)).map(Row.fromTuple),
sortAnswers = false)
@@ -776,14 +776,14 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
val b = Seq((1, "a")).toDF("key", "value")
val c = Seq(1).toDF("key")

val ab = a.join(b, Stream("key"), "left")
val ab = a.join(b, LazyList("key"), "left")
val abc = ab.join(c, Seq("key"), "left")

checkAnswer(abc, Row(1, "a"))
}

test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
val groupByCols = Stream(col("key"))
val groupByCols = LazyList(col("key"))
val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
.groupBy(groupByCols: _*)
.max("value")