@@ -55,7 +55,6 @@ trait ConsumerStrategy[K, V] {
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
@@ -68,8 +67,7 @@ trait ConsumerStrategy[K, V] {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Subscribe[K, V] private(
private case class Subscribe[K, V](
topics: ju.Collection[java.lang.String],
Member:

nit: keep this

Contributor Author:

You just told me in the last PR to remove Experimental annotations from private classes. This is private. What's the actual rule?

Member:

Sorry, I commented in the wrong place. I meant ConsumerStrategies.

Contributor Author:

Cool, makes sense. I believe I fixed it there.

kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
@@ -90,12 +88,45 @@ case class Subscribe[K, V] private(
}
}

/**
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
private case class Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}

consumer
}
}
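
For context on the offset fallback described in the scaladoc above: whether a partition starts from an explicit offset, the committed offset, or `auto.offset.reset` depends on the consumer configuration passed in as kafkaParams. A minimal sketch of such a configuration follows; the broker addresses and group id are illustrative assumptions, not values from this PR.

```scala
import org.apache.kafka.common.serialization.StringDeserializer

// Illustrative new-consumer configuration for the kafkaParams argument.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092,host2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  // Consulted only for a partition that has neither an explicit entry in
  // `offsets` nor a committed offset for the group.
  "auto.offset.reset" -> "earliest"
)
```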

/**
* :: Experimental ::
* Companion object for creating [[Subscribe]] strategy
* object for obtaining instances of [[ConsumerStrategy]]
*/
@Experimental
object Subscribe {
object ConsumerStrategies {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
@@ -111,10 +142,10 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
def subscribe[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
@@ -133,9 +164,9 @@ object Subscribe {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
def subscribe[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
@@ -157,10 +188,10 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
def subscribe[K, V](
topics: ju.Collection[java.lang.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
offsets: ju.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
Subscribe[K, V](topics, kafkaParams, offsets)
}

@@ -176,56 +207,12 @@ object Subscribe {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
def subscribe[K, V](
topics: ju.Collection[java.lang.String],
kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
}

}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
 * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsets offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Assign[K, V] private(
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}

consumer
}
}

/**
* :: Experimental ::
* Companion object for creating [[Assign]] strategy
*/
@Experimental
object Assign {
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
@@ -241,10 +228,10 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
def assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
@@ -263,9 +250,9 @@ object Assign {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
def assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
@@ -287,10 +274,10 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
def assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
offsets: ju.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, offsets)
}

@@ -306,9 +293,10 @@ object Assign {
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
def assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
}

}
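
With the case-class constructors now private, callers obtain a strategy through the ConsumerStrategies factory methods rather than the Subscribe/Assign companions. A minimal sketch under the signatures in this diff; the topic name, partition numbers, and offset are assumptions.

```scala
import org.apache.kafka.common.TopicPartition

val kafkaParams: Map[String, Object] = ???  // stub; e.g. the sketch above

// Subscribe to whole topics; starting offsets resolve per the scaladoc.
val bySubscription = ConsumerStrategies.subscribe[String, String](
  Iterable("example-topic"), kafkaParams)

// Or pin the consumer to an explicit set of partitions, supplying a
// starting offset for one of them.
val byAssignment = ConsumerStrategies.assign[String, String](
  Iterable(
    new TopicPartition("example-topic", 0),
    new TopicPartition("example-topic", 1)),
  kafkaParams,
  Map(new TopicPartition("example-topic", 0) -> 123L))
```

Both calls now return the ConsumerStrategy[K, V] trait type, which is what lets the concrete case classes stay private.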
@@ -48,8 +48,8 @@ object KafkaUtils extends Logging {
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -87,8 +87,8 @@ object KafkaUtils extends Logging {
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -110,10 +110,10 @@ object KafkaUtils extends Logging {
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
* see [[ConsumerStrategies]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@@ -132,10 +132,10 @@ object KafkaUtils extends Logging {
* each given Kafka topic/partition corresponds to an RDD partition.
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
* see [[LocationStrategies]] for more details.
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
* see [[ConsumerStrategies]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
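
Taken together with the renamed factories, a call site matching the updated scaladoc might look like the following sketch. The streaming context, topic, and params are assumptions rather than code from this PR, and the wildcard import assumes this module's kafka010 package.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._

val ssc: StreamingContext = ???             // assumed to exist in the program
val kafkaParams: Map[String, Object] = ???  // stub; see the earlier sketch

// One DStream partition per Kafka topic/partition.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.preferConsistent,
  ConsumerStrategies.subscribe[String, String](
    Iterable("example-topic"), kafkaParams))
```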
@@ -36,42 +36,47 @@ import org.apache.spark.annotation.Experimental
@Experimental
sealed trait LocationStrategy

/**
* :: Experimental ::
* Use this only if your executors are on the same nodes as your Kafka brokers.
*/
@Experimental
case object PreferBrokers extends LocationStrategy {
def create: PreferBrokers.type = this
}
private case object PreferBrokers extends LocationStrategy

/**
* :: Experimental ::
* Use this in most cases, it will consistently distribute partitions across all executors.
*/
@Experimental
case object PreferConsistent extends LocationStrategy {
def create: PreferConsistent.type = this
}
private case object PreferConsistent extends LocationStrategy

/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy

/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
* :: Experimental :: object to obtain instances of [[LocationStrategy]]
*
*/
@Experimental
object PreferFixed {
def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
object LocationStrategies {
/**
* :: Experimental ::
* Use this only if your executors are on the same nodes as your Kafka brokers.
*/
@Experimental
def preferBrokers: LocationStrategy = PreferBrokers

/**
* :: Experimental ::
* Use this in most cases, it will consistently distribute partitions across all executors.
*/
@Experimental
def preferConsistent: LocationStrategy = PreferConsistent

/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
def preferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
}
def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =

/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
def preferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
PreferFixed(hostMap)
}
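
As with ConsumerStrategies, location strategies now come from a factory object rather than public case objects. A sketch of the three options under this diff's signatures; the host name is an assumption.

```scala
import org.apache.kafka.common.TopicPartition

// Most jobs: distribute partitions evenly across all executors.
val consistent = LocationStrategies.preferConsistent

// Only when executors run on the same nodes as the Kafka brokers.
val brokers = LocationStrategies.preferBrokers

// Uneven load: pin a hot partition to a particular host; partitions not
// in the map fall back to consistent placement.
val fixed = LocationStrategies.preferFixed(
  Map(new TopicPartition("example-topic", 0) -> "host1.example.com"))
```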
@@ -53,28 +53,26 @@ public void testConsumerStrategyConstructors() {
// final ConsumerStrategy<String, String> sub0 = // does not compile in Scala 2.10
// Subscribe.<String, String>apply(topics, kafkaParams, offsets);
final ConsumerStrategy<String, String> sub1 =
Subscribe.<String, String>apply(sTopics, sKafkaParams, sOffsets);
ConsumerStrategies.<String, String>subscribe(sTopics, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> sub2 =
Subscribe.<String, String>apply(sTopics, sKafkaParams);
ConsumerStrategies.<String, String>subscribe(sTopics, sKafkaParams);
final ConsumerStrategy<String, String> sub3 =
Subscribe.<String, String>create(topics, kafkaParams, offsets);
ConsumerStrategies.<String, String>subscribe(topics, kafkaParams, offsets);
final ConsumerStrategy<String, String> sub4 =
Subscribe.<String, String>create(topics, kafkaParams);
ConsumerStrategies.<String, String>subscribe(topics, kafkaParams);

Assert.assertEquals(
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));

// final ConsumerStrategy<String, String> asn0 = // does not compile in Scala 2.10
// Assign.<String, String>apply(parts, kafkaParams, offsets);
final ConsumerStrategy<String, String> asn1 =
Assign.<String, String>apply(sParts, sKafkaParams, sOffsets);
ConsumerStrategies.<String, String>assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =
Assign.<String, String>apply(sParts, sKafkaParams);
ConsumerStrategies.<String, String>assign(sParts, sKafkaParams);
final ConsumerStrategy<String, String> asn3 =
Assign.<String, String>create(parts, kafkaParams, offsets);
ConsumerStrategies.<String, String>assign(parts, kafkaParams, offsets);
final ConsumerStrategy<String, String> asn4 =
Assign.<String, String>create(parts, kafkaParams);
ConsumerStrategies.<String, String>assign(parts, kafkaParams);

Assert.assertEquals(
asn1.executorKafkaParams().get("bootstrap.servers"),