Changes from 3 commits
8 changes: 8 additions & 0 deletions docs/structured-streaming-kafka-integration.md
@@ -614,6 +614,10 @@ The Dataframe being written to Kafka should have the following columns in schema
<td>topic (*optional)</td>
<td>string</td>
</tr>
<tr>
<td>partition (optional)</td>
<td>int</td>
</tr>
</table>
\* The topic column is required if the "topic" configuration option is not specified.<br>

@@ -622,6 +626,10 @@ a ```null``` valued key column will be automatically added (see Kafka semantics
how ```null``` valued key values are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set i.e., the "topic" configuration option overrides the topic column.
If a partition column is not specified then the partition is calculated by the Kafka producer
(using ```org.apache.kafka.clients.producer.internals.DefaultPartitioner```).
Contributor:
It is better to avoid mentioning Kafka's default directly, as Kafka could change it and the information would then become stale. I feel it's OK to just mention that it will follow the configuration of Kafka.

Contributor:
+1 on not mentioning the Kafka internal class, just saying default partitioner.

Contributor Author:
Sure, will do.

This can be overridden in Spark by setting the ```kafka.partitioner.class``` option.
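For illustration, a minimal batch-write sketch of the behaviour described above (broker address, topic name, and the commented-out partitioner class are placeholders; the partition column is the optional column this change introduces):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-partition-sketch").getOrCreate()
import spark.implicits._

// Rows that carry an explicit partition bypass the producer-side partitioner.
Seq(("demo-topic", "k1", "v1", 0), ("demo-topic", "k2", "v2", 1))
  .toDF("topic", "key", "value", "partition")            // partition must be an int column
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker
  .save()

// Without a partition column, a custom partitioner could be configured instead:
//   .option("kafka.partitioner.class", "com.example.MyPartitioner")
```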
Contributor:
Do I understand correctly that the fallback scenario is the following, and the later entries can override the former?

  • Default
  • kafka.partitioner.class provided
  • Partition id column

Contributor Author:
Yes, exactly. But this is standard KafkaProducer behaviour:

  • it uses the ProducerRecord partition field; if null, it falls back to:
  • the provided kafka.partitioner.class; if not set:
  • the default partitioner.

I don't believe we need a test for this (otherwise we would be testing the Kafka API), but maybe we should explicitly state it in the doc.

Contributor:
What I would like to test here is that Spark parameterizes the producer properly.
These testing points have nothing to do with producer internals:

  • kafka.partitioner.class config reaches producer instances and takes effect
  • Partition field is set under some circumstances

Contributor Author:
> What I would like to test here is that Spark parameterizes the producer properly.
> These testing points have nothing to do with producer internals:
> * kafka.partitioner.class config reaches producer instances and takes effect

I actually manually tested this and it does, because they are passed down by other classes (see `val specifiedKafkaParams = kafkaParamsForProducer(caseInsensitiveParameters)`, line 177 of KafkaSourceProvider). Any config that starts with `kafka.` will be passed down to the producer, actually. The problem is that this is not stated in the doc (it only mentions two optional configs). Maybe I can update the doc in this respect?

> * Partition field is set under some circumstances

Doesn't the other test I added prove this point already?

In any case, I'm fine with adding two more tests:

  1. kafka.partitioner.class overrides default partitioner.
  2. partition column overrides kafka.partitioner.class.

I can write a simple Kafka Partitioner that always returns 0 and test it with the same pattern (collect() and then two topics, as you suggested); a sketch of such a partitioner follows below.
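For reference, the always-zero partitioner mentioned above could look like the following sketch (the class name is illustrative); it implements Kafka's org.apache.kafka.clients.producer.Partitioner interface so that it can be supplied through the kafka.partitioner.class option:

```scala
import java.util.{Map => JMap}

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// A test-only partitioner that routes every record to partition 0,
// regardless of key, value, or cluster metadata.
class AlwaysZeroPartitioner extends Partitioner {
  override def partition(
      topic: String,
      key: AnyRef,
      keyBytes: Array[Byte],
      value: AnyRef,
      valueBytes: Array[Byte],
      cluster: Cluster): Int = 0

  override def configure(configs: JMap[String, _]): Unit = {}

  override def close(): Unit = {}
}
```

In a test it would then be registered with something like `.option("kafka.partitioner.class", classOf[AlwaysZeroPartitioner].getName)`, and the written records checked to all land in partition 0.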

Contributor (@gaborgsomogyi, Oct 18, 2019):

> Any config that starts with `kafka.` will be passed down to the producer, actually.

There are exceptions; please see them either in the doc or in the code. If such a thing is not checked by a test then it can be broken easily. Just a personal thought: if something is not covered by a test, later it's super hard to find out whether it's a bug or a feature...

> Doesn't the other test I added prove this point already?

If we add the mentioned simplified test then you're right, it will cover the partition field part.

Contributor Author:
Do we still need test 1 (kafka.partitioner.class overrides default partitioner)?



The following options must be set for the Kafka sink
for both batch and streaming queries.
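Complementing the batch sketch earlier in this section, a hedged streaming-write sketch (broker address, topic, and checkpoint path are placeholders; the rate source is used only to have something to write, and the same topic/partition column rules apply):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-sink-streaming-sketch").getOrCreate()

// A toy source; key/value are cast to STRING because the sink expects
// string or binary key/value columns.
val streamingDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()
  .selectExpr("CAST(value AS STRING) AS key", "CAST(value AS STRING) AS value")

val query = streamingDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")        // placeholder broker
  .option("topic", "output-topic")                             // placeholder topic
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")  // placeholder path
  .start()
```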
@@ -27,7 +27,7 @@ import org.apache.kafka.common.header.internals.RecordHeader

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
import org.apache.spark.sql.types.{BinaryType, StringType}
import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType}

/**
* Writes out data in a single Spark task, without any concerns about how
@@ -92,16 +92,19 @@ private[kafka010] abstract class KafkaRowWriter(
throw new NullPointerException(s"null topic present in the data. Use the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
}
val partition: java.lang.Integer =
Contributor:
Why is Int not enough?

Contributor:
It's to allow putting null into the parameter: a Scala Int doesn't allow null.

Contributor:
Makes sense, thanks. In this case Integer is enough, right?

Contributor Author:
java.lang.Integer allows null, so it's enough, if I understood you correctly.

Contributor:
What I mean: val partition: Integer =...

if (projectedRow.isNullAt(4)) null else projectedRow.getInt(4)
val record = if (projectedRow.isNullAt(3)) {
new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value)
new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, partition, key, value)
} else {
val headerArray = projectedRow.getArray(3)
val headers = (0 until headerArray.numElements()).map { i =>
val struct = headerArray.getStruct(i, 2)
new RecordHeader(struct.getUTF8String(0).toString, struct.getBinary(1))
.asInstanceOf[Header]
}
new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value, headers.asJava)
new ProducerRecord[Array[Byte], Array[Byte]](
topic.toString, partition, key, value, headers.asJava)
}
producer.send(record, callback)
}
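As a small aside on the Integer discussion above, a sketch (the boolean flag is a stand-in for "the projected row has a null partition field"): a Scala Int cannot hold null, whereas the boxed java.lang.Integer accepted by ProducerRecord can.

```scala
// Stand-in for "this row has no partition value".
val rowHasNullPartition: Boolean = true

// val bad: Int = null                 // does not compile: Int is a value type
val partitionOrNull: java.lang.Integer =
  if (rowHasNullPartition) null else Int.box(3)  // null lets the producer's partitioner decide
```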
@@ -156,12 +159,23 @@ private[kafka010] abstract class KafkaRowWriter(
throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
s"attribute unsupported type ${t.catalogString}")
}
val partitionExpression =
inputSchema.find(_.name == KafkaWriter.PARTITION_ATTRIBUTE_NAME)
.getOrElse(Literal(null, IntegerType))
partitionExpression.dataType match {
case IntegerType => // good
case t =>
throw new IllegalStateException(s"${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t. ${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
s"must be a ${IntegerType.catalogString}")
}
UnsafeProjection.create(
Seq(
topicExpression,
Cast(keyExpression, BinaryType),
Cast(valueExpression, BinaryType),
headersExpression
headersExpression,
partitionExpression
),
inputSchema
)
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.types.{BinaryType, MapType, StringType}
import org.apache.spark.sql.types.{BinaryType, IntegerType, MapType, StringType}
import org.apache.spark.util.Utils

/**
@@ -41,6 +41,7 @@ private[kafka010] object KafkaWriter extends Logging {
val KEY_ATTRIBUTE_NAME: String = "key"
val VALUE_ATTRIBUTE_NAME: String = "value"
val HEADERS_ATTRIBUTE_NAME: String = "headers"
val PARTITION_ATTRIBUTE_NAME: String = "partition"

override def toString: String = "KafkaWriter"

@@ -86,6 +87,14 @@ private[kafka010] object KafkaWriter extends Logging {
throw new AnalysisException(s"$HEADERS_ATTRIBUTE_NAME attribute type " +
s"must be a ${KafkaRecordToRowConverter.headersType.catalogString}")
}
schema.find(_.name == PARTITION_ATTRIBUTE_NAME).getOrElse(
Literal(null, IntegerType)
).dataType match {
case IntegerType => // good
case _ =>
throw new AnalysisException(s"$PARTITION_ATTRIBUTE_NAME attribute type " +
s"must be an ${IntegerType.catalogString}")
}
}

def write(
@@ -286,6 +286,15 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
}
assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))

val ex4 = intercept[AnalysisException] {
/* partition field wrong type */
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value as partition", "value"
)
}
assert(ex4.getMessage.toLowerCase(Locale.ROOT).contains(
"partition attribute type must be an int"))
}

test("streaming - write to non-existing topic") {
@@ -33,7 +33,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{BinaryType, DataType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType, StructField, StructType}

abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with KafkaTest {
protected var testUtils: KafkaTestUtils = _
@@ -293,6 +293,21 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))

try {
Contributor:
I know this copy-paste started earlier, but it's a good opportunity to reduce it.
As I see it, only the select expression and the message change here, right?

Contributor:
Actually I just submitted a patch, #26158, to refactor the old ones; depending on which one is merged first, we can apply it.

Contributor:
OK, started to review it...

ex = intercept[StreamingQueryException] {
/* partition field wrong type */
writer = createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value", "value as partition"
)
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"partition attribute type must be an int"))
}

test("streaming - write to non-existing topic") {
@@ -369,31 +384,32 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {

test("batch - write to kafka") {
val topic = newTopic()
testUtils.createTopic(topic)
testUtils.createTopic(topic, 4)
val data = Seq(
Row(topic, "1", Seq(
Row("a", "b".getBytes(UTF_8))
)),
), 0),
Row(topic, "2", Seq(
Row("c", "d".getBytes(UTF_8)),
Row("e", "f".getBytes(UTF_8))
)),
), 1),
Row(topic, "3", Seq(
Row("g", "h".getBytes(UTF_8)),
Row("g", "i".getBytes(UTF_8))
)),
Row(topic, "4", null),
), 2),
Row(topic, "4", null, 3),
Row(topic, "5", Seq(
Row("j", "k".getBytes(UTF_8)),
Row("j", "l".getBytes(UTF_8)),
Row("m", "n".getBytes(UTF_8))
))
), 0)
)

val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
StructType(Seq(StructField("topic", StringType), StructField("value", StringType),
StructField("headers", KafkaRecordToRowConverter.headersType)))
StructField("headers", KafkaRecordToRowConverter.headersType),
StructField("partition", IntegerType)))
)

df.write
@@ -404,20 +420,85 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
.save()
checkAnswer(
createKafkaReader(topic, includeHeaders = true).selectExpr(
"CAST(value as STRING) value", "headers"
"CAST(value as STRING) value", "headers", "partition"
),
Row("1", Seq(Row("a", "b".getBytes(UTF_8)))) ::
Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8)))) ::
Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8)))) ::
Row("4", null) ::
Row("1", Seq(Row("a", "b".getBytes(UTF_8))), 0) ::
Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8))), 1) ::
Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8))), 2) ::
Row("4", null, 3) ::
Row("5", Seq(
Row("j", "k".getBytes(UTF_8)),
Row("j", "l".getBytes(UTF_8)),
Row("m", "n".getBytes(UTF_8)))) ::
Row("m", "n".getBytes(UTF_8))), 0) ::
Nil
)
}

test("batch - partition column vs default Kafka partitioner") {
Contributor:
Given we add this test to check the functionality to specify a partition, I think we don't need to change the above test.

Contributor Author (@redsk, Oct 18, 2019):
I did it because a previous PR (headers) changed it as well. But I can revert it to the previous state if so desired. Shall I?

Contributor:
Personally yes, but we can hear more voices on this as well.

Contributor:
I think test("batch - write to kafka") should be the simplest test and shouldn't contain either headers or partitions (I know for headers it's a bit late).

Contributor:
I've gone through the test but I'm not sure it covers everything. Here are my thoughts:

  • Not sure why 100 partitions are necessary?
  • Don't we need the following tests? 1. kafka.partitioner.class overrides default 2. column overrides kafka.partitioner.class and default
  • I think this test can be made simpler. Put a fixed key into a topic and get the partition (just like it is now, but see my next comment). In another topic, put all the data into a partition which is not the partition obtained before. Finally, read it back and double-check.
  • .collect().toList.head is maybe not enough, because it would be good to make sure the data is in one partition.
  • There are a couple of copy-pastes here as well.

Contributor:
.collect() docstring says "Returns an array that contains all rows in this Dataset. Running collect requires moving all the data into the application's driver process, and doing so on a very large dataset can crash the driver process with OutOfMemoryError." Does collect() work on a per-partition basis? If so, maybe I could use .coalesce(1) before calling collect().

What I mean is something like this:
createKafkaReader(keyTopic).select("partition").map(_.getInt(0)).collect().toSet.size() === 1

Contributor:
> In that case, they are also all over the file.

Yes, the code can be enhanced in several places, but that doesn't mean we have to do the same copy-paste.

Contributor Author:
> What I mean is something like this:
> createKafkaReader(keyTopic).select("partition").map(_.getInt(0)).collect().toSet.size() === 1

I'm not sure I understand. Do you want me to assert that? Is this necessary considering that I send just one record?

Contributor Author:
> Yes, the code can be enhanced in several places, but that doesn't mean we have to do the same copy-paste.

There are different opinions on copy-paste in UTs and I just copied the pattern in use in the file. But I also lean towards not copy-pasting. I can write a small function for that. Shall I apply it to other tests in the same function or just my contribution?

Contributor:
In order not to have feature creep I would suggest keeping this to your change set. Later we can file another PR for the rest...

val fixedKey = "fixed_key"
val nrPartitions = 100
Contributor:
It wouldn't matter if we just set the number of partitions to 10 - we have flaky test failures where we cannot get metadata from Kafka within the timeout, so I'd rather not put pressure on the Kafka side.


// the default Kafka partitioner calculates the partition deterministically based on the key
val keyTopic = newTopic()
testUtils.createTopic(keyTopic, nrPartitions)

Seq((keyTopic, fixedKey, "value"))
.toDF("topic", "key", "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", keyTopic)
.mode("append")
.save()

// getting the partition corresponding to the fixed key
val keyPartition = createKafkaReader(keyTopic).select("partition")
.map(_.getInt(0)).collect().toList.head

val topic = newTopic()
testUtils.createTopic(topic, nrPartitions)

// even values use default kafka partitioner, odd use 'n'
val df = (0 until 100)
.map(n => (topic, fixedKey, s"$n", if (n % 2 == 0) None else Some(n)))
.toDF("topic", "key", "value", "partition")

df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.mode("append")
.save()

checkAnswer(
createKafkaReader(topic).selectExpr(
"CAST(key as STRING) key", "CAST(value as STRING) value", "partition"
),
(0 until 100)
.map(n => (fixedKey, s"$n", if (n % 2 == 0) keyPartition else n))
.toDF("key", "value", "partition")
)
}

test("batch - non-existing partitions trigger standard Kafka exception") {
Contributor:
Personally I would avoid depending on the behavior of Kafka - I feel we don't need to have this test, but let's hear more voices.

Contributor Author (@redsk, Oct 18, 2019):

My main question was: "what happens if I specify partitions that don't exist?" And the answer is that Kafka throws an exception with the message `Topic $topic not present in metadata after $producerTimeout ms`, which is not particularly clear.

I could have called producer.partitionsFor(topic) to check if the specified partition is in range (and throw a more meaningful exception if not) but it's an expensive call for each message. I could cache the result but the number of partitions could change in between calls and I didn't want to complicate the implementation. Hence the need to see what the standard Kafka producer does (which is "not much").

And the rationale is: if Kafka implementation changes, this test will fail and we'll have a chance to provide a more meaningful exception.

Contributor:
I think it's important to know what happens in such a case. We've tested it and it's good.
From the Spark test perspective we must ensure that our code works as expected, and I think we don't have to pull in Kafka internals (I mean messages and timeouts) unless it's super important. I don't think that's the case here, which makes this test unnecessary from my point of view.

Contributor Author:
Ok, I'll remove it.

val topic = newTopic()
val producerTimeout = "1000"
testUtils.createTopic(topic, 4)
val df = Seq((topic, "1", 5)).toDF("topic", "value", "partition")

val ex = intercept[SparkException] {
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.max.block.ms", producerTimeout) // default (60000 ms) would be too slow
.option("topic", topic)
.mode("append")
.save()
}
TestUtils.assertExceptionMsg(
ex, s"Topic $topic not present in metadata after $producerTimeout ms")
}

test("batch - null topic field value, and no topic option") {
val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
val ex = intercept[SparkException] {