Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' of github.com:apache/spark into streaming-closu…
…re-cleaner

Conflicts:
	core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
  • Loading branch information
Andrew Or committed May 3, 2015
commit eed339074476ee578a56acf736cbc61e06db119b
66 changes: 65 additions & 1 deletion core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,75 @@ private[spark] object ClosureCleaner extends Logging {
}
}

def clean(func: AnyRef, checkSerializable: Boolean = true) {
/**
* Clean the given closure in place.
*
* More specifically, this renders the given closure serializable as long as it does not
* explicitly reference unserializable objects.
*
* @param closure the closure to clean
* @param checkSerializable whether to verify that the closure is serializable after cleaning
* @param cleanTransitively whether to clean enclosing closures transitively
*/
def clean(
closure: AnyRef,
checkSerializable: Boolean = true,
cleanTransitively: Boolean = true): Unit = {
clean(closure, checkSerializable, cleanTransitively, Map.empty)
}

/**
* Helper method to clean the given closure in place.
*
* The mechanism is to traverse the hierarchy of enclosing closures and null out any
* references along the way that are not actually used by the starting closure, but are
* nevertheless included in the compiled anonymous classes. Note that it is unsafe to
* simply mutate the enclosing closures in place, as other code paths may depend on them.
* Instead, we clone each enclosing closure and set the parent pointers accordingly.
*
* By default, closures are cleaned transitively. This means we detect whether enclosing
* objects are actually referenced by the starting one, either directly or transitively,
* and, if not, sever these closures from the hierarchy. In other words, in addition to
* nulling out unused field references, we also null out any parent pointers that refer
* to enclosing objects not actually needed by the starting closure. We determine
* transitivity by tracing through the tree of all methods ultimately invoked by the
* inner closure and record all the fields referenced in the process.
*
* For instance, transitive cleaning is necessary in the following scenario:
*
* class SomethingNotSerializable {
* def someValue = 1
* def scope(name: String)(body: => Unit) = body
* def someMethod(): Unit = scope("one") {
* def x = someValue
* def y = 2
* scope("two") { println(y + 1) }
* }
* }
*
* In this example, scope "two" is not serializable because it references scope "one", which
* references SomethingNotSerializable. Note that, however, the body of scope "two" does not
* actually depend on SomethingNotSerializable. This means we can safely null out the parent
* pointer of a cloned scope "one" and set it the parent of scope "two", such that scope "two"
* no longer references SomethingNotSerializable transitively.
*
* @param func the starting closure to clean
* @param checkSerializable whether to verify that the closure is serializable after cleaning
* @param cleanTransitively whether to clean enclosing closures transitively
* @param accessedFields a map from a class to a set of its fields that are accessed by
* the starting closure
*/
private def clean(
func: AnyRef,
checkSerializable: Boolean,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]): Unit = {

if (!isClosure(func.getClass)) {
throw new IllegalArgumentException("Expected a closure; got " + func.getClass.getName)
}

// TODO: clean all inner closures first. This requires us to find the inner objects.
// TODO: cache outerClasses / innerClasses / accessedFields

if (func == null) {
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.