Skip to content
Closed
Prev Previous commit
Next Next commit
address
  • Loading branch information
zsxwing committed Aug 24, 2018
commit 7a02921950cda865e3cd45f1d1635212c2f707c0
Original file line number Diff line number Diff line change
Expand Up @@ -395,15 +395,13 @@ private[kafka010] case class InternalKafkaConsumer(
// Never happen as "reportDataLoss" will throw an exception
throw new IllegalStateException(
"reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
} else if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
// Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
fetchedRecord.withRecord(null, untilOffset)
} else {
if (record.offset >= untilOffset) {
reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
// Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
fetchedRecord.withRecord(null, untilOffset)
} else {
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
} else if (record.offset < offset) {
// This should not happen. If it does happen, then we probably misunderstand Kafka internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

private val rangeToRead = resolveRange(offsetRange)

private val converter = new KafkaRecordToUnsafeRowConverter

private var nextOffset = rangeToRead.fromOffset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,10 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
s"AddKafkaData(topics = $topics, data = $data, message = $message)"
}

object WithKafkaProducer {
def apply(
topic: String,
producer: KafkaProducer[String, String])(
func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
object WithOffsetSync {
def apply(topic: String)(func: () => Unit): StreamAction = {
Execute("Run Kafka Producer")(_ => {
func(producer)
func()
// This is a hack for the race condition that the committed message may be not visible to
// consumer for a short time.
// Looks like after the following call returns, the consumer can always read the committed
Expand Down Expand Up @@ -661,7 +658,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckAnswer(),
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
// Send 5 messages. They should be visible only after being committed.
producer.beginTransaction()
(0 to 4).foreach { i =>
Expand All @@ -672,7 +669,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
waitUntilBatchProcessed,
// Should not see any uncommitted messages
CheckNewAnswer(),
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
producer.commitTransaction()
},
AdvanceManualClock(100),
Expand All @@ -681,7 +678,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this waitUntilBatchProcessed needed? CheckAnswer waits for the batch to complete anyways.

CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
// Send 5 messages and abort the transaction. They should not be read.
producer.beginTransaction()
(6 to 10).foreach { i =>
Expand All @@ -695,7 +692,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(), // offset: 9*, 10*, 11*
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
// Send 5 messages again. The consumer should skip the above aborted messages and read
// them.
producer.beginTransaction()
Expand All @@ -710,7 +707,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(15, 16), // offset: 15, 16, 17*
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
producer.beginTransaction()
producer.send(new ProducerRecord[String, String](topic, "18")).get()
producer.commitTransaction()
Expand Down Expand Up @@ -783,7 +780,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckNewAnswer(),
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
// Send 5 messages. They should be visible only after being committed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why so? This read_uncommitted, right?

producer.beginTransaction()
(0 to 4).foreach { i =>
Expand All @@ -793,13 +790,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(0, 1, 2), // offset 0, 1, 2
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
producer.commitTransaction()
},
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
// Send 5 messages and abort the transaction. They should not be read.
producer.beginTransaction()
(6 to 10).foreach { i =>
Expand All @@ -813,7 +810,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(9, 10), // offset: 9, 10, 11*
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
// Send 5 messages again. The consumer should skip the above aborted messages and read
// them.
producer.beginTransaction()
Expand All @@ -828,7 +825,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(15, 16), // offset: 15, 16, 17*
WithKafkaProducer(topic, producer) { producer =>
WithOffsetSync(topic) { () =>
producer.beginTransaction()
producer.send(new ProducerRecord[String, String](topic, "18")).get()
producer.commitTransaction()
Expand Down