Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Removing potentially dangerous method
  • Loading branch information
MaxGekk committed Aug 30, 2018
commit 6fd4c0099c85df240ef7fb9d915f0b20c06b0040
32 changes: 4 additions & 28 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,36 +284,12 @@ private[spark] object ThreadUtils {
try {
implicit val ec = ExecutionContext.fromExecutor(pool)

parmap(in)(f)
val futures = in.map(x => Future(f(x)))
val futureSeq = Future.sequence(futures)

awaitResult(futureSeq, Duration.Inf)
} finally {
pool.shutdownNow()
}
}

/**
* Transforms input collection by applying the given function to each element in parallel fashion.
* Comparing to the map() method of Scala parallel collections, this method can be interrupted
* at any time. This is useful on canceling of task execution, for example.
*
* @param in - the input collection which should be transformed in parallel.
* @param f - the lambda function will be applied to each element of `in`.
* @param ec - an execution context for parallel applying of the given function `f`.
* @tparam I - the type of elements in the input collection.
* @tparam O - the type of elements in resulted collection.
* @return new collection in which each element was given from the input collection `in` by
* applying the lambda function `f`.
*/
def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
(in: Col[I])
(f: I => O)
(implicit
cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]], // for Future.sequence
ec: ExecutionContext
): Col[O] = {
val futures = in.map(x => Future(f(x)))
val futureSeq = Future.sequence(futures)

awaitResult(futureSeq, Duration.Inf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,9 @@ private[streaming] object FileBasedWriteAheadLog {
implicit val ec = executionContext
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this line is not needed


source.grouped(groupSize).flatMap { group =>
ThreadUtils.parmap(group)(handler)
val parallelCollection = group.par
parallelCollection.tasksupport = taskSupport
parallelCollection.map(handler)
}.flatten
}
}