From 0ac064061ec34fdcfd50422d9b192e2c46431cf4 Mon Sep 17 00:00:00 2001 From: redsk Date: Thu, 17 Oct 2019 13:45:28 +0200 Subject: [PATCH 1/7] [SPARK-29500] Add support for Kafka partition in KafkaWriter --- .../spark/sql/kafka010/KafkaWriteTask.scala | 22 +++++++++++++++---- .../spark/sql/kafka010/KafkaWriter.scala | 11 +++++++++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index b423ddc959c1..a006887fa67a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} -import org.apache.spark.sql.types.{BinaryType, StringType} +import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType} /** * Writes out data in a single Spark task, without any concerns about how @@ -92,8 +92,10 @@ private[kafka010] abstract class KafkaRowWriter( throw new NullPointerException(s"null topic present in the data. Use the " + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") } + val partition: java.lang.Integer = + if (projectedRow.isNullAt(4)) null else projectedRow.getInt(4) val record = if (projectedRow.isNullAt(3)) { - new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value) + new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, partition, key, value) } else { val headerArray = projectedRow.getArray(3) val headers = (0 until headerArray.numElements()).map { i => @@ -101,7 +103,8 @@ private[kafka010] abstract class KafkaRowWriter( new RecordHeader(struct.getUTF8String(0).toString, struct.getBinary(1)) .asInstanceOf[Header] } - new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value, headers.asJava) + new ProducerRecord[Array[Byte], Array[Byte]]( + topic.toString, partition, key, value, headers.asJava) } producer.send(record, callback) } @@ -156,12 +159,23 @@ private[kafka010] abstract class KafkaRowWriter( throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " + s"attribute unsupported type ${t.catalogString}") } + val partitionExpression = + inputSchema.find(_.name == KafkaWriter.PARTITION_ATTRIBUTE_NAME) + .getOrElse(Literal(null, IntegerType)) + partitionExpression.dataType match { + case IntegerType => // good + case t => + throw new IllegalStateException(s"${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " + + s"attribute unsupported type $t. 
${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " + + s"must be a ${IntegerType.catalogString}") + } UnsafeProjection.create( Seq( topicExpression, Cast(keyExpression, BinaryType), Cast(valueExpression, BinaryType), - headersExpression + headersExpression, + partitionExpression ), inputSchema ) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index bbb060356f73..9b0d11f137ce 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.types.{BinaryType, MapType, StringType} +import org.apache.spark.sql.types.{BinaryType, IntegerType, MapType, StringType} import org.apache.spark.util.Utils /** @@ -41,6 +41,7 @@ private[kafka010] object KafkaWriter extends Logging { val KEY_ATTRIBUTE_NAME: String = "key" val VALUE_ATTRIBUTE_NAME: String = "value" val HEADERS_ATTRIBUTE_NAME: String = "headers" + val PARTITION_ATTRIBUTE_NAME: String = "partition" override def toString: String = "KafkaWriter" @@ -86,6 +87,14 @@ private[kafka010] object KafkaWriter extends Logging { throw new AnalysisException(s"$HEADERS_ATTRIBUTE_NAME attribute type " + s"must be a ${KafkaRecordToRowConverter.headersType.catalogString}") } + schema.find(_.name == PARTITION_ATTRIBUTE_NAME).getOrElse( + Literal(null, IntegerType) + ).dataType match { + case IntegerType => // good + case _ => + throw new AnalysisException(s"$PARTITION_ATTRIBUTE_NAME attribute type " + + s"must be an ${IntegerType.catalogString}") + } } def write( From 1172d79915e3173cb6c5b4c8d37e1a7e4d00e220 Mon Sep 17 00:00:00 2001 From: redsk Date: Thu, 17 Oct 2019 18:01:41 +0200 Subject: [PATCH 2/7] [SPARK-29500] Add support for Kafka partition in KafkaWriter (tests) --- .../kafka010/KafkaContinuousSinkSuite.scala | 9 ++ .../spark/sql/kafka010/KafkaSinkSuite.scala | 109 +++++++++++++++--- 2 files changed, 104 insertions(+), 14 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala index 65adbd6b9887..cbf4952406c0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala @@ -286,6 +286,15 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { } assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains( "key attribute type must be a string or binary")) + + val ex4 = intercept[AnalysisException] { + /* partition field wrong type */ + createKafkaWriter(input.toDF())( + withSelectExpr = s"'$topic' as topic", "value as partition", "value" + ) + } + assert(ex4.getMessage.toLowerCase(Locale.ROOT).contains( + "partition attribute type must be an int")) } test("streaming - write to non-existing topic") { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 
d77b9a3b6a9e..e88706057c1e 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{BinaryType, DataType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType, StructField, StructType} abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with KafkaTest { protected var testUtils: KafkaTestUtils = _ @@ -293,6 +293,21 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest { } assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( "key attribute type must be a string or binary")) + + try { + ex = intercept[StreamingQueryException] { + /* partition field wrong type */ + writer = createKafkaWriter(input.toDF())( + withSelectExpr = s"'$topic' as topic", "value", "value as partition" + ) + input.addData("1", "2", "3", "4", "5") + writer.processAllAvailable() + } + } finally { + writer.stop() + } + assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( + "partition attribute type must be an int")) } test("streaming - write to non-existing topic") { @@ -369,31 +384,32 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { test("batch - write to kafka") { val topic = newTopic() - testUtils.createTopic(topic) + testUtils.createTopic(topic, 4) val data = Seq( Row(topic, "1", Seq( Row("a", "b".getBytes(UTF_8)) - )), + ), 0), Row(topic, "2", Seq( Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8)) - )), + ), 1), Row(topic, "3", Seq( Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8)) - )), - Row(topic, "4", null), + ), 2), + Row(topic, "4", null, 3), Row(topic, "5", Seq( Row("j", "k".getBytes(UTF_8)), Row("j", "l".getBytes(UTF_8)), Row("m", "n".getBytes(UTF_8)) - )) + ), 0) ) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), StructType(Seq(StructField("topic", StringType), StructField("value", StringType), - StructField("headers", KafkaRecordToRowConverter.headersType))) + StructField("headers", KafkaRecordToRowConverter.headersType), + StructField("partition", IntegerType))) ) df.write @@ -404,20 +420,85 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { .save() checkAnswer( createKafkaReader(topic, includeHeaders = true).selectExpr( - "CAST(value as STRING) value", "headers" + "CAST(value as STRING) value", "headers", "partition" ), - Row("1", Seq(Row("a", "b".getBytes(UTF_8)))) :: - Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8)))) :: - Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8)))) :: - Row("4", null) :: + Row("1", Seq(Row("a", "b".getBytes(UTF_8))), 0) :: + Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8))), 1) :: + Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8))), 2) :: + Row("4", null, 3) :: Row("5", Seq( Row("j", "k".getBytes(UTF_8)), Row("j", "l".getBytes(UTF_8)), - Row("m", "n".getBytes(UTF_8)))) :: + Row("m", "n".getBytes(UTF_8))), 0) :: Nil ) } + test("batch - partition column vs default Kafka partitioner") { + val fixedKey = "fixed_key" + val nrPartitions = 100 + + // default Kafka partitioner calculate partition deterministically based on 
the key + val keyTopic = newTopic() + testUtils.createTopic(keyTopic, nrPartitions) + + Seq((keyTopic, fixedKey, "value")) + .toDF("topic", "key", "value") + .write + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("topic", keyTopic) + .mode("append") + .save() + + // getting the partition corresponding to the fixed key + val keyPartition = createKafkaReader(keyTopic).select("partition") + .map(_.getInt(0)).collect().toList.head + + val topic = newTopic() + testUtils.createTopic(topic, nrPartitions) + + // even values use default kafka partitioner, odd use 'n' + val df = (0 until 100) + .map(n => (topic, fixedKey, s"$n", if (n % 2 == 0) None else Some(n))) + .toDF("topic", "key", "value", "partition") + + df.write + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("topic", topic) + .mode("append") + .save() + + checkAnswer( + createKafkaReader(topic).selectExpr( + "CAST(key as STRING) key", "CAST(value as STRING) value", "partition" + ), + (0 until 100) + .map(n => (fixedKey, s"$n", if (n % 2 == 0) keyPartition else n)) + .toDF("key", "value", "partition") + ) + } + + test("batch - non-existing partitions trigger standard Kafka exception") { + val topic = newTopic() + val producerTimeout = "1000" + testUtils.createTopic(topic, 4) + val df = Seq((topic, "1", 5)).toDF("topic", "value", "partition") + + val ex = intercept[SparkException] { + df.write + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.max.block.ms", producerTimeout) // default (60000 ms) would be too slow + .option("topic", topic) + .mode("append") + .save() + } + TestUtils.assertExceptionMsg( + ex, s"Topic $topic not present in metadata after $producerTimeout ms") + } + test("batch - null topic field value, and no topic option") { val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value") val ex = intercept[SparkException] { From 5b6487225fc825fe432858d46f79932ee728c259 Mon Sep 17 00:00:00 2001 From: redsk Date: Thu, 17 Oct 2019 22:52:34 +0200 Subject: [PATCH 3/7] [SPARK-29500] Add support for Kafka partition in KafkaWriter (doc) --- docs/structured-streaming-kafka-integration.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 89732d309aa2..bcbd58ca2124 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -614,6 +614,10 @@ The Dataframe being written to Kafka should have the following columns in schema topic (*optional) string + + partition (optional) + int + \* The topic column is required if the "topic" configuration option is not specified.
@@ -622,6 +626,10 @@ a ```null``` valued key column will be automatically added (see Kafka semantics how ```null``` valued key values are handled). If a topic column exists then its value is used as the topic when writing the given row to Kafka, unless the "topic" configuration option is set i.e., the "topic" configuration option overrides the topic column. +If a partition column is not specified then the partition is calculated by the Kafka producer +(using ```org.apache.kafka.clients.producer.internals.DefaultPartitioner```). +This can be overridden in Spark by setting the ```kafka.partitioner.class``` option. + The following options must be set for the Kafka sink for both batch and streaming queries. From 10308aed4f47a7687021a8b2d99cbe85b54a03b6 Mon Sep 17 00:00:00 2001 From: redsk Date: Sat, 19 Oct 2019 11:16:11 +0200 Subject: [PATCH 4/7] [SPARK-29500] Rewritten tests --- .../structured-streaming-kafka-integration.md | 8 +- .../spark/sql/kafka010/KafkaWriteTask.scala | 2 +- .../spark/sql/kafka010/KafkaSinkSuite.scala | 165 +++++++++++------- 3 files changed, 106 insertions(+), 69 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index bcbd58ca2124..badf0429545f 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -626,9 +626,11 @@ a ```null``` valued key column will be automatically added (see Kafka semantics how ```null``` valued key values are handled). If a topic column exists then its value is used as the topic when writing the given row to Kafka, unless the "topic" configuration option is set i.e., the "topic" configuration option overrides the topic column. -If a partition column is not specified then the partition is calculated by the Kafka producer -(using ```org.apache.kafka.clients.producer.internals.DefaultPartitioner```). -This can be overridden in Spark by setting the ```kafka.partitioner.class``` option. +If a "partition" column is not specified (or its value is ```null```) +then the partition is calculated by the Kafka producer. +A Kafka partitioner can be specified in Spark by setting the +```kafka.partitioner.class``` option. If not present, Kafka default partitioner +will be used. The following options must be set for the Kafka sink diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index a006887fa67a..5bdc1b5fe9f3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -92,7 +92,7 @@ private[kafka010] abstract class KafkaRowWriter( throw new NullPointerException(s"null topic present in the data. 
Use the " + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") } - val partition: java.lang.Integer = + val partition: Integer = if (projectedRow.isNullAt(4)) null else projectedRow.getInt(4) val record = if (projectedRow.isNullAt(3)) { new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, partition, key, value) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index e88706057c1e..1a37cdaf1560 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -22,6 +22,8 @@ import java.util.Locale import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.internals.DefaultPartitioner +import org.apache.kafka.common.Cluster import org.apache.kafka.common.serialization.ByteArraySerializer import org.scalatest.time.SpanSugar._ @@ -384,32 +386,31 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { test("batch - write to kafka") { val topic = newTopic() - testUtils.createTopic(topic, 4) + testUtils.createTopic(topic) val data = Seq( Row(topic, "1", Seq( Row("a", "b".getBytes(UTF_8)) - ), 0), + )), Row(topic, "2", Seq( Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8)) - ), 1), + )), Row(topic, "3", Seq( Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8)) - ), 2), - Row(topic, "4", null, 3), + )), + Row(topic, "4", null), Row(topic, "5", Seq( Row("j", "k".getBytes(UTF_8)), Row("j", "l".getBytes(UTF_8)), Row("m", "n".getBytes(UTF_8)) - ), 0) + )) ) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), StructType(Seq(StructField("topic", StringType), StructField("value", StringType), - StructField("headers", KafkaRecordToRowConverter.headersType), - StructField("partition", IntegerType))) + StructField("headers", KafkaRecordToRowConverter.headersType))) ) df.write @@ -420,83 +421,105 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { .save() checkAnswer( createKafkaReader(topic, includeHeaders = true).selectExpr( - "CAST(value as STRING) value", "headers", "partition" + "CAST(value as STRING) value", "headers" ), - Row("1", Seq(Row("a", "b".getBytes(UTF_8))), 0) :: - Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8))), 1) :: - Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8))), 2) :: - Row("4", null, 3) :: + Row("1", Seq(Row("a", "b".getBytes(UTF_8)))) :: + Row("2", Seq(Row("c", "d".getBytes(UTF_8)), Row("e", "f".getBytes(UTF_8)))) :: + Row("3", Seq(Row("g", "h".getBytes(UTF_8)), Row("g", "i".getBytes(UTF_8)))) :: + Row("4", null) :: Row("5", Seq( Row("j", "k".getBytes(UTF_8)), Row("j", "l".getBytes(UTF_8)), - Row("m", "n".getBytes(UTF_8))), 0) :: + Row("m", "n".getBytes(UTF_8)))) :: Nil ) } - test("batch - partition column vs default Kafka partitioner") { - val fixedKey = "fixed_key" - val nrPartitions = 100 - - // default Kafka partitioner calculate partition deterministically based on the key - val keyTopic = newTopic() - testUtils.createTopic(keyTopic, nrPartitions) - - Seq((keyTopic, fixedKey, "value")) - .toDF("topic", "key", "value") + def writeToKafka(df: DataFrame, topic: String, options: Map[String, String] = Map.empty): Unit = { + df .write .format("kafka") 
.option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("topic", keyTopic) + .option("topic", topic) + .options(options) .mode("append") .save() + } - // getting the partition corresponding to the fixed key - val keyPartition = createKafkaReader(keyTopic).select("partition") - .map(_.getInt(0)).collect().toList.head + def partitionsInTopic(topic: String): Set[Int] = { + createKafkaReader(topic) + .select("partition") + .map(_.getInt(0)) + .collect() + .toSet + } + + test("batch - partition column sets partition in kafka writes") { + val fixedKey = "fixed_key" + val nrPartitions = 4 val topic = newTopic() testUtils.createTopic(topic, nrPartitions) - // even values use default kafka partitioner, odd use 'n' - val df = (0 until 100) - .map(n => (topic, fixedKey, s"$n", if (n % 2 == 0) None else Some(n))) - .toDF("topic", "key", "value", "partition") - - df.write - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("topic", topic) - .mode("append") - .save() - - checkAnswer( - createKafkaReader(topic).selectExpr( - "CAST(key as STRING) key", "CAST(value as STRING) value", "partition" - ), - (0 until 100) - .map(n => (fixedKey, s"$n", if (n % 2 == 0) keyPartition else n)) - .toDF("key", "value", "partition") - ) + // default Kafka partitioner calculates partitions deterministically based on the key + val df = (0 until 5) + .map(n => (topic, fixedKey, s"$n")) + .toDF("topic", "key", "value") + writeToKafka(df, topic) + val partitionsForFixedKey = partitionsInTopic(topic) + assert(partitionsForFixedKey.size == 1) + val keyPartition = partitionsForFixedKey.head + + val topic2 = newTopic() + testUtils.createTopic(topic2, nrPartitions) + + val differentPartition = (0 until nrPartitions).find(p => p != keyPartition).get + val df2 = df.withColumn("partition", lit(differentPartition)) + writeToKafka(df2, topic2) + val partitions = partitionsInTopic(topic2) + assert(partitions.size == 1) + assert(partitions.head != keyPartition) } - test("batch - non-existing partitions trigger standard Kafka exception") { - val topic = newTopic() - val producerTimeout = "1000" - testUtils.createTopic(topic, 4) - val df = Seq((topic, "1", 5)).toDF("topic", "value", "partition") - - val ex = intercept[SparkException] { - df.write - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.max.block.ms", producerTimeout) // default (60000 ms) would be too slow - .option("topic", topic) - .mode("append") - .save() - } - TestUtils.assertExceptionMsg( - ex, s"Topic $topic not present in metadata after $producerTimeout ms") + test("batch - partition column and partitioner priorities") { + val nrPartitions = 4 + val topic1 = newTopic() + val topic2 = newTopic() + val topic3 = newTopic() + val topic4 = newTopic() + testUtils.createTopic(topic1, nrPartitions) + testUtils.createTopic(topic2, nrPartitions) + testUtils.createTopic(topic3, nrPartitions) + testUtils.createTopic(topic4, nrPartitions) + + val df = (0 until 5).map(n => (topic1, s"$n", s"$n")).toDF("topic", "key", "value") + + // default kafka partitioner + writeToKafka(df, topic1) + val partitionsInTopic1 = partitionsInTopic(topic1) + assert(partitionsInTopic1.size > 1) + + // custom partitioner (always returns 0) overrides default partitioner + writeToKafka(df, topic2, Map( + "kafka.partitioner.class" -> "org.apache.spark.sql.kafka010.TestKafkaPartitioner" + )) + val partitionsInTopic2 = partitionsInTopic(topic2) + assert(partitionsInTopic2.size == 1) + 
assert(partitionsInTopic2.head == 0) + + // partition column overrides custom partitioner + val dfWithCustomPartition = df.withColumn("partition", lit(2)) + writeToKafka(dfWithCustomPartition, topic3, Map( + "kafka.partitioner.class" -> "org.apache.spark.sql.kafka010.TestKafkaPartitioner" + )) + val partitionsInTopic3 = partitionsInTopic(topic3) + assert(partitionsInTopic3.size == 1) + assert(partitionsInTopic3.head == 2) + + // when the partition column value is null, it is ignored + val dfWithNullPartitions = df.withColumn("partition", lit(null).cast(IntegerType)) + writeToKafka(dfWithNullPartitions, topic4) + assert(partitionsInTopic(topic4) == partitionsInTopic1) } test("batch - null topic field value, and no topic option") { @@ -596,3 +619,15 @@ class KafkaSinkBatchSuiteV2 extends KafkaSinkBatchSuiteBase { } } } + +class TestKafkaPartitioner extends DefaultPartitioner { + override def partition( + topic: String, + key: Any, + keyBytes: Array[Byte], + value: Any, + valueBytes: Array[Byte], + cluster: Cluster + ) + : Int = 0 +} From 021d2b753d8e0bd5f0b38145761ceca13d0329b8 Mon Sep 17 00:00:00 2001 From: redsk Date: Mon, 21 Oct 2019 11:34:15 +0200 Subject: [PATCH 5/7] [SPARK-29500] Fixed indentation --- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index 1a37cdaf1560..c588bd974921 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -622,12 +622,11 @@ class KafkaSinkBatchSuiteV2 extends KafkaSinkBatchSuiteBase { class TestKafkaPartitioner extends DefaultPartitioner { override def partition( - topic: String, - key: Any, - keyBytes: Array[Byte], - value: Any, - valueBytes: Array[Byte], - cluster: Cluster - ) + topic: String, + key: Any, + keyBytes: Array[Byte], + value: Any, + valueBytes: Array[Byte], + cluster: Cluster) : Int = 0 } From 57f7863fb993b1c8bdcd42d7b9d8ed3da82cebd7 Mon Sep 17 00:00:00 2001 From: redsk Date: Mon, 21 Oct 2019 15:06:14 +0200 Subject: [PATCH 6/7] [SPARK-29500] Tests cleanup --- .../spark/sql/kafka010/KafkaSinkSuite.scala | 38 +++---------------- 1 file changed, 5 insertions(+), 33 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index c588bd974921..b2934d3eff75 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -454,33 +454,6 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { .toSet } - test("batch - partition column sets partition in kafka writes") { - val fixedKey = "fixed_key" - val nrPartitions = 4 - - val topic = newTopic() - testUtils.createTopic(topic, nrPartitions) - - // default Kafka partitioner calculates partitions deterministically based on the key - val df = (0 until 5) - .map(n => (topic, fixedKey, s"$n")) - .toDF("topic", "key", "value") - writeToKafka(df, topic) - val partitionsForFixedKey = partitionsInTopic(topic) - assert(partitionsForFixedKey.size == 1) - val keyPartition = partitionsForFixedKey.head - - val 
topic2 = newTopic() - testUtils.createTopic(topic2, nrPartitions) - - val differentPartition = (0 until nrPartitions).find(p => p != keyPartition).get - val df2 = df.withColumn("partition", lit(differentPartition)) - writeToKafka(df2, topic2) - val partitions = partitionsInTopic(topic2) - assert(partitions.size == 1) - assert(partitions.head != keyPartition) - } - test("batch - partition column and partitioner priorities") { val nrPartitions = 4 val topic1 = newTopic() @@ -491,6 +464,9 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { testUtils.createTopic(topic2, nrPartitions) testUtils.createTopic(topic3, nrPartitions) testUtils.createTopic(topic4, nrPartitions) + val customKafkaPartitionerConf = Map( + "kafka.partitioner.class" -> "org.apache.spark.sql.kafka010.TestKafkaPartitioner" + ) val df = (0 until 5).map(n => (topic1, s"$n", s"$n")).toDF("topic", "key", "value") @@ -500,18 +476,14 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase { assert(partitionsInTopic1.size > 1) // custom partitioner (always returns 0) overrides default partitioner - writeToKafka(df, topic2, Map( - "kafka.partitioner.class" -> "org.apache.spark.sql.kafka010.TestKafkaPartitioner" - )) + writeToKafka(df, topic2, customKafkaPartitionerConf) val partitionsInTopic2 = partitionsInTopic(topic2) assert(partitionsInTopic2.size == 1) assert(partitionsInTopic2.head == 0) // partition column overrides custom partitioner val dfWithCustomPartition = df.withColumn("partition", lit(2)) - writeToKafka(dfWithCustomPartition, topic3, Map( - "kafka.partitioner.class" -> "org.apache.spark.sql.kafka010.TestKafkaPartitioner" - )) + writeToKafka(dfWithCustomPartition, topic3, customKafkaPartitionerConf) val partitionsInTopic3 = partitionsInTopic(topic3) assert(partitionsInTopic3.size == 1) assert(partitionsInTopic3.head == 2) From 96a8e9f24fe7446ae444621cdb5c16e8d7c86a54 Mon Sep 17 00:00:00 2001 From: redsk Date: Tue, 22 Oct 2019 16:49:51 +0200 Subject: [PATCH 7/7] [SPARK-29500] Code style --- .../scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index b2934d3eff75..aacb10f5197b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -599,6 +599,5 @@ class TestKafkaPartitioner extends DefaultPartitioner { keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], - cluster: Cluster) - : Int = 0 + cluster: Cluster): Int = 0 }
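
The documentation change in patches 3/4 describes the new optional `partition` column for the Kafka sink. As a companion to the patch series, here is a minimal, hypothetical usage sketch (not part of the patches themselves): it writes a DataFrame whose `partition` column routes rows explicitly, with a `null` value falling back to the producer's partitioner. The broker address and topic name are illustrative assumptions.

```scala
// Hypothetical sketch, assuming a local broker and a pre-created topic
// ("example-topic") with at least 2 partitions. Not part of the patch series.
import org.apache.spark.sql.SparkSession

object PartitionColumnExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-partition-column-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // An Option[Int] third element becomes a nullable "partition" column:
    // Some(n) routes the row to partition n; None leaves the choice to the
    // producer's partitioner (default or kafka.partitioner.class).
    val df = Seq(
      ("k1", "1", Some(0)),
      ("k2", "2", Some(1)),
      ("k3", "3", None)
    ).toDF("key", "value", "partition")

    df.write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
      .option("topic", "example-topic")                     // assumed topic
      .mode("append")
      .save()

    spark.stop()
  }
}
```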
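
The rewritten test "batch - partition column and partitioner priorities" exercises the precedence rules: an explicit `partition` column overrides `kafka.partitioner.class`, which in turn overrides Kafka's default partitioner. The sketch below mirrors that precedence outside the test harness; the partitioner class copies the signature of `TestKafkaPartitioner` from the patch, while the broker and topic names are assumptions.

```scala
// Hypothetical precedence sketch, assuming a local broker and a pre-created
// topic with at least 3 partitions. Not part of the patch series.
import org.apache.kafka.clients.producer.internals.DefaultPartitioner
import org.apache.kafka.common.Cluster
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Always picks partition 0, like TestKafkaPartitioner in KafkaSinkSuite.
class AlwaysZeroPartitioner extends DefaultPartitioner {
  override def partition(
      topic: String,
      key: Any,
      keyBytes: Array[Byte],
      value: Any,
      valueBytes: Array[Byte],
      cluster: Cluster): Int = 0
}

object PartitionPrecedenceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-partition-precedence-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = (0 until 5).map(n => (s"key-$n", s"$n")).toDF("key", "value")

    // Every row carries partition = 2, so records land in partition 2 even
    // though the configured partitioner would have chosen partition 0.
    df.withColumn("partition", lit(2))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")                    // assumed broker
      .option("kafka.partitioner.class", classOf[AlwaysZeroPartitioner].getName)
      .option("topic", "example-topic")                                       // assumed topic
      .mode("append")
      .save()

    spark.stop()
  }
}
```

Dropping the `withColumn("partition", lit(2))` line would let `AlwaysZeroPartitioner` take effect, and removing the `kafka.partitioner.class` option as well would fall back to Kafka's default key-based partitioning — the three cases the new test asserts.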