Merged

58 commits in this pull request:
3d567a3  [MINOR][SQL] Avoid unnecessary invocation on checkAndGlobPathIfNecessary (Ngone51, Oct 22, 2019)
484f93e  [SPARK-29530][SQL] Make SQLConf in SQL parse process thread safe (AngersZhuuuu, Oct 22, 2019)
467c3f6  [SPARK-29529][DOCS] Remove unnecessary orc version and hive version i… (denglingang, Oct 22, 2019)
811d563  [SPARK-29536][PYTHON] Upgrade cloudpickle to 1.1.1 to support Python 3.8 (HyukjinKwon, Oct 22, 2019)
868d851  [SPARK-29232][ML] Update the parameter maps of the DecisionTreeRegres… (huaxingao, Oct 22, 2019)
3163b6b  [SPARK-29516][SQL][TEST] Test ThriftServerQueryTestSuite asynchronously (wangyum, Oct 22, 2019)
bb49c80  [SPARK-21492][SQL] Fix memory leak in SortMergeJoin (xuanyuanking, Oct 22, 2019)
b4844ee  [SPARK-29517][SQL] TRUNCATE TABLE should look up catalog/table like v… (viirya, Oct 22, 2019)
8779938  [SPARK-28787][DOC][SQL] Document LOAD DATA statement in SQL Reference (huaxingao, Oct 22, 2019)
c1c6485  [SPARK-28793][DOC][SQL] Document CREATE FUNCTION in SQL Reference (dilipbiswal, Oct 22, 2019)
2036a8c  [SPARK-29488][WEBUI] In Web UI, stage page has js error when sort table (jennyinspur, Oct 22, 2019)
8009468  [SPARK-29556][CORE] Avoid putting request path in error response in E… (srowen, Oct 22, 2019)
3bf5355  [SPARK-29539][SQL] SHOW PARTITIONS should look up catalog/table like … (huaxingao, Oct 22, 2019)
f23c5d7  [SPARK-29560][BUILD] Add typesafe bintray repo for sbt-mima-plugin (dongjoon-hyun, Oct 22, 2019)
e674909  [SPARK-29107][SQL][TESTS] Port window.sql (Part 1) (DylanGuedes, Oct 23, 2019)
c128ac5  [SPARK-29511][SQL] DataSourceV2: Support CREATE NAMESPACE (imback82, Oct 23, 2019)
8c34690  [SPARK-29546][TESTS] Recover jersey-guava test dependency in docker-i… (dongjoon-hyun, Oct 23, 2019)
cbe6ead  [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSes… (brkyvz, Oct 23, 2019)
70dd9c0  [SPARK-29542][SQL][DOC] Make the descriptions of spark.sql.files.* be… (turboFei, Oct 23, 2019)
0a70951  [SPARK-29499][CORE][PYSPARK] Add mapPartitionsWithIndex for RDDBarrier (ConeyLiu, Oct 23, 2019)
df00b5c  [SPARK-29569][BUILD][DOCS] Copy and paste minified jquery instead whe… (HyukjinKwon, Oct 23, 2019)
53a5f17  [SPARK-29513][SQL] REFRESH TABLE should look up catalog/table like v2… (imback82, Oct 23, 2019)
bfbf282  [SPARK-29503][SQL] Remove conversion CreateNamedStruct to CreateNamed… (HeartSaVioR, Oct 23, 2019)
7e8e4c0  [SPARK-29552][SQL] Execute the "OptimizeLocalShuffleReader" rule when… (JkSelf, Oct 23, 2019)
5867707  [SPARK-29557][BUILD] Update dropwizard/codahale metrics library to 3.2.6 (LucaCanali, Oct 23, 2019)
b91356e  [SPARK-29533][SQL][TESTS][FOLLOWUP] Regenerate the result on EC2 (dongjoon-hyun, Oct 23, 2019)
7ecf968  [SPARK-29567][TESTS] Update JDBC Integration Test Docker Images (dongjoon-hyun, Oct 23, 2019)
fd899d6  [SPARK-29576][CORE] Use Spark's CompressionCodec for Ser/Deser of Map… (dbtsai, Oct 24, 2019)
55ced9c  [SPARK-29571][SQL][TESTS][FOLLOWUP] Fix UT in AllExecutionsPageSuite (07ARB, Oct 24, 2019)
177bf67  [SPARK-29522][SQL] CACHE TABLE should look up catalog/table like v2 c… (viirya, Oct 24, 2019)
9e77d48  [SPARK-21492][SQL][FOLLOW UP] Reimplement UnsafeExternalRowSorter in … (xuanyuanking, Oct 24, 2019)
1296bbb  [SPARK-29504][WEBUI] Toggle full job description on click (PavithraRamachandran, Oct 24, 2019)
67cf043  [SPARK-29145][SQL] Support sub-queries in join conditions (AngersZhuuuu, Oct 24, 2019)
1ec1b2b  [SPARK-28791][DOC] Documentation for Alter table Command (PavithraRamachandran, Oct 24, 2019)
76d4beb  [SPARK-29559][WEBUI] Support pagination for JDBC/ODBC Server page (shahidki31, Oct 24, 2019)
a35fb4f  [SPARK-29578][TESTS] Add "8634" as another skipped day for Kwajalein … (srowen, Oct 24, 2019)
cdea520  [SPARK-29532][SQL] Simplify interval string parsing (cloud-fan, Oct 24, 2019)
dcf5eaf  [SPARK-29444][FOLLOWUP] add doc and python parameter for ignoreNullFi… (Oct 24, 2019)
92b2529  [SPARK-21287][SQL] Remove requirement of fetch_size>=0 from JDBCOptions (fuwhu, Oct 24, 2019)
dec99d8  [SPARK-29526][SQL] UNCACHE TABLE should look up catalog/table like v2… (imback82, Oct 24, 2019)
40df9d2  [SPARK-29227][SS] Track rule info in optimization phase (wenxuanguan, Oct 25, 2019)
7417c3e  [SPARK-29597][DOCS] Deprecate old Java 8 versions prior to 8u92 (dongjoon-hyun, Oct 25, 2019)
1474ed0  [SPARK-29562][SQL] Speed up and slim down metric aggregation in SQL l… (Oct 25, 2019)
091cbc3  [SPARK-9612][ML] Add instance weight support for GBTs (zhengruifeng, Oct 25, 2019)
cfbdd9d  [SPARK-29461][SQL] Measure the number of records being updated for JD… (HeartSaVioR, Oct 25, 2019)
8bd8f49  [SPARK-29500][SQL][SS] Support partition column when writing to Kafka (redsk, Oct 25, 2019)
0cf4f07  [SPARK-29545][SQL] Add support for bit_xor aggregate function (yaooqinn, Oct 25, 2019)
68dca9a  [SPARK-29527][SQL] SHOW CREATE TABLE should look up catalog/table lik… (viirya, Oct 25, 2019)
ae5b60d  [SPARK-29182][CORE][FOLLOWUP] Cache preferred locations of checkpoint… (viirya, Oct 25, 2019)
2baf7a1  [SPARK-29608][BUILD] Add `hadoop-3.2` profile to release build (dongjoon-hyun, Oct 25, 2019)
2549391  [SPARK-29580][TESTS] Add kerberos debug messages for Kafka secure tests (gaborgsomogyi, Oct 25, 2019)
5bdc58b  [SPARK-27653][SQL][FOLLOWUP] Fix `since` version of `min_by/max_by` (dongjoon-hyun, Oct 26, 2019)
9a46702  [SPARK-29554][SQL] Add `version` SQL function (yaooqinn, Oct 26, 2019)
2115bf6  [SPARK-29490][SQL] Reset 'WritableColumnVector' in 'RowToColumnarExec' (marin-ma, Oct 26, 2019)
077fb99  [SPARK-29589][WEBUI] Support pagination for sqlstats session table in… (shahidki31, Oct 26, 2019)
74514b4  [SPARK-29614][SQL][TEST] Fix failures of DateTimeUtilsSuite and Times… (MaxGekk, Oct 27, 2019)
a43b966  [SPARK-29613][BUILD][SS] Upgrade to Kafka 2.3.1 (dongjoon-hyun, Oct 27, 2019)
b19fd48  [SPARK-29093][PYTHON][ML] Remove automatically generated param setter… (huaxingao, Oct 28, 2019)
[SPARK-29500][SQL][SS] Support partition column when writing to Kafka
### What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-29500

`KafkaRowWriter` now supports setting the Kafka partition by reading a "partition" column in the input DataFrame.

Code changes are in commit 1, test changes in commit 2, and doc changes in commit 3.

cc: tcondie, dongjinleekr, srowen

### Why are the changes needed?
While it is possible to configure a custom Kafka Partitioner with
`.option("kafka.partitioner.class", "my.custom.Partitioner")`, this is not enough for certain use cases. See the Jira issue.
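
As a minimal sketch of the intended usage (not taken from this PR; it assumes the spark-sql-kafka-0-10 package is on the classpath, and the topic name and bootstrap servers are placeholders), a batch write can pin every record to one Kafka partition by adding an integer "partition" column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().appName("kafka-partition-column-sketch").getOrCreate()
import spark.implicits._

// The Kafka sink reads the optional "partition" column alongside key/value/topic/headers.
// A missing or null partition leaves the choice to the Kafka producer's partitioner.
Seq(("k1", "v1"), ("k2", "v2"))
  .toDF("key", "value")
  .withColumn("partition", lit(2))                    // send every row to partition 2
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")    // placeholder address
  .option("topic", "events")                          // placeholder topic
  .mode("append")
  .save()
```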

### Does this PR introduce any user-facing change?
No, as this behaviour is optional.

### How was this patch tested?
Two new unit tests were added and one was updated.

Closes apache#26153 from redsk/feature/SPARK-29500.

Authored-by: redsk <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
redsk authored and srowen committed Oct 25, 2019
commit 8bd8f492ea006ce03d215c3b272c31c1b8bc1858
docs/structured-streaming-kafka-integration.md (10 additions, 0 deletions)
@@ -614,6 +614,10 @@ The Dataframe being written to Kafka should have the following columns in schema
<td>topic (*optional)</td>
<td>string</td>
</tr>
<tr>
<td>partition (optional)</td>
<td>int</td>
</tr>
</table>
\* The topic column is required if the "topic" configuration option is not specified.<br>

@@ -622,6 +626,12 @@ a ```null``` valued key column will be automatically added (see Kafka semantics
how ```null``` valued key values are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set i.e., the "topic" configuration option overrides the topic column.
If a "partition" column is not specified (or its value is ```null```)
then the partition is calculated by the Kafka producer.
A Kafka partitioner can be specified in Spark by setting the
```kafka.partitioner.class``` option. If it is not set, the default Kafka
partitioner will be used.


The following options must be set for the Kafka sink
for both batch and streaming queries.
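
Below is a hedged sketch of how the documented column interacts with a streaming query. It assumes a topic with at least four partitions; the bootstrap servers, topic, and checkpoint location are placeholders. The partition is derived from the key, so rows with the same key always land in the same partition:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

val spark = SparkSession.builder().appName("kafka-partition-streaming-sketch").getOrCreate()

val query = spark.readStream
  .format("rate")                                              // toy source: (timestamp, value) rows
  .load()
  .selectExpr("CAST(value AS STRING) AS key", "CAST(value AS STRING) AS value")
  // Deterministically spread rows over an assumed 4-partition topic;
  // a null partition would instead fall back to the producer-side partitioner.
  .withColumn("partition", pmod(hash(col("key")), lit(4)))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")             // placeholder
  .option("topic", "events")                                   // placeholder
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")  // placeholder
  .start()

query.awaitTermination()
```
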
@@ -27,7 +27,7 @@ import org.apache.kafka.common.header.internals.RecordHeader

import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
import org.apache.spark.sql.types.{BinaryType, StringType}
import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType}

/**
* Writes out data in a single Spark task, without any concerns about how
@@ -92,16 +92,19 @@ private[kafka010] abstract class KafkaRowWriter(
throw new NullPointerException(s"null topic present in the data. Use the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
}
val partition: Integer =
if (projectedRow.isNullAt(4)) null else projectedRow.getInt(4)
val record = if (projectedRow.isNullAt(3)) {
new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value)
new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, partition, key, value)
} else {
val headerArray = projectedRow.getArray(3)
val headers = (0 until headerArray.numElements()).map { i =>
val struct = headerArray.getStruct(i, 2)
new RecordHeader(struct.getUTF8String(0).toString, struct.getBinary(1))
.asInstanceOf[Header]
}
new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, null, key, value, headers.asJava)
new ProducerRecord[Array[Byte], Array[Byte]](
topic.toString, partition, key, value, headers.asJava)
}
producer.send(record, callback)
}
@@ -156,12 +159,23 @@ private[kafka010] abstract class KafkaRowWriter(
throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
s"attribute unsupported type ${t.catalogString}")
}
val partitionExpression =
inputSchema.find(_.name == KafkaWriter.PARTITION_ATTRIBUTE_NAME)
.getOrElse(Literal(null, IntegerType))
partitionExpression.dataType match {
case IntegerType => // good
case t =>
throw new IllegalStateException(s"${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
s"attribute unsupported type $t. ${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
s"must be a ${IntegerType.catalogString}")
}
UnsafeProjection.create(
Seq(
topicExpression,
Cast(keyExpression, BinaryType),
Cast(valueExpression, BinaryType),
headersExpression
headersExpression,
partitionExpression
),
inputSchema
)
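
For context on what the projected partition value becomes downstream, this is roughly how it maps onto the plain Kafka producer API (a sketch; the broker address, topic, and payload are placeholders). `ProducerRecord` accepts a nullable `Integer` partition, and passing null defers to the producer's configured partitioner, mirroring the null check above:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

val props = new Properties()
props.put("bootstrap.servers", "host1:9092")                        // placeholder
props.put("key.serializer", classOf[ByteArraySerializer].getName)
props.put("value.serializer", classOf[ByteArraySerializer].getName)

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

// A non-null Integer pins the record to partition 2; passing null instead
// lets the producer's partitioner decide, as in the branch above.
val record = new ProducerRecord[Array[Byte], Array[Byte]](
  "events", Integer.valueOf(2), "k".getBytes("UTF-8"), "v".getBytes("UTF-8"))

producer.send(record)
producer.close()
```
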
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.types.{BinaryType, MapType, StringType}
import org.apache.spark.sql.types.{BinaryType, IntegerType, MapType, StringType}
import org.apache.spark.util.Utils

/**
@@ -41,6 +41,7 @@ private[kafka010] object KafkaWriter extends Logging {
val KEY_ATTRIBUTE_NAME: String = "key"
val VALUE_ATTRIBUTE_NAME: String = "value"
val HEADERS_ATTRIBUTE_NAME: String = "headers"
val PARTITION_ATTRIBUTE_NAME: String = "partition"

override def toString: String = "KafkaWriter"

@@ -86,6 +87,14 @@ private[kafka010] object KafkaWriter extends Logging {
throw new AnalysisException(s"$HEADERS_ATTRIBUTE_NAME attribute type " +
s"must be a ${KafkaRecordToRowConverter.headersType.catalogString}")
}
schema.find(_.name == PARTITION_ATTRIBUTE_NAME).getOrElse(
Literal(null, IntegerType)
).dataType match {
case IntegerType => // good
case _ =>
throw new AnalysisException(s"$PARTITION_ATTRIBUTE_NAME attribute type " +
s"must be an ${IntegerType.catalogString}")
}
}

def write(
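
As a quick, hedged illustration of the type check above (servers and topic are placeholders, and a Kafka-capable classpath is assumed), writing a non-integer "partition" column should be rejected before any data is sent:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-partition-validation-sketch").getOrCreate()
import spark.implicits._

// "partition" is a string here, not an int, so the write should be rejected.
val badDf = Seq(("k", "v")).toDF("key", "value")
  .selectExpr("key", "value", "value AS partition")

try {
  badDf.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")   // placeholder
    .option("topic", "events")                         // placeholder
    .mode("append")
    .save()
} catch {
  case e: Exception =>
    // Expected to mention "partition attribute type must be an int" (per the check above).
    println(e.getMessage)
}
```
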
@@ -286,6 +286,15 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
}
assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))

val ex4 = intercept[AnalysisException] {
/* partition field wrong type */
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value as partition", "value"
)
}
assert(ex4.getMessage.toLowerCase(Locale.ROOT).contains(
"partition attribute type must be an int"))
}

test("streaming - write to non-existing topic") {
@@ -22,6 +22,8 @@ import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.internals.DefaultPartitioner
import org.apache.kafka.common.Cluster
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._

@@ -33,7 +35,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{BinaryType, DataType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType, StructField, StructType}

abstract class KafkaSinkSuiteBase extends QueryTest with SharedSparkSession with KafkaTest {
protected var testUtils: KafkaTestUtils = _
@@ -293,6 +295,21 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))

try {
ex = intercept[StreamingQueryException] {
/* partition field wrong type */
writer = createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value", "value as partition"
)
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"partition attribute type must be an int"))
}

test("streaming - write to non-existing topic") {
@@ -418,6 +435,65 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
)
}

def writeToKafka(df: DataFrame, topic: String, options: Map[String, String] = Map.empty): Unit = {
df
.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.options(options)
.mode("append")
.save()
}

def partitionsInTopic(topic: String): Set[Int] = {
createKafkaReader(topic)
.select("partition")
.map(_.getInt(0))
.collect()
.toSet
}

test("batch - partition column and partitioner priorities") {
val nrPartitions = 4
val topic1 = newTopic()
val topic2 = newTopic()
val topic3 = newTopic()
val topic4 = newTopic()
testUtils.createTopic(topic1, nrPartitions)
testUtils.createTopic(topic2, nrPartitions)
testUtils.createTopic(topic3, nrPartitions)
testUtils.createTopic(topic4, nrPartitions)
val customKafkaPartitionerConf = Map(
"kafka.partitioner.class" -> "org.apache.spark.sql.kafka010.TestKafkaPartitioner"
)

val df = (0 until 5).map(n => (topic1, s"$n", s"$n")).toDF("topic", "key", "value")

// default kafka partitioner
writeToKafka(df, topic1)
val partitionsInTopic1 = partitionsInTopic(topic1)
assert(partitionsInTopic1.size > 1)

// custom partitioner (always returns 0) overrides default partitioner
writeToKafka(df, topic2, customKafkaPartitionerConf)
val partitionsInTopic2 = partitionsInTopic(topic2)
assert(partitionsInTopic2.size == 1)
assert(partitionsInTopic2.head == 0)

// partition column overrides custom partitioner
val dfWithCustomPartition = df.withColumn("partition", lit(2))
writeToKafka(dfWithCustomPartition, topic3, customKafkaPartitionerConf)
val partitionsInTopic3 = partitionsInTopic(topic3)
assert(partitionsInTopic3.size == 1)
assert(partitionsInTopic3.head == 2)

// when the partition column value is null, it is ignored
val dfWithNullPartitions = df.withColumn("partition", lit(null).cast(IntegerType))
writeToKafka(dfWithNullPartitions, topic4)
assert(partitionsInTopic(topic4) == partitionsInTopic1)
}

test("batch - null topic field value, and no topic option") {
val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
val ex = intercept[SparkException] {
@@ -515,3 +591,13 @@ class KafkaSinkBatchSuiteV2 extends KafkaSinkBatchSuiteBase {
}
}
}

class TestKafkaPartitioner extends DefaultPartitioner {
override def partition(
topic: String,
key: Any,
keyBytes: Array[Byte],
value: Any,
valueBytes: Array[Byte],
cluster: Cluster): Int = 0
}