Skip to content

Commit e9533e6

Browse files
author
Cantor Chron
committed
Automatic channel registration and unregistration in the iso system.
1 parent 449e341 commit e9533e6

File tree

9 files changed

+102
-80
lines changed

9 files changed

+102
-80
lines changed

src/main/scala/scala/reactive/Iso.scala

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -96,26 +96,6 @@ trait Iso[@spec(Int, Long, Double) T] extends ReactRecord {
9696
*/
9797
react <<= frame.internalConnector.events.collect({ case e: SysEvent => e }).pipe(systemEmitter)
9898

99-
/** Opens a new channel for this isolate.
100-
*
101-
* @tparam Q type of the events in the new channel
102-
* @param factory event queue factory
103-
* @return the connector object of the new channel
104-
*/
105-
final def open[@spec(Int, Long, Double) Q: Arrayable](implicit factory: EventQueue.Factory = frame.eventQueueFactory): Connector[Q] =
106-
Iso.openChannel[Q](frame, factory, false)
107-
108-
/** Opens a new daemon channel for this isolate.
109-
*
110-
* Daemon channels do not affect isolate termination.
111-
*
112-
* @tparam Q type of the events in the new channel
113-
* @param factory event queue factory
114-
* @return the connector object of the new channel
115-
*/
116-
final def daemon[@spec(Int, Long, Double) Q: Arrayable](implicit factory: EventQueue.Factory = frame.eventQueueFactory): Connector[Q] =
117-
Iso.openChannel[Q](frame, factory, true)
118-
11999
/** The unique id of this isolate.
120100
*
121101
* @return the unique id, assigned only to this isolate
@@ -165,13 +145,6 @@ object Iso {
165145

166146
private[reactive] val argFrame = new DynamicVariable[IsoFrame](null)
167147

168-
private[reactive] def openChannel[@spec(Int, Long, Double) Q: Arrayable](frame: IsoFrame, factory: EventQueue.Factory, isDaemon: Boolean): Connector[Q] = {
169-
val eventQueue = factory.create[Q]
170-
val connector = new Connector(frame, eventQueue, isDaemon)
171-
frame.multiplexer += connector
172-
connector
173-
}
174-
175148
/** Returns the current isolate.
176149
*
177150
* If the caller is not executing in an isolate,

src/main/scala/scala/reactive/IsoSystem.scala

Lines changed: 73 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ abstract class IsoSystem {
5454
* @param scheduler the scheduler used to scheduler the isolate
5555
* @return the channel for this isolate
5656
*/
57-
def isolate[@spec(Int, Long, Double) T: Arrayable](proto: Proto[Iso[T]], name: String = null): Channel[T]
57+
def isolate[@spec(Int, Long, Double) T: Arrayable](proto: Proto[Iso[T]]): Channel[T]
5858

5959
/** Creates a new channel for the specified isolate frame.
6060
*
@@ -77,11 +77,14 @@ abstract class IsoSystem {
7777
*/
7878
protected def uniqueName(name: String): String
7979

80-
/** Releases the name after the isolate terminates.
80+
/** Releases the channel names associated with the isolate,
81+
* and then releases the name of the isolate.
82+
*
83+
* Called after the isolate terminates.
8184
*
8285
* @param name the name to release
8386
*/
84-
protected[reactive] def releaseName(name: String): Unit
87+
protected[reactive] def releaseNames(name: String): Unit
8588

8689
/** Generates a new unique id, generated only once during
8790
* the lifetime of this isolate system.
@@ -107,14 +110,18 @@ abstract class IsoSystem {
107110
/** Creates an isolate frame.
108111
*
109112
* Should only be overridden if the default isolate initialization order needs to change.
113+
* The multiplexer, unique name and unique id are created for an isolate first.
114+
* Then, the isolate frame is created.
115+
* Then, the isolate object (concrete user implementation) is instantiated.
116+
* Then, the isolate frame is assigned the isolate object.
117+
* Finally, the `initiate` method is called on the scheduler, and the isolate is returned.
110118
* See the source code of the default implementation of this method for more details.
111119
*
112120
* @tparam T the type of the events for the isolate
113121
* @param proto prototype for the isolate
114-
* @param name name of the new isolate
115122
* @return the resulting isolate frame
116123
*/
117-
protected def createFrame[@spec(Int, Long, Double) T: Arrayable](proto: Proto[Iso[T]], name: String): Iso[T] = {
124+
protected def createFrame[@spec(Int, Long, Double) T: Arrayable](proto: Proto[Iso[T]]): Iso[T] = {
118125
val scheduler = proto.scheduler match {
119126
case null => bundle.defaultScheduler
120127
case name => bundle.scheduler(name)
@@ -128,16 +135,16 @@ abstract class IsoSystem {
128135
case mult => mult
129136
}
130137
val uid = uniqueId()
131-
val uname = uniqueName(name)
138+
val uname = uniqueName(proto.name)
132139
val frame = new IsoFrame(
133140
uid,
134141
uname,
135142
IsoSystem.this,
136143
scheduler,
137144
queueFactory,
138145
multiplexer,
139-
frame => Iso.openChannel(frame, queueFactory, false),
140-
frame => Iso.openChannel(frame, queueFactory, true)
146+
frame => IsoSystem.openChannel(IsoSystem.this, frame, queueFactory, "events", false),
147+
frame => IsoSystem.openChannel(IsoSystem.this, frame, queueFactory, "internal", true)
141148
)
142149
val isolate = Iso.argFrame.withValue(frame) {
143150
createAndResetIso(proto)
@@ -154,6 +161,19 @@ abstract class IsoSystem {
154161
*/
155162
object IsoSystem {
156163

164+
/** Opens a channel for the current isolate, using the specified parameters.
165+
*/
166+
private[reactive] def openChannel[@spec(Int, Long, Double) Q: Arrayable](system: IsoSystem, frame: IsoFrame, f: EventQueue.Factory, cn: String, isDaemon: Boolean): Connector[Q] = {
167+
val factory = if (f != null) f else Iso.self.frame.eventQueueFactory
168+
val channelName = if (cn != null) cn else "channel-" + frame.counter.incrementAndGet().toString
169+
val eventQueue = factory.create[Q]
170+
val name = frame.name + "#" + channelName
171+
val connector = new Connector(frame, eventQueue, name, isDaemon)
172+
system.channels(name) = connector.channel
173+
frame.multiplexer += connector
174+
connector
175+
}
176+
157177
/** Retrieves the default isolate system.
158178
*
159179
* @param name the name for the isolate system instance
@@ -210,41 +230,58 @@ object IsoSystem {
210230
*/
211231
lazy val defaultBundle = Bundle.default(Scheduler.default)
212232

233+
class ChannelBuilder(
234+
val system: IsoSystem,
235+
val channelName: String,
236+
val isDaemon: Boolean,
237+
val eventQueueFactory: EventQueue.Factory
238+
) {
239+
/** Associates a new name for the channel.
240+
*/
241+
def named(name: String) = new ChannelBuilder(system, name, isDaemon, eventQueueFactory)
242+
243+
/** Specifies a daemon channel.
244+
*/
245+
def daemon = new ChannelBuilder(system, channelName, true, eventQueueFactory)
246+
247+
/** Associates a new event queue factory.
248+
*/
249+
def eventQueue(factory: EventQueue.Factory) = new ChannelBuilder(system, channelName, isDaemon, factory)
250+
251+
/** Opens a new channel for this isolate.
252+
*
253+
* @tparam Q type of the events in the new channel
254+
* @param factory event queue factory
255+
* @return the connector object of the new channel
256+
*/
257+
final def open[@spec(Int, Long, Double) Q: Arrayable]: Connector[Q] =
258+
IsoSystem.openChannel[Q](system, Iso.self.frame, eventQueueFactory, channelName, isDaemon)
259+
}
260+
213261
/** The channel register used for channel lookup by name.
214262
*/
215-
trait Channels {
263+
abstract class Channels(system: IsoSystem) extends ChannelBuilder(system, null, false, null) {
264+
216265
/** Registers a new channel with this isolate system.
217266
*
218267
* Throws an exception if name is already taken.
219268
*
220269
* @param name name of the channel
221270
* @param channel the channel to register
222271
*/
223-
def update(name: String, channel: Channel[_]): Unit
272+
private[reactive] def update(name: String, channel: Channel[_]): Unit
224273

225274
/** Removes the channel registration.
226275
*
227276
* @param name name of the channel to remove from the register
228277
*/
229-
def remove(name: String): Unit
230-
231-
/** Returns a channel under the specified name, if any.
232-
*
233-
* Throws an exception if such a channel does not exist.
234-
*
235-
* @param name name of the channel
236-
* @return the channel registered under the specified name
237-
*/
238-
def apply[@spec(Int, Long, Double) T](name: String): Channel[T]
278+
private[reactive] def remove(name: String): Unit
239279

240-
/** Returns a channel under the specified name, if any.
280+
/** Removes all the channels registered with a specific isolate.
241281
*
242-
* Returns `None` if none exist.
243-
*
244-
* @param name name of the channel
245-
* @return optionally, the channel registered under the specified name
282+
* @param isoName name of the isolate whose channels must be removed
246283
*/
247-
def get[T](name: String): Option[Channel[T]]
284+
private[reactive] def removeIsolate(isoName: String): Unit
248285

249286
/** Eventually returns a channel under the specified name.
250287
*
@@ -255,6 +292,9 @@ object IsoSystem {
255292

256293
/** Eventually returns an *unsealed* channel under the specified name.
257294
*
295+
* Note: between the time that the channel is retrieved and the time it is first used,
296+
* that channel can asynchronously become sealed.
297+
*
258298
* @param name name of the channel
259299
* @return the ivar with the channel registered under the specified name
260300
*/
@@ -265,20 +305,19 @@ object IsoSystem {
265305

266306
/** A default implementation of the channels register.
267307
*/
268-
class Default extends IsoSystem.Channels {
308+
class Default(system: IsoSystem) extends IsoSystem.Channels(system) {
269309
private val channelMap = container.RMap[String, Channel[_]]
270-
def update(name: String, c: Channel[_]) = channelMap.synchronized {
310+
private[reactive] def update(name: String, c: Channel[_]) = channelMap.synchronized {
271311
if (!channelMap.contains(name)) channelMap(name) = c
272312
else sys.error(s"Name $name already contained in channels.")
273313
}
274-
def apply[@spec(Int, Long, Double) T](name: String): Channel[T] = channelMap.synchronized {
275-
channelMap(name).asInstanceOf[Channel[T]]
276-
}
277-
def remove(name: String): Unit = channelMap.synchronized {
314+
private[reactive] def remove(name: String): Unit = channelMap.synchronized {
278315
channelMap.remove(name)
279316
}
280-
def get[T](name: String): Option[Channel[T]] = channelMap.synchronized {
281-
channelMap.get(name).asInstanceOf[Option[Channel[T]]]
317+
private[reactive] def removeIsolate(isoName: String): Unit = channelMap.synchronized {
318+
val prefix = isoName + "#"
319+
val obsoleteNames = channelMap.keys.filter(_ startsWith prefix)
320+
for (name <- obsoleteNames) channelMap.remove(name)
282321
}
283322
private def channelExtractor[T](reqId: Long): PartialFunction[InternalEvent, Channel[T]] = {
284323
case ChannelRetrieved(`reqId`, c: Channel[T]) => c

src/main/scala/scala/reactive/Proto.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ final class Proto[+I <: Iso[_]] private[reactive] (
1717
val params: Seq[Any],
1818
val scheduler: String = null,
1919
val eventQueueFactory: EventQueue.Factory = null,
20-
val multiplexer: Multiplexer = null
20+
val multiplexer: Multiplexer = null,
21+
val name: String = null
2122
) {
2223

2324
/** Instantiates and returns the isolate.
@@ -31,17 +32,22 @@ final class Proto[+I <: Iso[_]] private[reactive] (
3132
* @param sname name of the scheduler
3233
* @return a new `Proto` object
3334
*/
34-
def withScheduler(sname: String): Proto[I] = new Proto(clazz, params, sname, eventQueueFactory, multiplexer)
35+
def withScheduler(sname: String): Proto[I] = new Proto(clazz, params, sname, eventQueueFactory, multiplexer, name)
3536

3637
/** Associates the specified event queue type and returns the new `Proto` object.
3738
*
3839
* @param f event queue factory, used to instantiate the event queue object
3940
* @return a new `Proto` object
4041
*/
41-
def withEventQueue(f: EventQueue.Factory): Proto[I] = new Proto(clazz, params, scheduler, f, multiplexer)
42+
def withEventQueue(f: EventQueue.Factory): Proto[I] = new Proto(clazz, params, scheduler, f, multiplexer, name)
4243

44+
/** Associates a multiplexer with the event queue type and returns the new `Proto` object.
45+
*/
46+
def withMultiplexer(m: Multiplexer): Proto[I] = new Proto(clazz, params, scheduler, eventQueueFactory, m, name)
4347

44-
def withMultiplexer(m: Multiplexer): Proto[I] = new Proto(clazz, params, scheduler, eventQueueFactory, m)
48+
/** Associates the name for the new isolate and returns the new `Proto` object.
49+
*/
50+
def withName(nm: String): Proto[I] = new Proto(clazz, params, scheduler, eventQueueFactory, multiplexer, nm)
4551

4652
}
4753

src/main/scala/scala/reactive/container/RMap.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ trait RMap[@spec(Int, Long, Double) K, V <: AnyRef] extends RContainer[(K, V)] {
1414

1515
def entries: PairContainer[K, V]
1616

17+
def keys: RContainer[K]
18+
19+
def values: RContainer[V]
20+
1721
def react: RMap.Lifted[K, V]
1822

1923
}

src/main/scala/scala/reactive/isolate/Connector.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ package isolate
1515
* @tparam T the type of the events in this connector
1616
* @param frame the isolate frame
1717
* @param queue the event queue
18+
* @param name the name of the channel in this connector
1819
* @param isDaemon is the connector a daemon -- daemon channels are ignored when determining termination
1920
*/
2021
class Connector[@spec(Int, Long, Double) T](
2122
private[reactive] val frame: IsoFrame,
2223
private[reactive] val queue: EventQueue[T],
24+
val name: String,
2325
val isDaemon: Boolean
2426
) {
2527
@volatile private[reactive] var dequeuer: Dequeuer[T] = _
@@ -73,6 +75,7 @@ object Connector {
7375
}
7476
def unreact() = {
7577
multiplexer.unreacted(connector)
78+
connector.frame.isolateSystem.channels.removeIsolate(connector.name)
7679
connector.frame.apply()
7780
}
7881
}

src/main/scala/scala/reactive/isolate/DefaultIsoSystem.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,19 @@ extends IsoSystem {
3434
else name
3535
}
3636

37-
protected[reactive] def releaseName(name: String): Unit = monitor.synchronized {
37+
protected[reactive] def releaseNames(name: String): Unit = monitor.synchronized {
38+
channels.removeIsolate(name)
3839
isolates.remove(name)
3940
}
4041

4142
protected[reactive] def newChannel[@spec(Int, Long, Double) Q](reactor: Reactor[Q]) = {
4243
new Channel.Synced(reactor, new util.Monitor)
4344
}
4445

45-
val channels = new IsoSystem.Channels.Default
46+
val channels = new IsoSystem.Channels.Default(this)
4647

47-
def isolate[@spec(Int, Long, Double) T: Arrayable](proto: Proto[Iso[T]], name: String = null): Channel[T] = {
48-
val isolate = createFrame(proto, name)
48+
def isolate[@spec(Int, Long, Double) T: Arrayable](proto: Proto[Iso[T]]): Channel[T] = {
49+
val isolate = createFrame(proto)
4950
val frame = isolate.frame
5051
monitor.synchronized {
5152
isolates(frame.name) = frame

src/main/scala/scala/reactive/isolate/IsoFrame.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ final class IsoFrame(
9696
if (isolateState.compareAndSet(Running, Terminated)) {
9797
try isolate.systemEmitter += IsoTerminated
9898
finally try for (es <- isolate.eventSources) es.close()
99-
finally isolateSystem.releaseName(name)
99+
finally isolateSystem.releaseNames(name)
100100
} else checkTerminated()
101101
}
102102
}

src/main/scala/scala/reactive/isolate/Multiplexer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ object Multiplexer {
117117
}
118118

119119
private def addConnector(connector: Connector[_]) {
120-
descriptors += connector
121120
if (!connector.isDaemon) liveCount += 1
121+
descriptors += connector
122122
}
123123

124124
private def deleteCurrentConnector() {

0 commit comments

Comments
 (0)