Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix docs
  • Loading branch information
zsxwing committed Jan 14, 2015
commit 37f79c677b8e91ed1d0bdc3e92cfca65b115728c
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/util/EventLoop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import org.apache.spark.Logging
/**
* An event loop to receive events from the caller and process all events in the event thread. It
* will start an exclusive event thread to process all events.
*
* Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
* handle events in time to avoid the potential OOM.
*/
private[spark] abstract class EventLoop[E](name: String) extends Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark as DeveloperAPI?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a private class. We only need to mark developer API's for exposed classes.


Expand Down Expand Up @@ -84,17 +87,17 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
def isActive: Boolean = eventThread.isAlive

/**
* Invoke when `start()` is called. It's also invoked before the event thread starts.
* Invoked when `start()` is called but before the event thread starts.
*/
def onStart(): Unit = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change "invoke" to "invoked" here and below; otherwise, it sounds like it is the subclass which should invoke this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it supposed to be called from outside of the class? I mean, shall we tighten the access permission to protected?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.


/**
* Invoke when `stop()` is called and the event thread exits.
* Invoked when `stop()` is called and the event thread exits.
*/
def onStop(): Unit = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here


/**
* Invoke in the event thread when polling events from the event queue.
* Invoked in the event thread when polling events from the event queue.
*
* Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
* and cannot process events in time. If you want to call some blocking actions, run them in
Expand All @@ -103,7 +106,8 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
def onReceive(event: E): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since onReceive is supposed to be called only by eventThread, we'd better not expose it to the outside of the class


/**
* Invoke if `onReceive` throws any non fatal error. `onError` must not throw any non fatal error.
* Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
* will be ignored.
*/
def onError(e: Throwable): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as onReceive


Expand Down