[STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current pu... #807
Changes from 1 commit
@@ -34,10 +34,8 @@ import org.apache.avro.ipc.NettyServer
 import org.apache.avro.ipc.specific.SpecificResponder
 import java.net.InetSocketAddress

-class SparkSink() extends AbstractSink with Configurable {
+class SparkSink extends AbstractSink with Configurable {
   private val LOG = LoggerFactory.getLogger(this.getClass)
-  private val lock = new ReentrantLock()
-  private val blockingCondition = lock.newCondition()

   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the

@@ -58,19 +56,20 @@ class SparkSink() extends AbstractSink with Configurable {
   private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()

-  private var processorFactory: Option[SparkHandlerFactory] = None
+  private var processorManager: Option[TransactionProcessorManager] = None
   private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
   private var port: Int = 0
   private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
   private var serverOpt: Option[NettyServer] = None
-  private var running = false
+
+  private val blockingLatch = new CountDownLatch(1)

   override def start() {
     transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors,
       new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build()))

-    processorFactory = Option(new SparkHandlerFactory(numProcessors))
+    processorManager = Option(new TransactionProcessorManager(numProcessors))

     val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())

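The executor above gets its threads from Guava's ThreadFactoryBuilder, so the processor threads are daemons with recognizable names. A minimal standalone sketch of that pattern (the pool name and size are placeholders, not values from the patch):

```scala
import java.util.concurrent.{ExecutorService, Executors}

import com.google.common.util.concurrent.ThreadFactoryBuilder

// Illustrative: a fixed-size pool of named daemon threads, in the style used for the
// sink's transaction processors. The "%d" in the name format is filled with the thread index.
object DaemonPoolSketch {
  def newDaemonPool(name: String, numThreads: Int): ExecutorService = {
    val factory = new ThreadFactoryBuilder()
      .setDaemon(true)              // daemon threads do not block JVM shutdown
      .setNameFormat(name + " - %d")
      .build()
    Executors.newFixedThreadPool(numThreads, factory)
  }
}
```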
@@ -80,12 +79,6 @@ class SparkSink() extends AbstractSink with Configurable {
     serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))

     serverOpt.map(server => server.start())
-    lock.lock()
-    try {
-      running = true
-    } finally {
-      lock.unlock()
-    }
     super.start()
   }

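For orientation, the sink serves its Avro protocol over Netty. A minimal sketch of the NettyServer/SpecificResponder wiring that start() relies on is below; SparkFlumeProtocol is the Avro-generated protocol from this PR, and startServer is a hypothetical helper, not code from the patch:

```scala
import java.net.InetSocketAddress

import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder

// Hypothetical helper illustrating the Avro-over-Netty setup used in SparkSink.start().
// `SparkFlumeProtocol` is the Avro-generated protocol; `handler` is any implementation of it.
object AvroServerSketch {
  def startServer(handler: SparkFlumeProtocol, hostname: String, port: Int): NettyServer = {
    // SpecificResponder routes incoming RPC calls to the handler's implementation methods.
    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler)
    // NettyServer implements Avro's Server interface (start/close/join) on the given address.
    val server = new NettyServer(responder, new InetSocketAddress(hostname, port))
    server.start()
    server
  }
}
```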
@@ -95,65 +88,48 @@ class SparkSink() extends AbstractSink with Configurable {
       server.close()
       server.join()
     })
-    lock.lock()
-    try {
-      running = false
-      blockingCondition.signalAll()
-    } finally {
-      lock.unlock()
-    }
+    blockingLatch.countDown()
     super.stop()
   }

   override def configure(ctx: Context) {
     import SparkSinkConfig._
     hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
-    val portOpt = Option(ctx.getInteger(CONF_PORT))
-    if(portOpt.isDefined) {
-      port = portOpt.get
-    } else {
-      throw new ConfigurationException("The Port to bind must be specified")
-    }
+    port = Option(ctx.getInteger(CONF_PORT)).
+      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
     numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT)
     transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
     maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
   }

   override def process(): Status = {
     // This method is called in a loop by the Flume framework - block it until the sink is
-    // stopped to save CPU resources
-    lock.lock()
-    try {
-      while(running) {
-        blockingCondition.await()
-      }
-    } finally {
-      lock.unlock()
-    }
+    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+    // being shut down.
+    blockingLatch.await()
     Status.BACKOFF
   }

   // Object representing an empty batch returned by the txn processor due to some error.
   case object ErrorEventBatch extends EventBatch

-  private class AvroCallbackHandler() extends SparkFlumeProtocol {
+  private class AvroCallbackHandler extends SparkFlumeProtocol {

     override def getEventBatch(n: Int): EventBatch = {
-      val processor = processorFactory.get.checkOut(n)
+      val processor = processorManager.get.checkOut(n)
       transactionExecutorOpt.map(executor => executor.submit(processor))
       // Wait until a batch is available - can be null if some error was thrown
-      val eventBatch = processor.eventQueue.take()
-      eventBatch match {
+      processor.eventQueue.take() match {
         case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
           " retrieved from channel.")
-        case events => {
-          processorMap.put(events.getSequenceNumber, processor)
+        case eventBatch: EventBatch =>
+          processorMap.put(eventBatch.getSequenceNumber, processor)
           if (LOG.isDebugEnabled) {
-            LOG.debug("Sent " + events.getEventBatch.size() +
-              " events with sequence number: " + events.getSequenceNumber)
+            LOG.debug("Sent " + eventBatch.getEvents.size() +
+              " events with sequence number: " + eventBatch.getSequenceNumber)
           }
-          events
-        }
+          eventBatch
       }
     }

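The hunk above swaps the ReentrantLock/Condition pair for a one-shot CountDownLatch: process() parks on the latch so Flume's sink runner does not spin, and stop() counts it down to release every waiter. A small standalone sketch of that pattern (names here are illustrative, not taken from the patch):

```scala
import java.util.concurrent.CountDownLatch

// Illustrative sketch of the blocking pattern: a polling method parks on a
// one-shot latch, and shutdown releases all waiters at once.
class BlockingPoller {
  private val shutdownLatch = new CountDownLatch(1)

  // Called repeatedly by a framework loop; blocking here avoids a busy-wait.
  def poll(): Unit = {
    shutdownLatch.await() // parks until stop() counts the latch down
  }

  def stop(): Unit = {
    shutdownLatch.countDown() // wakes every thread blocked in poll()
  }
}
```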
@@ -214,41 +190,23 @@ class SparkSink() extends AbstractSink with Configurable {
       tx.begin()
       try {
         eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
-        val events = eventBatch.getEventBatch
+        val events = eventBatch.getEvents
         events.clear()
         val loop = new Breaks
         var gotEventsInThisTxn = false
         loop.breakable {
-          var i = 0
-          // Using for here causes the maxBatchSize change to be ineffective as the Range gets
-          // pregenerated
-          while (i < maxBatchSize) {
-            i += 1
-            val eventOpt = Option(getChannel.take())
-            eventOpt.map(event => {
-              events.add(new SparkSinkEvent(toCharSequenceMap(event
-                .getHeaders),
-                ByteBuffer.wrap(event.getBody)))
-              gotEventsInThisTxn = true
-            })
-            if (eventOpt.isEmpty) {
-              if (!gotEventsInThisTxn) {
-                // To avoid sending empty batches, we wait till events are available backing off
-                // between attempts to get events. Each attempt to get an event though causes one
-                // iteration to be lost. To ensure that we still send back maxBatchSize number of
-                // events, we cheat and increase the maxBatchSize by 1 to account for the lost
-                // iteration. Even throwing an exception is expensive as Avro will serialize it
-                // and send it over the wire, which is useless. Before incrementing though,
-                // ensure that we are not anywhere near INT_MAX.
-                if (maxBatchSize >= Int.MaxValue / 2) {
-                  // Random sanity check
-                  throw new RuntimeException("Safety exception - polled too many times, no events!")
-                }
-                maxBatchSize += 1
-                Thread.sleep(500)
-              } else {
-                loop.break()
-              }
-            }
-          }
+          while (events.size() < maxBatchSize) {
+            Option(getChannel.take()) match {
+              case Some(event) =>
+                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+                  ByteBuffer.wrap(event.getBody)))
+                gotEventsInThisTxn = true
+              case None =>
+                if (!gotEventsInThisTxn) {
+                  Thread.sleep(500)
+                } else {
+                  loop.break()
+                }
+            }
+          }
         }

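The deleted comments explain why the original loop avoided `for`: a `for` over `0 until maxBatchSize` materializes the Range once, so the trick of bumping `maxBatchSize` inside the body never added iterations back. The rewrite drops the trick entirely and bounds the loop on `events.size()` instead. A self-contained illustration of the Range pitfall (not part of the patch):

```scala
// Illustrative only: why mutating the bound inside a `for` over a Range has no effect,
// while a `while` loop re-reads the bound on every check.
object RangePitfallSketch {
  def main(args: Array[String]): Unit = {
    var limit = 3

    // The Range `0 until limit` is built once when the loop starts, so growing
    // `limit` inside the body does not add iterations.
    var countFor = 0
    for (i <- 0 until limit) {
      if (i == 0) limit += 2
      countFor += 1
    }

    // A while loop checks `limit` on every iteration, so it does see the change.
    limit = 3
    var i = 0
    var countWhile = 0
    while (i < limit) {
      if (i == 0) limit += 2
      i += 1
      countWhile += 1
    }

    println(s"for: $countFor, while: $countWhile") // prints: for: 3, while: 5
  }
}
```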
@@ -284,7 +242,7 @@ class SparkSink() extends AbstractSink with Configurable {
       } finally {
         resultQueueUpdateLock.unlock()
       }
-      eventBatch.getEventBatch.clear()
+      eventBatch.getEvents.clear()
       // If the batch failed on spark side, throw a FlumeException
       maybeResult.map(success =>
         if (!success) {

@@ -315,7 +273,7 @@ class SparkSink() extends AbstractSink with Configurable {
       // remove the event from the map and then clear the value
       resultQueue.clear()
       processorMap.remove(eventBatch.getSequenceNumber)
-      processorFactory.get.checkIn(this)
+      processorManager.get.checkIn(this)
       tx.close()
     }
   }

@@ -328,7 +286,7 @@ class SparkSink() extends AbstractSink with Configurable {
     }
   }

-  private class SparkHandlerFactory(val maxInstances: Int) {
+  private class TransactionProcessorManager(val maxInstances: Int) {
     val queue = new scala.collection.mutable.Queue[TransactionProcessor]
     val queueModificationLock = new ReentrantLock()
     var currentSize = 0

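The renamed TransactionProcessorManager (previously SparkHandlerFactory) is, judging by the fields above and the checkOut/checkIn calls elsewhere in the diff, a small check-out/check-in pool guarded by a lock. A hedged sketch of that kind of pool, with a generic element type since the real class manages TransactionProcessor instances and its checkOut takes a batch size:

```scala
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable

// Illustrative check-out/check-in pool in the spirit of TransactionProcessorManager:
// reuse idle instances from a queue and create new ones only up to maxInstances.
class PoolSketch[T](maxInstances: Int, create: () => T) {
  private val queue = new mutable.Queue[T]
  private val lock = new ReentrantLock()
  private var currentSize = 0

  def checkOut(): Option[T] = {
    lock.lock()
    try {
      if (queue.nonEmpty) {
        Some(queue.dequeue())
      } else if (currentSize < maxInstances) {
        currentSize += 1
        Some(create())
      } else {
        None // pool exhausted; the real manager may instead wait for an instance to return
      }
    } finally {
      lock.unlock()
    }
  }

  def checkIn(instance: T): Unit = {
    lock.lock()
    try {
      queue.enqueue(instance)
    } finally {
      lock.unlock()
    }
  }
}
```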
@@ -77,13 +77,13 @@ private[streaming] class FlumePollingReceiver(
   private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later

   override def onStart(): Unit = {
-    val connectionBuilder = new mutable.ArrayBuilder.ofRef[FlumeConnection]()
-    addresses.map(host => {
+    // Create the connections to each Flume agent.
+    connections = addresses.map(host => {
       val transceiver = new NettyTransceiver(host, channelFactory)
       val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      connectionBuilder += new FlumeConnection(transceiver, client)
-    })
-    connections = connectionBuilder.result()
+      new FlumeConnection(transceiver, client)
+    }).toArray

     val dataReceiver = new Runnable {
       override def run(): Unit = {
         var counter = 0

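The onStart() change replaces the side-effecting ArrayBuilder with a direct map over the addresses, converting once at the end with toArray. A rough sketch of the two styles with a placeholder Connection type (illustrative only, not the PR's types):

```scala
// Illustrative comparison of the two styles, with a stand-in Connection type.
case class Connection(host: String, port: Int)

object ConnectionBuildSketch {
  val addresses = Seq(("agent1", 9999), ("agent2", 9999))

  // Builder style (what the old code did): accumulate via side effects, then materialize.
  val viaBuilder: Array[Connection] = {
    val builder = Array.newBuilder[Connection]
    addresses.foreach { case (host, port) => builder += Connection(host, port) }
    builder.result()
  }

  // Transform style (what the new code does): map each address and convert once at the end.
  val viaMap: Array[Connection] = addresses.map { case (host, port) =>
    Connection(host, port)
  }.toArray
}
```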
@@ -93,14 +93,18 @@ private[streaming] class FlumePollingReceiver(
           counter += 1
           val batch = client.getEventBatch(maxBatchSize)
           val seq = batch.getSequenceNumber
-          val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
+          val events: java.util.List[SparkSinkEvent] = batch.getEvents
           logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
           try {
             events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
             client.ack(seq)
           } catch {
             case e: Throwable =>
-              client.nack(seq)
+              try {
Contributor: same here, catch only exceptions.
+                client.nack(seq) // If the agent is down, even this could fail and throw
+              } catch {
+                case e: Throwable => logError("Sending Nack also failed. A Flume agent is down.")
+              }
Contributor: same here, catch only exceptions.
               TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
               logWarning("Error while attempting to store events", e)
           }
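Both review comments above ask that only exceptions be caught rather than every Throwable. In Scala this is usually written with scala.util.control.NonFatal, which matches ordinary exceptions but lets fatal errors such as OutOfMemoryError propagate. A sketch of what the reviewers seem to be suggesting (not the code in this patch; the function and its parameters are hypothetical):

```scala
import scala.util.control.NonFatal

// Sketch of the reviewers' suggestion: match NonFatal(e) instead of e: Throwable,
// so fatal JVM errors (OutOfMemoryError, etc.) are not swallowed.
object NackSketch {
  def storeAndAck(store: () => Unit, ack: () => Unit, nack: () => Unit): Unit = {
    try {
      store()
      ack()
    } catch {
      case NonFatal(e) =>
        try {
          nack() // may itself fail if the Flume agent is down
        } catch {
          case NonFatal(_) => println("Sending Nack also failed. A Flume agent is down.")
        }
    }
  }
}
```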
Since these are separate classes, it's probably better to split these into separate files.