Skip to content

Commit 42fef3b

Browse files
committed
Added new Akka example
1 parent c315c87 commit 42fef3b

File tree

6 files changed

+135
-18
lines changed

6 files changed

+135
-18
lines changed

chapter9/build.sbt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
name := "scatter-gather"
2+
3+
organization := "scalaindepth"
4+
5+
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases"
6+
7+
libraryDependencies += "se.scalablesolutions.akka" % "akka-actor" % "1.1.2"
8+
9+

chapter9/project/build.properties

Lines changed: 0 additions & 8 deletions
This file was deleted.

chapter9/project/build/chapter9.scala

Lines changed: 0 additions & 3 deletions
This file was deleted.

chapter9/project/plugins/plugins.scala

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package scattergather
2+
3+
import collection.immutable.HashMap
4+
import akka.actor.{ReceiveTimeout, ActorRef, Actor}
5+
6+
/**
7+
* A message representing a document to add to the search tree.
8+
*/
9+
case class SearchableDocument(content: String)
10+
11+
trait BaseHeadNode { self: AdaptiveSearchNode =>
12+
var children = IndexedSeq[ActorRef]()
13+
var currentIdx = 0
14+
def parentNode: PartialFunction[Any, Unit] = {
15+
case SearchQuery(q, max, responder) =>
16+
// TODO - use gatherer scheudler
17+
val gatherer = Actor.actorOf(new GathererNode {
18+
val maxDocs = max
19+
val maxResponses = children.size
20+
val query = q
21+
val client = responder
22+
})
23+
gatherer.start
24+
for (node <- children) {
25+
node ! SearchQuery(q, max, gatherer)
26+
}
27+
case s @ SearchableDocument(_) => getNextChild ! s
28+
}
29+
30+
// Round Robin
31+
private def getNextChild = {
32+
currentIdx = (1 + currentIdx) % children.size
33+
children(currentIdx)
34+
}
35+
36+
}
37+
38+
trait BaseChildNode { self: AdaptiveSearchNode =>
39+
final val maxNoOfDocuments = 10
40+
var documents: Vector[String] = Vector()
41+
var index: HashMap[String, Seq[(Double, String)]] = HashMap()
42+
43+
def leafNode: PartialFunction[Any, Unit] = {
44+
case SearchQuery(query, maxDocs, handler) => executeLocalQuery(query, maxDocs, handler)
45+
case SearchableDocument(content) => addDocumentToLocalIndex(content)
46+
}
47+
private def executeLocalQuery(query: String, maxDocs: Int, handler: ActorRef) = {
48+
val result = for {
49+
results <- index.get(query).toList
50+
resultList <- results
51+
} yield resultList
52+
handler ! QueryResponse(result take maxDocs)
53+
}
54+
55+
private def addDocumentToLocalIndex(content: String) = {
56+
for( (key,value) <- content.split("\\s+").groupBy(identity)) {
57+
val list = index.get(key) getOrElse Seq()
58+
index += ((key, ((value.length.toDouble, content)) +: list))
59+
}
60+
documents = documents :+ content
61+
// Split on size....
62+
if (documents.size > maxNoOfDocuments) split()
63+
}
64+
65+
/** Abstract method to split this actor. */
66+
protected def split(): Unit
67+
68+
protected def clearIndex(): Unit = {
69+
documents = Vector()
70+
index = HashMap()
71+
}
72+
}
73+
74+
75+
class AdaptiveSearchNode extends Actor with BaseHeadNode with BaseChildNode {
76+
def receive = leafNode
77+
78+
/** Splits this search node into a tree of search nodes if there are too many documents. */
79+
protected def split(): Unit = {
80+
children = (for(docs <- documents grouped 5) yield {
81+
// TODO - use search scheduler + hook up to supervisor...
82+
val child = Actor.actorOf[AdaptiveSearchNode]
83+
child.start()
84+
docs foreach (child ! SearchableDocument(_))
85+
child
86+
}).toIndexedSeq
87+
clearIndex()
88+
this become parentNode
89+
}
90+
91+
}

chapter9/src/main/scala/main.scala

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,43 @@
11
package scattergather
22

3-
import akka.actor.{Supervisor, Actor}
3+
import akka.actor.{Supervisor, Actor, ActorRef}
44
import akka.config.Supervision._
55
import akka.dispatch.Dispatchers
66
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
77

8+
object AdaptiveSearchTreeMain {
9+
def submitInitialDocuments(searchNode: ActorRef) =
10+
Seq("Some example data for you",
11+
"Some more example data for you to use",
12+
"To be or not to be, that is the question",
13+
"OMG it's a cat",
14+
"This is an example. It's a great one",
15+
"HAI there",
16+
"HAI IZ HUNGRY",
17+
"Hello, World",
18+
"Hello, and welcome to the search node 8",
19+
"The lazy brown fox jumped over the",
20+
"Winning is the best because it's winning."
21+
) foreach (doc => searchNode ! SearchableDocument(doc))
22+
def makeTree = {
23+
val supervisor = Supervisor(SupervisorConfig(AllForOneStrategy(List(classOf[Exception]), 3, 1000), Nil))
24+
val searchnodedispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("adaptive search tree")
25+
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
26+
.setCorePoolSize(10)
27+
.setMaxPoolSize(128)
28+
.setKeepAliveTimeInMillis(60000)
29+
.setRejectionPolicy(new CallerRunsPolicy)
30+
.build
31+
val searchTree = Actor.actorOf(new AdaptiveSearchNode {
32+
self.dispatcher = searchnodedispatcher
33+
})
34+
supervisor link searchTree
35+
searchTree.start()
36+
submitInitialDocuments(searchTree)
37+
searchTree
38+
}
39+
}
40+
841
object SearchTreeMain {
942
def create(name: String) = {
1043
val supervisor = Supervisor(SupervisorConfig(AllForOneStrategy(List(classOf[Exception]), 3, 1000), Nil))
@@ -39,4 +72,4 @@ object SearchTreeMain {
3972
tmp
4073
}
4174

42-
}
75+
}

0 commit comments

Comments
 (0)