[STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current push model #807
Changes from 1 commit:
Update to the previous patch, fixing some error cases and also excluding Netty dependencies. Also updated the unit tests.
```diff
@@ -30,9 +30,8 @@ import java.util.concurrent._
 import java.util
 import org.apache.flume.conf.{ConfigurationException, Configurable}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.avro.ipc.{NettyTransceiver, NettyServer}
+import org.apache.avro.ipc.NettyServer
 import org.apache.avro.ipc.specific.SpecificResponder
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import java.net.InetSocketAddress

 class SparkSink() extends AbstractSink with Configurable {
```
```diff
@@ -75,12 +74,10 @@ class SparkSink() extends AbstractSink with Configurable {
     val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())

-    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port),
-      new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
-          "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " Boss-%d").build),
-        Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat(
-          "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " I/O Worker-%d").build))))
+    // Using the constructor that takes specific thread-pools requires bringing in netty
+    // dependencies which are being excluded in the build. In practice,
+    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
+    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))

     serverOpt.map(server => server.start())
     lock.lock()
```
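As an aside on the hunk above: Avro's `NettyServer` can be built either from just a responder and a bind address, letting it create its own default Netty channel factory, or with an explicit `NioServerSocketChannelFactory` and named thread pools, which is what the removed code did. The sketch below is not part of the patch; it only contrasts the two constructor forms, with the patch's `SparkFlumeProtocol` handler replaced by a generic `SpecificResponder` parameter and illustrative thread names.

```scala
import java.net.InetSocketAddress
import java.util.concurrent.Executors

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory

object NettyServerSketch {

  // Form used after this patch: NettyServer builds its own default channel
  // factory internally, so nothing from org.jboss.netty is referenced here.
  def simpleServer(responder: SpecificResponder, host: String, port: Int): NettyServer =
    new NettyServer(responder, new InetSocketAddress(host, port))

  // Form used before this patch: explicit boss/worker pools with custom names,
  // which requires compiling against Netty 3 classes directly.
  def serverWithCustomPools(responder: SpecificResponder, host: String, port: Int,
      maxThreads: Int): NettyServer = {
    val bossPool = Executors.newCachedThreadPool(
      new ThreadFactoryBuilder().setNameFormat("Sketch Boss-%d").build())
    val workerPool = Executors.newFixedThreadPool(maxThreads,
      new ThreadFactoryBuilder().setNameFormat("Sketch I/O Worker-%d").build())
    new NettyServer(responder, new InetSocketAddress(host, port),
      new NioServerSocketChannelFactory(bossPool, workerPool))
  }
}
```

The simple form is what lets the build exclude Netty as a compile-time dependency; at runtime the classes are still needed, which is why the patch's comment points out that Flume already provides them on the JVM.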
```diff
@@ -93,10 +90,14 @@ class SparkSink() extends AbstractSink with Configurable {
   }

   override def stop() {
+    transactionExecutorOpt.map(executor => executor.shutdownNow())
+    serverOpt.map(server => {
+      server.close()
+      server.join()
+    })
     lock.lock()
     try {
       running = false
-      transactionExecutorOpt.map(executor => executor.shutdownNow())
       blockingCondition.signalAll()
     } finally {
       lock.unlock()
```
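The `stop()` change relies on the sink's lock/condition idiom: a thread parks on a condition until `stop()` flips `running` and signals it. Below is a minimal, self-contained sketch of that idiom, not the sink itself; names such as `blockingCondition` and `running` mirror the patch for readability, while `awaitShutdown` is a hypothetical stand-in for the code that blocks.

```scala
import java.util.concurrent.locks.ReentrantLock

// Sketch of the lock/condition pattern: one thread blocks until stop()
// flips the flag and signals the condition.
class BlockingLifecycleSketch {
  private val lock = new ReentrantLock()
  private val blockingCondition = lock.newCondition()
  private var running = false

  def start(): Unit = {
    lock.lock()
    try {
      running = true
    } finally {
      lock.unlock()
    }
  }

  // Blocks the calling thread until stop() signals the condition.
  def awaitShutdown(): Unit = {
    lock.lock()
    try {
      while (running) {
        blockingCondition.await()
      }
    } finally {
      lock.unlock()
    }
  }

  def stop(): Unit = {
    lock.lock()
    try {
      running = false
      blockingCondition.signalAll()
    } finally {
      lock.unlock()
    }
  }
}

object BlockingLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val sink = new BlockingLifecycleSketch
    sink.start()
    // A "process" thread that parks until stop() is called.
    val waiter = new Thread(new Runnable {
      override def run(): Unit = sink.awaitShutdown()
    })
    waiter.start()
    Thread.sleep(100)
    sink.stop() // wakes the waiter via signalAll()
    waiter.join()
    println("shut down cleanly")
  }
}
```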
```diff
@@ -131,23 +132,28 @@ class SparkSink() extends AbstractSink with Configurable {
     Status.BACKOFF
   }

+  // Object representing an empty batch returned by the txn processor due to some error.
+  case object ErrorEventBatch extends EventBatch
+
   private class AvroCallbackHandler() extends SparkFlumeProtocol {

     override def getEventBatch(n: Int): EventBatch = {
       val processor = processorFactory.get.checkOut(n)
       transactionExecutorOpt.map(executor => executor.submit(processor))
       // Wait until a batch is available - can be null if some error was thrown
-      val eventBatch = Option(processor.eventQueue.take())
-      if (eventBatch.isDefined) {
-        val eventsToBeSent = eventBatch.get
-        processorMap.put(eventsToBeSent.getSequenceNumber, processor)
-        if (LOG.isDebugEnabled) {
-          LOG.debug("Sent " + eventsToBeSent.getEventBatch.size() +
-            " events with sequence number: " + eventsToBeSent.getSequenceNumber)
+      val eventBatch = processor.eventQueue.take()
+      eventBatch match {
+        case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
+          " retrieved from channel.")
+        case events => {
+          processorMap.put(events.getSequenceNumber, processor)
+          if (LOG.isDebugEnabled) {
+            LOG.debug("Sent " + events.getEventBatch.size() +
+              " events with sequence number: " + events.getSequenceNumber)
+          }
+          events
         }
-        eventsToBeSent
-      } else {
-        throw new FlumeException("Error while trying to retrieve events from the channel.")
       }
     }
```
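The `ErrorEventBatch` change swaps a `null` sentinel for a dedicated object on the blocking queue, so the Avro handler can pattern-match on the result instead of unwrapping an `Option`. Below is a minimal sketch of that sentinel idiom with simplified stand-in types (`Batch`, `Events`, `ErrorBatch`) rather than the patch's Avro-generated `EventBatch`.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}

object SentinelQueueSketch {
  sealed trait Batch
  case class Events(items: Seq[String]) extends Batch
  case object ErrorBatch extends Batch // plays the role of ErrorEventBatch

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Batch]()
    val worker = Executors.newSingleThreadExecutor()

    worker.submit(new Runnable {
      override def run(): Unit = {
        try {
          // Pretend the channel had nothing to give us.
          throw new RuntimeException("no events in channel")
        } catch {
          case _: Exception =>
            // Must always release the blocked caller, even on failure,
            // by putting a sentinel rather than null on the queue.
            queue.put(ErrorBatch)
        }
      }
    })

    // The caller blocks on take() and then branches on the sentinel.
    queue.take() match {
      case ErrorBatch    => println("worker reported an error, no events retrieved")
      case Events(items) => println(s"got ${items.size} events")
    }
    worker.shutdown()
  }
}
```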
```diff
@@ -211,17 +217,38 @@ class SparkSink() extends AbstractSink with Configurable {
         val events = eventBatch.getEventBatch
         events.clear()
         val loop = new Breaks
+        var gotEventsInThisTxn = false
         loop.breakable {
-          for (i <- 0 until maxBatchSize) {
+          var i = 0
+          // Using for here causes the maxBatchSize change to be ineffective as the Range gets
+          // pregenerated
+          while (i < maxBatchSize) {
+            i += 1
             val eventOpt = Option(getChannel.take())

             eventOpt.map(event => {
               events.add(new SparkSinkEvent(toCharSequenceMap(event
                 .getHeaders),
                 ByteBuffer.wrap(event.getBody)))
+              gotEventsInThisTxn = true
             })
             if (eventOpt.isEmpty) {
-              loop.break()
+              if (!gotEventsInThisTxn) {
+                // To avoid sending empty batches, we wait till events are available backing off
+                // between attempts to get events. Each attempt to get an event though causes one
+                // iteration to be lost. To ensure that we still send back maxBatchSize number of
+                // events, we cheat and increase the maxBatchSize by 1 to account for the lost
+                // iteration. Even throwing an exception is expensive as Avro will serialize it
+                // and send it over the wire, which is useless. Before incrementing though,
+                // ensure that we are not anywhere near INT_MAX.
+                if (maxBatchSize >= Int.MaxValue / 2) {
+                  // Random sanity check
+                  throw new RuntimeException("Safety exception - polled too many times, no events!")
+                }
+                maxBatchSize += 1
+                Thread.sleep(500)
+              } else {
+                loop.break()
+              }
             }
           }
         }
```
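The comment about the `for` loop refers to a Scala detail: `for (i <- 0 until maxBatchSize)` materializes the `Range` once, so growing `maxBatchSize` inside the loop does not add iterations, whereas a `while` re-reads the bound on every pass. A small stand-alone sketch of the difference follows (illustrative names only; the extra cap in the `while` loop is just there so the sketch terminates, since this sketch grows the bound on every iteration rather than only when no events were found).

```scala
object LoopBoundSketch {
  def main(args: Array[String]): Unit = {
    var bound = 3

    var forIterations = 0
    for (i <- 0 until bound) { // Range(0, 1, 2) is fixed at this point
      forIterations += 1
      bound += 1               // has no effect on this loop
    }

    bound = 3
    var whileIterations = 0
    var i = 0
    while (i < bound && whileIterations < 6) { // cap added so the sketch terminates
      whileIterations += 1
      bound += 1               // extends the loop, which is what the sink relies on
      i += 1
    }

    println(s"for ran $forIterations times, while ran $whileIterations times")
    // Prints: for ran 3 times, while ran 6 times (capped)
  }
}
```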
```diff
@@ -283,7 +310,7 @@ class SparkSink() extends AbstractSink with Configurable {
         null // No point rethrowing the exception
       } finally {
         // Must *always* release the caller thread
-        eventQueue.put(null)
+        eventQueue.put(ErrorEventBatch)
         // In the case of success coming after the timeout, but before resetting the seq number
         // remove the event from the map and then clear the value
         resultQueue.clear()
```
Review comment (Scala style nit): no need for parentheses when no parameter is present.
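For illustration only (hypothetical class names, not from the patch), the nit refers to declarations and instantiations like these:

```scala
// Empty parameter list carries no information, so Scala style drops it.
class AvroHandlerWithParens() // flagged by the nit: redundant ()
class AvroHandlerNoParens     // preferred form

object StyleNitSketch {
  def main(args: Array[String]): Unit = {
    val a = new AvroHandlerWithParens() // compiles, but the () adds nothing
    val b = new AvroHandlerNoParens     // preferred call site as well
    println(s"$a $b")
  }
}
```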