Skip to content

Commit 41ef436

Browse files
author
Vojin Jovanovic
committed
Actor migration instantiation tests and corresponding minor fixes.
1 parent 55cf154 commit 41ef436

File tree

9 files changed

+142
-35
lines changed

9 files changed

+142
-35
lines changed

src/actors/scala/actors/Actor.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -313,11 +313,7 @@ object Actor extends Combinators {
313313
def respond(k: B => Unit) = fun(caseBlock andThen k)
314314
}
315315

316-
private[actors] trait Body[a] {
317-
def andThen[b](other: => b): Unit
318-
}
319-
320-
implicit def mkBody[a](body: => a) = new Body[a] {
316+
implicit def mkBody[a](body: => a) = new InternalActor.Body[a] {
321317
def andThen[b](other: => b): Unit = rawSelf.seq(body, other)
322318
}
323319

src/actors/scala/actors/Combinators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ private[actors] trait Combinators {
1616
* Enables the composition of suspendable closures using `andThen`,
1717
* `loop`, `loopWhile`, etc.
1818
*/
19-
implicit def mkBody[a](body: => a): Actor.Body[a]
19+
implicit def mkBody[a](body: => a): InternalActor.Body[a]
2020

2121
/**
2222
* Repeatedly executes `body`.

src/actors/scala/actors/InternalActor.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ import java.util.TimerTask
1010
import scala.util.continuations.suspendable
1111
import scala.util.control.ControlThrowable
1212

13+
private[actors] object InternalActor {
14+
private[actors] trait Body[a] {
15+
def andThen[b](other: => b): Unit
16+
}
17+
}
18+
1319
private[actors] trait InternalActor extends AbstractActor with InternalReplyReactor with ActorCanReply with InputChannel[Any] with Serializable {
1420

1521
/* The following two fields are only used when the actor
@@ -432,7 +438,8 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
432438

433439
private[actors] def internalPostStop() = {}
434440

435-
private[actors] def stop(reason: AnyRef): Unit = {
441+
442+
private[actors] def stop(reason: AnyRef): Unit = {
436443
synchronized {
437444
shouldExit = true
438445
exitReason = reason
@@ -444,7 +451,8 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
444451
else if (waitingFor ne Reactor.waitingForNone) {
445452
waitingFor = Reactor.waitingForNone
446453
// it doesn't matter what partial function we are passing here
447-
scheduleActor(waitingFor, null)
454+
val task = new ActorTask(this, null, waitingFor, null)
455+
scheduler execute task
448456
/* Here we should not throw a SuspendActorControl,
449457
since the current method is called from an actor that
450458
is in the process of exiting.

src/actors/scala/actors/Reactor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators {
259259
_state
260260
}
261261

262-
implicit def mkBody[A](body: => A) = new Actor.Body[A] {
262+
implicit def mkBody[A](body: => A) = new InternalActor.Body[A] {
263263
def andThen[B](other: => B): Unit = Reactor.this.seq(body, other)
264264
}
265265

src/actors/scala/actors/RichActor.scala

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@ package scala.actors
33
import scala.util.continuations._
44
import scala.collection._
55

6+
object RichActor extends Combinators {
7+
implicit def mkBody[A](body: => A) = new InternalActor.Body[A] {
8+
def andThen[B](other: => B): Unit = Actor.rawSelf.seq(body, other)
9+
}
10+
}
11+
612
trait RichActor extends InternalActor {
7-
type Receive = PartialFunction[Any, Unit @suspendable]
13+
type Receive = PartialFunction[Any, Unit]
814

915
// checks if RichActor is created within the actorOf block
1016
creationCheck;
@@ -173,10 +179,10 @@ trait RichActor extends InternalActor {
173179
// creation check (see ActorRef)
174180
val context = ActorSystem.contextStack.get
175181
if (context.isEmpty)
176-
throw new Exception("must use actorOf to create actor")
182+
throw new RuntimeException("In order to create RichActor one must use actorOf.")
177183
else {
178184
if (!context.head)
179-
throw new Exception("must use actorOf to create actor")
185+
throw new RuntimeException("Only one actor can be created per actorOf call.")
180186
else
181187
ActorSystem.contextStack.set(context.push(false))
182188
}
@@ -191,23 +197,22 @@ trait RichActor extends InternalActor {
191197
* Method that models the behavior of Akka actors.
192198
*/
193199
private[actors] def internalAct() {
194-
reset {
200+
201+
behaviorStack = behaviorStack.push(new PartialFunction[Any, Unit] {
202+
def isDefinedAt(x: Any) =
203+
handle.isDefinedAt(x)
204+
def apply(x: Any) = handle(x)
205+
} orElse {
206+
case m => unhandled(m)
207+
})
195208

196-
behaviorStack = behaviorStack.push(new PartialFunction[Any, Unit] {
197-
def isDefinedAt(x: Any) =
198-
handle.isDefinedAt(x)
199-
def apply(x: Any) =
200-
reset { handle(x) }
201-
} orElse {
202-
case m => unhandled(m)
203-
})
204-
205-
while (true)
209+
reset {
210+
while(true) {
206211
if (receiveTimeout.isDefined)
207-
reactWithin(receiveTimeout.get)(behaviorStack.top) // orElse
212+
reactWithin(receiveTimeout.get)(behaviorStack.top)
208213
else
209214
react(behaviorStack.top)
210-
215+
}
211216
}
212217
}
213218

src/actors/scala/actors/akka/ActorRef.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ trait ActorRef {
2121

2222
def start(): ActorRef
2323

24-
// TODO (VJ) test with linked
2524
/**
2625
* Shuts down the actor its dispatcher and message queue.
2726
*/
@@ -105,8 +104,9 @@ private[actors] class ReplyActorRef(override val actor: InternalReplyReactor) ex
105104

106105
}
107106

108-
private[actors] final class RichActorRef(override val actor: RichActor) extends ReplyActorRef(actor) {
107+
private[actors] final class InternalActorRef(override val actor: InternalActor) extends ReplyActorRef(actor) {
109108

109+
// TODO (VJ) this does not work
110110
override def stop(): Unit = actor.stop('normal)
111111

112112
}

src/actors/scala/actors/akka/ActorSystem.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,16 @@ object ActorSystem {
2525
contextStack.set(if (!stackAfter.head) stackAfter.pop.pop else stackAfter.pop)
2626
}
2727
}
28-
29-
// actorOf(new MyActor())
30-
def actorOf(factory: RichActor): ActorRef = withCleanContext {
31-
val r = new RichActorRef(factory)
28+
29+
def actorOf(factory: InternalActor): ActorRef = withCleanContext {
30+
val r = new InternalActorRef(factory)
3231
r
3332
}
3433

35-
def actorOf[T <: RichActor](implicit m: Manifest[T]): ActorRef = withCleanContext {
34+
def actorOf[T <: InternalActor](implicit m: Manifest[T]): ActorRef = withCleanContext {
3635
val clazz = m.erasure.asInstanceOf[Class[_ <: RichActor]]
37-
val r = new RichActorRef(clazz.newInstance())
36+
val r = new InternalActorRef(clazz.newInstance())
3837
r
3938
}
4039

41-
42-
// TODO (VJ) def for plain actors
4340
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
OK error: java.lang.RuntimeException: In order to create RichActor one must use actorOf.
2+
OK error: java.lang.RuntimeException: Only one actor can be created per actorOf call.
3+
0
4+
100
5+
200
6+
300
7+
400
8+
500
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import scala.actors.ActorSystem._
2+
import scala.actors.Actor._
3+
import scala.actors.{Actor, RichActor, ActorRef}
4+
import scala.util.continuations._
5+
import java.util.concurrent.{TimeUnit, CountDownLatch}
6+
import scala.collection.mutable.ArrayBuffer
7+
8+
class TestRichActor extends RichActor {
9+
10+
def handle = { case v: Int => Test.append(v); Test.latch.countDown() }
11+
12+
}
13+
14+
object Test {
15+
val NUMBER_OF_TESTS = 5
16+
17+
// used for sorting non-deterministic output
18+
val buff = ArrayBuffer[Int](0)
19+
val latch = new CountDownLatch(NUMBER_OF_TESTS)
20+
val toStop = ArrayBuffer[ActorRef]()
21+
22+
def append(v: Int) = synchronized {
23+
buff += v
24+
}
25+
26+
def main(args: Array[String]) = {
27+
// plain scala actor
28+
val a1 = actor {
29+
reset {react { case v:Int => Test.append(v); Test.latch.countDown() }}
30+
}
31+
a1 ! 100
32+
33+
// simple instantiation
34+
val a2 = actorOf(new TestRichActor)
35+
a2.start()
36+
a2 ! 200
37+
38+
toStop += a2
39+
// actor of with scala actor
40+
val a3 = actorOf(actor{
41+
reset {react { case v:Int => Test.append(v); Test.latch.countDown() }}
42+
})
43+
a3 ! 300
44+
45+
// using the manifest
46+
val a4 = actorOf[TestRichActor].start()
47+
a4 ! 400
48+
49+
toStop += a4
50+
// deterministic part of a test
51+
52+
// creation without actorOf
53+
try {
54+
val a3 = new TestRichActor
55+
a3 ! -1
56+
} catch {
57+
case e => println("OK error: " + e)
58+
}
59+
60+
// actorOf double creation
61+
try {
62+
val a3 = actorOf {
63+
new TestRichActor
64+
new TestRichActor
65+
}
66+
a3 ! -1
67+
} catch {
68+
case e => println("OK error: " + e)
69+
}
70+
71+
// actorOf nesting
72+
try {
73+
val a5 = actorOf {
74+
val a6 = actorOf[TestRichActor]
75+
new TestRichActor
76+
}
77+
a5.start()
78+
a5 ! 500
79+
toStop += a5
80+
} catch {
81+
case e => println("Should not throw an exception: " + e)
82+
}
83+
84+
// output
85+
latch.await(10, TimeUnit.MILLISECONDS)
86+
if (latch.getCount() > 0) {
87+
println("Error: Tasks have not finished!!!")
88+
}
89+
90+
buff.sorted.foreach(println)
91+
toStop.foreach(_.stop())
92+
}
93+
}

0 commit comments

Comments
 (0)