SPARK-20597 KafkaSourceProvider falls back on path as synonym for topic
Satyajit Vegesna committed Aug 2, 2018
commit d4e1ed0c25121ad5bf24cfe137e2ee1bff430c94
@@ -231,7 +231,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
// Picks the default topic name from the "path" entry in the "parameters" Map
// when no "topic" key is present.
val defaultTopic = parameters.get(TOPIC_OPTION_KEY) match {
Review comment (Member):
Isn't this simpler as something like

val defaultTopic = parameters.getOrElse(TOPIC_OPTION_KEY, parameters.get(PATH_OPTION_KEY)).map(_.trim)

  case None => parameters.get(PATH_OPTION_KEY) match {
    case path: Option[String] => parameters.get(PATH_OPTION_KEY).map(_.trim)
    case _ => None
  }
  case topic: Option[String] => parameters.get(TOPIC_OPTION_KEY).map(_.trim)
}
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
new KafkaSink(sqlContext,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
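
A type-checked version of the reviewer's suggestion, as a hedged sketch (Map.getOrElse with an Option default would widen the result to Any, so orElse on the Option is the idiomatic form):

val defaultTopic: Option[String] =
  parameters.get(TOPIC_OPTION_KEY)
    .orElse(parameters.get(PATH_OPTION_KEY))
    .map(_.trim)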
@@ -249,7 +255,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val topic = parameters.get(TOPIC_OPTION_KEY) match {
  case None => parameters.get(PATH_OPTION_KEY) match {
    case path: Option[String] => parameters.get(PATH_OPTION_KEY).map(_.trim)
    case _ => None
  }
  case topic: Option[String] => parameters.get(TOPIC_OPTION_KEY).map(_.trim)
}

val specifiedKafkaParams = kafkaParamsForProducer(parameters)
KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
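
The fallback can be sanity-checked outside Spark with a plain Map (a minimal sketch; resolveTopic is a hypothetical helper, not code from this patch):

val TOPIC_OPTION_KEY = "topic"
val PATH_OPTION_KEY = "path"

// Prefer the explicit "topic" option; otherwise fall back to "path".
def resolveTopic(parameters: Map[String, String]): Option[String] =
  parameters.get(TOPIC_OPTION_KEY).orElse(parameters.get(PATH_OPTION_KEY)).map(_.trim)

assert(resolveTopic(Map("topic" -> " t1 ", "path" -> "t2")).contains("t1"))
assert(resolveTopic(Map("path" -> "t2")).contains("t2"))
assert(resolveTopic(Map.empty).isEmpty)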
@@ -465,24 +476,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"

val TOPIC_OPTION_KEY = "topic"

val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
"""
|Some data may have been lost because they are not available in Kafka any more; either the
| data was aged out by Kafka or the topic may have been deleted before all the data in the
| topic was processed. If you want your streaming query to fail on such cases, set the source
| option "failOnDataLoss" to "true".
""".stripMargin

val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
"""
|Some data may have been lost because they are not available in Kafka any more; either the
| data was aged out by Kafka or the topic may have been deleted before all the data in the
| topic was processed. If you don't want your streaming query to fail on such cases, set the
| source option "failOnDataLoss" to "false".
""".stripMargin


val PATH_OPTION_KEY = "path"
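
"path" is the option key that DataStreamWriter.start(path) records, so with this change the sink topic can be passed positionally. A hedged usage sketch (df, the broker address, and the topic name are placeholders):

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .start("myTopic")   // stored under the "path" option, now used as the default topic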

private val deserClassName = classOf[ByteArrayDeserializer].getName
