[SPARK-29500] Add support for Kafka partition in KafkaWriter (doc)
redsk committed Oct 17, 2019
commit 5b6487225fc825fe432858d46f79932ee728c259
8 changes: 8 additions & 0 deletions docs/structured-streaming-kafka-integration.md
@@ -614,6 +614,10 @@ The Dataframe being written to Kafka should have the following columns in schema
<td>topic (*optional)</td>
<td>string</td>
</tr>
<tr>
<td>partition (optional)</td>
<td>int</td>
</tr>
</table>
\* The topic column is required if the "topic" configuration option is not specified.<br>

@@ -622,6 +626,10 @@ a ```null``` valued key column will be automatically added (see Kafka semantics
how ```null``` valued key values are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set i.e., the "topic" configuration option overrides the topic column.
If a partition column is not specified then the partition is calculated by the Kafka producer
(using ```org.apache.kafka.clients.producer.internals.DefaultPartitioner```).
Contributor:
It is better to avoid mentioning Kafka's default directly, as Kafka could change it and the information would then become stale. I think it's fine to just say that it follows Kafka's configuration.

Contributor:
+1 on not mentioning the Kafka internal class; just say "default partitioner".

Contributor Author:
Sure, will do.

This can be overridden in Spark by setting the ```kafka.partitioner.class``` option.
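For illustration, here is a minimal sketch of the behaviour documented above, seen from the writer's side. This is not part of the patch; the server address, topic name, and data are placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-partition-sketch").getOrCreate()
import spark.implicits._

// An explicit integer "partition" column pins each row to a Kafka partition;
// without such a column, partitioning is delegated to the Kafka producer.
val df = Seq(("k1", "v1", 0), ("k2", "v2", 1)).toDF("key", "value", "partition")

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092") // placeholder
  .option("topic", "topic1")                       // placeholder
  .save()
```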
Contributor:
Do I understand correctly that the fallback scenario is the following, where later entries override the earlier ones?

  • Default
  • kafka.partitioner.class provided
  • Partition id column

Contributor Author:
Yes, exactly. But this is standard KafkaProducer behaviour:

  • it uses the ProducerRecord partition field; if that is null, it falls back to:
  • the configured kafka.partitioner.class; if that is not set, it uses:
  • the default partitioner.

I don't believe we need a test for this (otherwise we would be testing the Kafka API), but maybe we should state it explicitly in the doc (the resolution order is sketched below).
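At the Kafka client level, the same order shows up in how a ProducerRecord is constructed. A small illustration using constructors from kafka-clients; topic, key, and value are placeholders:

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// An explicit partition in the record wins over any configured partitioner:
val pinned = new ProducerRecord[String, String]("topic1", 3, "key", "value")

// No partition given: the producer consults the configured partitioner.class
// if set, otherwise it falls back to its default partitioner.
val delegated = new ProducerRecord[String, String]("topic1", "key", "value")
```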

Contributor:
What I would like to test here is that Spark parameterizes the producer properly. These testing points have nothing to do with producer internals:

  • the kafka.partitioner.class config reaches producer instances and takes effect
  • the partition field is set under some circumstances

Contributor Author:
> kafka.partitioner.class config reaches producer instances and takes effect

I actually tested this manually, and it does, because such configs are passed down by other classes (see `val specifiedKafkaParams = kafkaParamsForProducer(caseInsensitiveParameters)` at line 177 of KafkaSourceProvider). In fact, any config that starts with kafka. is passed down to the producer. The problem is that this is not stated in the doc (it only mentions two optional configs). Maybe I can update the doc in this respect?

> Partition field is set under some circumstances

Doesn't the other test I added already prove this point?

In any case, I'm fine with adding two more tests:

  1. kafka.partitioner.class overrides the default partitioner.
  2. The partition column overrides kafka.partitioner.class.

I can write a simple Kafka Partitioner that always returns 0 and test it with the same pattern (collect() and then two topics, as you suggested); a sketch follows below.
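For reference, a sketch of such an always-zero partitioner, and of how a kafka.-prefixed writer option would reach it. The class name, package, and option values are illustrative, not part of the patch:

```scala
package com.example

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// A trivial partitioner that routes every record to partition 0; only
// useful for asserting that kafka.partitioner.class actually takes effect.
class ZeroPartitioner extends Partitioner {
  override def partition(topic: String, key: AnyRef, keyBytes: Array[Byte],
      value: AnyRef, valueBytes: Array[Byte], cluster: Cluster): Int = 0
  override def close(): Unit = {}
  override def configure(configs: util.Map[String, _]): Unit = {}
}
```

A test along the lines discussed above could then write a frame without a partition column and assert, after reading the topic back, that every record landed in partition 0. A sketch, assuming the same SparkSession and implicits as the earlier example:

```scala
// No "partition" column here, so the configured partitioner decides placement:
val df = Seq(("k1", "v1"), ("k2", "v2")).toDF("key", "value")

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")                  // placeholder
  .option("kafka.partitioner.class", "com.example.ZeroPartitioner") // forwarded to the producer
  .option("topic", "topic1")                                        // placeholder
  .save()
```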

Contributor (@gaborgsomogyi, Oct 18, 2019):
> Any config that starts with kafka. will be passed down to the producer, actually.

There are exceptions; please see them either in the doc or in the code. If such a thing is not checked by a test, it can easily be broken. Just a personal thought: if something is not covered by a test, later on it's super hard to find out whether it's a bug or a feature...

> Doesn't the other test I added already prove this point?

If we add the simplified test mentioned above, then you're right, it will cover the partition field part.

Contributor Author:
Do we still need test 1 (kafka.partitioner.class overrides the default partitioner)?



The following options must be set for the Kafka sink
for both batch and streaming queries.