Skip to content

Commit fb8bb04

Browse files
Lin Zhaozsxwing
authored andcommitted
[SPARK-13069][STREAMING] Add "ask" style store() to ActorReciever
Introduces a "ask" style ```store``` in ```ActorReceiver``` as a way to allow actor receiver blocked by back pressure or maxRate. Author: Lin Zhao <[email protected]> Closes apache#11176 from lin-zhao/SPARK-13069.
1 parent 751724b commit fb8bb04

File tree

3 files changed

+43
-1
lines changed

3 files changed

+43
-1
lines changed

external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka
2020
import java.nio.ByteBuffer
2121
import java.util.concurrent.atomic.AtomicInteger
2222

23+
import scala.concurrent.Future
2324
import scala.concurrent.duration._
2425
import scala.language.postfixOps
2526
import scala.reflect.ClassTag
2627

2728
import akka.actor._
2829
import akka.actor.SupervisorStrategy.{Escalate, Restart}
30+
import akka.pattern.ask
31+
import akka.util.Timeout
2932
import com.typesafe.config.ConfigFactory
3033

3134
import org.apache.spark.{Logging, TaskContext}
@@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor {
105108
}
106109

107110
/**
108-
* Store a single item of received data to Spark's memory.
111+
* Store a single item of received data to Spark's memory asynchronously.
109112
* These single items will be aggregated together into data blocks before
110113
* being pushed into Spark's memory.
111114
*/
112115
def store[T](item: T) {
113116
context.parent ! SingleItemData(item)
114117
}
118+
119+
/**
120+
* Store a single item of received data to Spark's memory and returns a `Future`.
121+
* The `Future` will be completed when the operator finishes, or with an
122+
* `akka.pattern.AskTimeoutException` after the given timeout has expired.
123+
* These single items will be aggregated together into data blocks before
124+
* being pushed into Spark's memory.
125+
*
126+
* This method allows the user to control the flow speed using `Future`
127+
*/
128+
def store[T](item: T, timeout: Timeout): Future[Unit] = {
129+
context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
130+
}
115131
}
116132

117133
/**
@@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor {
162178
def store[T](item: T) {
163179
context.parent ! SingleItemData(item)
164180
}
181+
182+
/**
183+
* Store a single item of received data to Spark's memory and returns a `Future`.
184+
* The `Future` will be completed when the operator finishes, or with an
185+
* `akka.pattern.AskTimeoutException` after the given timeout has expired.
186+
* These single items will be aggregated together into data blocks before
187+
* being pushed into Spark's memory.
188+
*
189+
* This method allows the user to control the flow speed using `Future`
190+
*/
191+
def store[T](item: T, timeout: Timeout): Future[Unit] = {
192+
context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
193+
}
165194
}
166195

167196
/**
@@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int,
179208
/** Case class to receive data sent by child actors */
180209
private[akka] sealed trait ActorReceiverData
181210
private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
211+
private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData
182212
private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
183213
private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
214+
private[akka] object Ack extends ActorReceiverData
184215

185216
/**
186217
* Provides Actors as receivers for receiving stream.
@@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
233264
store(msg.asInstanceOf[T])
234265
n.incrementAndGet
235266

267+
case AskStoreSingleItemData(msg) =>
268+
logDebug("received single sync")
269+
store(msg.asInstanceOf[T])
270+
n.incrementAndGet
271+
sender() ! Ack
272+
236273
case ByteBufferData(bytes) =>
237274
logDebug("received bytes")
238275
store(bytes)

external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import akka.actor.ActorSystem;
2121
import akka.actor.Props;
2222
import akka.actor.SupervisorStrategy;
23+
import akka.util.Timeout;
2324
import org.apache.spark.streaming.Duration;
2425
import org.apache.spark.streaming.api.java.JavaStreamingContext;
2526
import org.junit.Test;
@@ -62,5 +63,6 @@ class JavaTestActor extends JavaActorReceiver {
6263
@Override
6364
public void onReceive(Object message) throws Exception {
6465
store((String) message);
66+
store((String) message, new Timeout(1000));
6567
}
6668
}

external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.streaming.akka
1919

20+
import scala.concurrent.duration._
21+
2022
import akka.actor.{Props, SupervisorStrategy}
2123

2224
import org.apache.spark.SparkFunSuite
@@ -60,5 +62,6 @@ class AkkaUtilsSuite extends SparkFunSuite {
6062
class TestActor extends ActorReceiver {
6163
override def receive: Receive = {
6264
case m: String => store(m)
65+
case m => store(m, 10.seconds)
6366
}
6467
}

0 commit comments

Comments
 (0)