add a unit test
zsxwing committed Dec 17, 2018
commit 063d6290f88a8adecfd7141aab7d1b99c760d2e3
@@ -133,20 +133,22 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())
-    offsetRanges.filter { range =>
-      if (range.untilOffset < range.fromOffset) {
-        reportDataLoss(s"Partition ${range.topicPartition}'s offset was changed from " +
-          s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
-        false
-      } else {
-        true
-      }
-    }
 
     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
     // that is, concurrent tasks will not read the same TopicPartitions.
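The key behavioral change in this hunk: the data-loss check now runs on the raw offset maps before rangeCalculator.getRanges is called, rather than on the computed ranges afterwards, so a partition whose end offset fell below its start offset (as happens when a topic is deleted and recreated between batches) is reported up front. A minimal standalone sketch of that check, using a plain Map in place of Spark's offset types and a println in place of the real reportDataLoss (all names here are illustrative, not Spark's API):

    object DataLossCheckSketch {
      // Hypothetical stand-in for Spark's offset maps: partition -> offset.
      type PartitionOffsets = Map[String, Long]

      def reportDataLoss(message: String): Unit = println(s"WARN: $message")

      def checkDataLoss(fromOffsets: PartitionOffsets, untilOffsets: PartitionOffsets): Unit = {
        untilOffsets.foreach { case (tp, untilOffset) =>
          fromOffsets.get(tp).foreach { fromOffset =>
            if (untilOffset < fromOffset) {
              // An end offset below the start offset means the partition was
              // truncated or recreated; some records can never be read.
              reportDataLoss(s"Partition $tp's offset was changed from " +
                s"$fromOffset to $untilOffset, some data may have been missed")
            }
          }
        }
      }

      def main(args: Array[String]): Unit = {
        // "topic-bad-0" was recreated, so its latest offset (1) is now behind
        // the offset the query had already reached (2).
        checkDataLoss(
          fromOffsets = Map("topic-good-0" -> 2L, "topic-bad-0" -> 2L),
          untilOffsets = Map("topic-good-0" -> 5L, "topic-bad-0" -> 1L))
      }
    }

Running it prints the warning for topic-bad-0 only: its end offset (1) is behind the start offset (2) recorded before the topic was recreated.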
@@ -265,10 +265,10 @@ private[kafka010] class KafkaOffsetReader(
       if (incorrectOffsets.nonEmpty) {
         logWarning("Found incorrect offsets in some partitions " +
           s"(partition, previous offset, fetched offset): $incorrectOffsets")
-      }
-      if (attempt < maxOffsetFetchAttempts) {
-        logWarning("Retrying to fetch latest offsets because of incorrect offsets")
-        Thread.sleep(offsetFetchAttemptIntervalMs)
+        if (attempt < maxOffsetFetchAttempts) {
+          logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+          Thread.sleep(offsetFetchAttemptIntervalMs)
+        }
       }
     } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
 
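This fix is behavioral, not just cosmetic: in the old shape, the retry branch sat outside the incorrectOffsets.nonEmpty check, so the "Retrying" log line and the Thread.sleep ran even on a pass where every fetched offset was correct. Moving the branch inside means the reader only logs and sleeps when there is actually something to retry. A runnable sketch of the corrected loop shape, with a stubbed fetch standing in for the real Kafka consumer call (the stub and its incorrect-offset rule are illustrative, not Spark's logic):

    object RetryLoopSketch {
      val maxOffsetFetchAttempts = 3
      val offsetFetchAttemptIntervalMs = 10L

      // Stub: the first fetch returns a bogus offset, later fetches are fine.
      var calls = 0
      def fetchOffsets(): Map[String, Long] = {
        calls += 1
        if (calls == 1) Map("tp-0" -> -1L) else Map("tp-0" -> 42L)
      }

      def main(args: Array[String]): Unit = {
        var attempt = 0
        var incorrectOffsets = Seq.empty[(String, Long)]
        do {
          attempt += 1
          val fetched = fetchOffsets()
          incorrectOffsets = fetched.toSeq.filter { case (_, offset) => offset < 0 }
          if (incorrectOffsets.nonEmpty) {
            println(s"Found incorrect offsets: $incorrectOffsets")
            // Only log and sleep when something was actually wrong
            // and attempts remain, matching the fixed control flow.
            if (attempt < maxOffsetFetchAttempts) {
              println("Retrying to fetch latest offsets because of incorrect offsets")
              Thread.sleep(offsetFetchAttemptIntervalMs)
            }
          }
        } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
        println(s"Done after $attempt attempt(s)")
      }
    }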
@@ -329,6 +329,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("subscribe topic by pattern with topic recreation between batches") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array("1", "3"))
+    testUtils.createTopic(topic2, partitions = 1)
+    testUtils.sendMessages(topic2, Array("2", "4"))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("startingOffsets", "earliest")
+      .option("subscribePattern", s"$topicPrefix-.*")
+
+    val ds = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+      .map(kv => kv._2.toInt)
+
+    testStream(ds)(
+      StartStream(),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+        true
+      },
+      CheckAnswer(1, 2, 3, 4),
+      // Restart the stream in this test to keep it stable: when a topic is recreated
+      // while a consumer is still alive, that consumer may not see the recreated topic
+      // even though a fresh consumer would.
+      StopStream,
+      // Recreate `topic2` and wait until it's available
+      WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+        testUtils.deleteTopic(topic2)
+        testUtils.createTopic(topic2)
+        testUtils.sendMessages(topic2, Array("6"))
+      },
+      StartStream(),
+      ExpectFailure[IllegalStateException](e => {
+        // The offset of `topic2` should have changed from 2 to 1
+        assert(e.getMessage.contains("was changed from 2 to 1"))
+      })
+    )
+  }
+
   test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
     withTempDir { metadataPath =>
       val topic = "kafka-initial-offset-current"
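WithOffsetSync is a suite-local helper rather than a standard StreamTest action; judging from its use here, it runs the supplied block and then waits until the given partition's latest offset reaches expectedOffset, so the broker has fully settled on the recreated topic before the stream restarts. A rough, self-contained sketch of what such a helper could look like (a guess at the shape only; the caller-supplied latestOffset function stands in for a real Kafka lookup):

    import org.apache.kafka.common.TopicPartition

    // Hypothetical offset-sync action: run a mutation, then poll until the
    // partition's latest offset matches the expected value or a timeout expires.
    case class WithOffsetSyncSketch(
        topicPartition: TopicPartition,
        expectedOffset: Long,
        latestOffset: TopicPartition => Long)(func: () => Unit) {

      def run(timeoutMs: Long = 30000, pollMs: Long = 100): Unit = {
        func() // e.g. delete and recreate the topic, then send new records
        val deadline = System.currentTimeMillis() + timeoutMs
        while (latestOffset(topicPartition) != expectedOffset) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError(
              s"$topicPartition never reached offset $expectedOffset")
          }
          Thread.sleep(pollMs) // give the broker time to settle
        }
      }
    }

In the test, the synced block deletes topic2, recreates it, and writes a single record, so the partition's latest offset should land at 1. The restarted query still remembers end offset 2 from before the recreation, so the next batch sees the offset go backwards and fails with the expected "was changed from 2 to 1" message, exercising the data-loss check added in the first hunk.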