@@ -64,21 +64,25 @@ trait Channel[@spec(Int, Long, Double) T] {
6464
6565 /** Checks if this channel was terminated.
6666 *
67- * A channel is terminated if it is sealed and all its reactives are unreacted.
67+ * A channel is terminated if it is sealed and all its reactives are
68+ * unreacted.
6869 *
6970 * @return `true` if the channel is terminated, `false` otherwise
7071 */
7172 def isTerminated : Boolean
7273
7374 /** Composes this channel with a custom mapping function for the input events.
7475 *
75- * Events from reactives passed to this channel are mapped inside their isolates.
76+ * Events from reactives passed to this channel are mapped inside their
77+ * isolates.
7678 *
7779 * @tparam S type of the events the new channel will accept
78- * @param f maps events in the resulting channel to events of the original channel
80+ * @param f maps events in the resulting channel to events of the
81+ * original channel
7982 * @return the new channel accepting events of type `S`
8083 */
81- def compose [@ spec(Int , Long , Double ) S ](f : S => T ) = new Channel .Composed (this , f)
84+ def compose [@ spec(Int , Long , Double ) S ](f : S => T ) =
85+ new Channel .Composed (this , f)
8286
8387 /** Creates and attaches a reactive emitter to the channel.
8488 *
@@ -141,7 +145,8 @@ trait Channel[@spec(Int, Long, Double) T] {
141145 */
142146object Channel {
143147
144- private [reactive] class Composed [@ spec(Int , Long , Double ) T , @ spec(Int , Long , Double ) S ]
148+ private [reactive] class Composed
149+ [@ spec(Int , Long , Double ) T , @ spec(Int , Long , Double ) S ]
145150 (val self : Channel [T ], val f : S => T )
146151 extends Channel [S ] {
147152 def attach (r : Reactive [S ]): Channel [S ] = {
@@ -164,14 +169,16 @@ object Channel {
164169 * @param reactor the reactor notified of this channel's events
165170 * @param monitor private monitor object used for synchronization
166171 */
167- class Synced [@ spec(Int , Long , Double ) T ](val reactor : Reactor [T ], val monitor : util.Monitor )
172+ class Synced [@ spec(Int , Long , Double ) T ]
173+ (val reactor : Reactor [T ], val monitor : util.Monitor )
168174 extends Channel [T ] {
169175 private var sealedChannel = false
170176 private val reactives = mutable.Map [Reactive [T ], Reactive .Subscription ]()
171177 def attach (r : Reactive [T ]) = monitor.synchronized {
172178 if (! sealedChannel) {
173179 if (! reactives.contains(r)) reactives(r) = r.observe(new Reactor [T ] {
174180 def react (event : T ) = reactor.react(event)
181+ def except (t : Throwable ) = reactor.except(t)
175182 def unreact () {
176183 monitor.synchronized { reactives.remove(r) }
177184 checkTerminated()
0 commit comments