From dc18a6ff59fe7c48ed188a4eb9a6abf04caee0bd Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 8 Aug 2018 10:40:37 -0700 Subject: [PATCH 01/10] Support non-consecutive offsets for Kafka --- .../sql/kafka010/KafkaContinuousReader.scala | 2 +- .../sql/kafka010/KafkaDataConsumer.scala | 50 +++- .../sql/kafka010/KafkaMicroBatchReader.scala | 3 +- .../spark/sql/kafka010/KafkaSourceRDD.scala | 38 --- .../kafka010/KafkaContinuousSourceSuite.scala | 149 +++++++++- .../kafka010/KafkaMicroBatchSourceSuite.scala | 267 +++++++++++++++++- .../sql/kafka010/KafkaRelationSuite.scala | 101 +++++++ .../spark/sql/kafka010/KafkaTestUtils.scala | 17 +- 8 files changed, 580 insertions(+), 47 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala index 48b91dfe764e..a0ec1b4260b4 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -220,7 +220,7 @@ class KafkaContinuousInputPartitionReader( // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range, // or if it's the endpoint of the data range (i.e. the "true" next offset). - case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => + case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => val range = consumer.getAvailableOffsetRange() if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) { // retry diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 941f0ab177e4..aaae9290f594 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread +/** + * An exception to indicate there is a missing offset in the records returned by Kafka consumer. + * This means it's either a transaction (commit or abort) marker, or an aborted message if + * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are + * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. + */ +private[kafka010] class MissingOffsetException( + val offset: Long, + val nextOffsetToFetch: Long) extends Exception( + s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch") + private[kafka010] sealed trait KafkaDataConsumer { /** * Get the record for the given offset if available. 
Otherwise it will either throw error @@ -95,6 +106,10 @@ private[kafka010] case class InternalKafkaConsumer( ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET + @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET + + @volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET + /** Create a KafkaConsumer to fetch records for `topicPartition` */ private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams) @@ -170,6 +185,14 @@ private[kafka010] case class InternalKafkaConsumer( resetConsumer() reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e) toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset) + case e: MissingOffsetException => + toFetchOffset = e.nextOffsetToFetch + if (toFetchOffset >= untilOffset) { + resetFetchedData() + toFetchOffset = UNKNOWN_OFFSET + } else { + logDebug(s"Skipped offsets [$offset, $toFetchOffset]") + } } } @@ -251,25 +274,38 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { - if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { + if (offset != nextOffsetInFetchedData) { // This is the first fetch, or the last pre-fetched data has been drained. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) } + if (!fetchedData.hasNext) { + if (offset < offsetAfterPoll) { + resetFetchedData() + throw new MissingOffsetException(offset, offsetAfterPoll) + } else { + seek(offset) + poll(pollTimeoutMs) + } + } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: + // We cannot fetch anything after `poll`. Three possible cases: // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. + // - Fetched something but all of them are not valid date messages. In this case, the position + // will be changed and we can use it to determine this case. val range = getAvailableOffsetRange() if (offset < range.earliest || offset >= range.latest) { throw new OffsetOutOfRangeException( Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else { + } else if (offsetBeforePoll == offsetAfterPoll) { throw new TimeoutException( s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") + } else { + throw new MissingOffsetException(offset, offsetAfterPoll) } } else { val record = fetchedData.next() @@ -277,6 +313,11 @@ private[kafka010] case class InternalKafkaConsumer( // In general, Kafka uses the specified offset as the start point, and tries to fetch the next // available offset. Hence we need to handle offset mismatch. 
if (record.offset > offset) { + val range = getAvailableOffsetRange() + if (range.earliest <= offset) { + resetFetchedData() + throw new MissingOffsetException(offset, record.offset) + } // This may happen when some records aged out but their offsets already got verified if (failOnDataLoss) { reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})") @@ -347,9 +388,12 @@ private[kafka010] case class InternalKafkaConsumer( } private def poll(pollTimeoutMs: Long): Unit = { + offsetBeforePoll = consumer.position(topicPartition) val p = consumer.poll(pollTimeoutMs) val r = p.records(topicPartition) logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") + offsetAfterPoll = consumer.position(topicPartition) + logDebug(s"Offset changed from $offsetBeforePoll to $offsetAfterPoll after polling") fetchedData = r.iterator } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala index 6c95b2b2560c..acf185f38c97 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala @@ -327,6 +327,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer) private val rangeToRead = resolveRange(offsetRange) + private val converter = new KafkaRecordToUnsafeRowConverter private var nextOffset = rangeToRead.fromOffset @@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss) if (record != null) { nextRow = converter.toUnsafeRow(record) + nextOffset = record.offset + 1 true } else { false @@ -348,7 +350,6 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( override def get(): UnsafeRow = { assert(nextRow != null) - nextOffset += 1 nextRow } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala index 8b4494d2e9a2..f8b90056d293 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD( offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray } - override def count(): Long = offsetRanges.map(_.size).sum - - override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = { - val c = count - new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } - - override def isEmpty(): Boolean = count == 0L - - override def take(num: Int): Array[ConsumerRecord[Array[Byte], Array[Byte]]] = { - val nonEmptyPartitions = - this.partitions.map(_.asInstanceOf[KafkaSourceRDDPartition]).filter(_.offsetRange.size > 0) - - if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[Array[Byte], Array[Byte]]](0) - } - - // Determine in advance how many messages need to be taken from each partition - val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { - val taken = Math.min(remain, part.offsetRange.size) - 
result + (part.index -> taken.toInt) - } else { - result - } - } - - val buf = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] - val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]) => - it.take(parts(tc.partitionId)).toArray, parts.keys.toArray - ) - res.foreach(buf ++= _) - buf.toArray - } - override def getPreferredLocations(split: Partition): Seq[String] = { val part = split.asInstanceOf[KafkaSourceRDDPartition] part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index ea2a2a84d22c..a847601176a4 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -17,12 +17,159 @@ package org.apache.spark.sql.kafka010 +import org.apache.kafka.clients.producer.ProducerRecord + import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation +import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.streaming.Trigger // Run tests in KafkaSourceSuiteBase in continuous execution mode. -class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest +class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest { + import testImplicits._ + + test("read Kafka transactional messages: read_committed") { + val table = "kafka_continuous_source_test" + withTable(table) { + val topic = newTopic() + testUtils.createTopic(topic) + val producer = testUtils.createProducer(usingTrascation = true) + try { + val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + .map(kv => kv._2.toInt) + + val q = df + .writeStream + .format("memory") + .queryName(table) + .trigger(ContinuousTrigger(100)) + .start() + try { + producer.initTransactions() + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + // Should not read any messages before they are committed + assert(spark.table(table).isEmpty) + + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should read all committed messages + checkAnswer(spark.table(table), (1 to 5).toDF) + } + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + // Should not read aborted messages + checkAnswer(spark.table(table), (1 to 5).toDF) + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should skip aborted messages and read new committed ones. 
+ checkAnswer(spark.table(table), ((1 to 5) ++ (11 to 15)).toDF) + } + } finally { + q.stop() + } + } finally { + producer.close() + } + } + } + + test("read Kafka transactional messages: read_uncommitted") { + val table = "kafka_continuous_source_test" + withTable(table) { + val topic = newTopic() + testUtils.createTopic(topic) + val producer = testUtils.createProducer(usingTrascation = true) + try { + val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_uncommitted") + .option("subscribe", topic) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + .map(kv => kv._2.toInt) + + val q = df + .writeStream + .format("memory") + .queryName(table) + .trigger(ContinuousTrigger(100)) + .start() + try { + producer.initTransactions() + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + eventually(timeout(streamingTimeout)) { + // Should read all committed messages + checkAnswer(spark.table(table), (1 to 5).toDF) + } + + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should read all committed messages + checkAnswer(spark.table(table), (1 to 5).toDF) + } + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + eventually(timeout(streamingTimeout)) { + // Should not read aborted messages + checkAnswer(spark.table(table), (1 to 10).toDF) + } + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should skip aborted messages and read new committed ones. + checkAnswer(spark.table(table), (1 to 15).toDF) + } + } finally { + q.stop() + } + } finally { + producer.close() + } + } + } +} class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { import testImplicits._ diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index aa898686c77c..c276e12d41b9 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -29,7 +29,7 @@ import scala.collection.mutable import scala.io.Source import scala.util.Random -import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} import org.apache.kafka.common.TopicPartition import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ @@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { s"AddKafkaData(topics = $topics, data = $data, message = $message)" } + object WithKafkaProducer { + def apply( + topic: String, + producer: KafkaProducer[String, String])( + func: KafkaProducer[String, String] => Unit): AssertOnQuery = { + AssertOnQuery(_ => { + func(producer) + // This is a hack for the race condition that the committed message may be not visible to + // consumer for a short time. 
+ // Looks like after the following call returns, the consumer can always read the committed + // messages. + testUtils.getLatestOffsets(Set(topic)) + true + }, "Run Kafka Producer") + } + } + private val topicId = new AtomicInteger(0) protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}" } @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { + // This test will cover the following cases: + // 1. the whole batch contains no data messages + // 2. the first offset in a batch is not a committed data message + // 3. the last offset in a batch is not a committed data message + // 4. there is a gap in the middle of a batch + + val topic = newTopic() + testUtils.createTopic(topic, partitions = 1) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. + .option("kafkaConsumer.pollTimeoutMs", 5000) + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new StreamManualClock + + val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + } + + val producer = testUtils.createProducer(usingTrascation = true) + try { + producer.initTransactions() + + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + // 1 from smallest, 1 from middle, 8 from biggest + CheckAnswer(), + WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + // Should not see any uncommitted messages + CheckAnswer(), + WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 3: _*), // offset 0, 1, 2 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] + WithKafkaProducer(topic, producer) { producer => + // Send 5 messages and abort the transaction. They should not be read. + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8* + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11* + WithKafkaProducer(topic, producer) { producer => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. 
+ producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17* + WithKafkaProducer(topic, producer) { producer => + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "16")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "17")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "18")).get() + producer.send(new ProducerRecord[String, String](topic, "19")).get() + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24* + ) + } finally { + producer.close() + } + } + + test("read Kafka transactional messages: read_uncommitted") { + // This test will cover the following cases: + // 1. the whole batch contains no data messages + // 2. the first offset in a batch is not a committed data message + // 3. the last offset in a batch is not a committed data message + // 4. there is a gap in the middle of a batch + + val topic = newTopic() + testUtils.createTopic(topic, partitions = 1) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_uncommitted") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. + .option("kafkaConsumer.pollTimeoutMs", 5000) + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new StreamManualClock + + val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + } + + val producer = testUtils.createProducer(usingTrascation = true) + try { + producer.initTransactions() + + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + // 1 from smallest, 1 from middle, 8 from biggest + CheckAnswer(), + WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. 
+ producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 3: _*), // offset 0, 1, 2 + WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] + WithKafkaProducer(topic, producer) { producer => + // Send 5 messages and abort the transaction. They should not be read. + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 8: _*), // offset: 6, 7, 8 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 10: _*), // offset: 9, 10, 11* + WithKafkaProducer(topic, producer) { producer => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 13: _*), // offset: 12, 13, 14 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 15: _*), // offset: 15, 16, 17* + WithKafkaProducer(topic, producer) { producer => + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "16")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "17")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "18")).get() + producer.send(new ProducerRecord[String, String](topic, "19")).get() + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 17: _*), // offset: 18, 19*, 20 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 19: _*), // offset: 21*, 22, 23 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 19: _*) // offset: 24* + ) + } finally { + producer.close() + } + } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 91893df4ec32..1dc0552dbdf1 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010 import java.util.Locale import java.util.concurrent.atomic.AtomicInteger +import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.scalatest.BeforeAndAfter @@ -235,4 +236,104 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon testBadOptions("subscribe" -> "")("no topics to subscribe") testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") } + + test("read Kafka transactional messages: read_committed") { + val topic = newTopic() + testUtils.createTopic(topic) + val producer = testUtils.createProducer(usingTrascation = true) + try { + val df = spark + .read 
+ .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .load() + .selectExpr("CAST(value AS STRING)") + + producer.initTransactions() + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + // Should not read any messages before they are committed + assert(df.isEmpty) + + producer.commitTransaction() + + // Should read all committed messages + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + // Should not read aborted messages + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + + // Should skip aborted messages and read new committed ones. + checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF) + } finally { + producer.close() + } + } + + test("read Kafka transactional messages: read_uncommitted") { + val topic = newTopic() + testUtils.createTopic(topic) + val producer = testUtils.createProducer(usingTrascation = true) + try { + val df = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_uncommitted") + .option("subscribe", topic) + .load() + .selectExpr("CAST(value AS STRING)") + + producer.initTransactions() + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + // "read_uncommitted" should see all messages including uncommitted ones + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.commitTransaction() + + // Should read all committed messages + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + // "read_uncommitted" should see all messages including uncommitted or aborted ones + checkAnswer(df, (1 to 10).map(_.toString).toDF) + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + + // Should read all messages + checkAnswer(df, (1 to 15).map(_.toString).toDF) + } finally { + producer.close() + } + } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index d89cccd3c521..ec7ad11a2d8d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.util.{Map => JMap, Properties} +import java.util.{Map => JMap, Properties, UUID} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -308,9 +308,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props.put("log.flush.interval.messages", 
"1") props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") + props.put("group.initial.rebalance.delay.ms", "10") + + // Change the following settings as we have only 1 broker props.put("offsets.topic.num.partitions", "1") props.put("offsets.topic.replication.factor", "1") - props.put("group.initial.rebalance.delay.ms", "10") + props.put("transaction.state.log.replication.factor", "1") + props.put("transaction.state.log.min.isr", "1") + // Can not use properties.putAll(propsMap.asJava) in scala-2.12 // See https://github.com/scala/bug/issues/10418 withBrokerProps.foreach { case (k, v) => props.put(k, v) } @@ -327,6 +332,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props } + def createProducer(usingTrascation: Boolean): KafkaProducer[String, String] = { + val props = producerConfiguration + if (usingTrascation) { + props.put("transactional.id", UUID.randomUUID().toString) + } + new KafkaProducer[String, String](props) + } + private def consumerConfiguration: Properties = { val props = new Properties() props.put("bootstrap.servers", brokerAddress) From dfea7e363ef479c3783171bc3644be61d74beee7 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 8 Aug 2018 11:08:34 -0700 Subject: [PATCH 02/10] minor --- .../apache/spark/sql/kafka010/KafkaDataConsumer.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index aaae9290f594..5749ddeb6946 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -275,13 +275,14 @@ private[kafka010] case class InternalKafkaConsumer( pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { if (offset != nextOffsetInFetchedData) { - // This is the first fetch, or the last pre-fetched data has been drained. + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) - } - if (!fetchedData.hasNext) { + } else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. if (offset < offsetAfterPoll) { + // Offsets in [offset, offsetAfterPoll) are missing. We should skip them. 
resetFetchedData() throw new MissingOffsetException(offset, offsetAfterPoll) } else { @@ -305,6 +306,8 @@ private[kafka010] case class InternalKafkaConsumer( throw new TimeoutException( s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") } else { + assert(offset <= offsetAfterPoll, + s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") throw new MissingOffsetException(offset, offsetAfterPoll) } } else { From baef29f2983560c8010681c9bb7e74f711c8f2e7 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 15 Aug 2018 10:53:21 -0700 Subject: [PATCH 03/10] address --- .../sql/kafka010/KafkaDataConsumer.scala | 170 ++++++++++-------- 1 file changed, 93 insertions(+), 77 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 5749ddeb6946..61bb8c235d3e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread -/** - * An exception to indicate there is a missing offset in the records returned by Kafka consumer. - * This means it's either a transaction (commit or abort) marker, or an aborted message if - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. - */ -private[kafka010] class MissingOffsetException( - val offset: Long, - val nextOffsetToFetch: Long) extends Exception( - s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch") - private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the record for the given offset if available. + * + * If the record is invisible (either a + * transaction message, or an aborted message when the consumer's `isolation.level` is + * `read_committed`), it will be skipped and this method will try to fetch next available record + * within [offset, untilOffset). + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this + * method will try to fetch next available record within [offset, untilOffset). + * + * When this method tries to skip offsets due to either invisible messages or data loss and + * reaches `untilOffset`, it will return `null`. * * @param offset the offset to fetch. * @param untilOffset the max offset to fetch. Exclusive. @@ -91,6 +90,17 @@ private[kafka010] case class InternalKafkaConsumer( kafkaParams: ju.Map[String, Object]) extends Logging { import InternalKafkaConsumer._ + /** + * The internal object returned by the `fetchData` method. If `record` is empty, it means it is + * invisible (either a transaction message, or an aborted message when the consumer's + * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch + * instead. 
+ */ + private case class FetchedRecord( + record: Option[ConsumerRecord[Array[Byte], Array[Byte]]], + nextOffsetToFetch: Long + ) + private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] @volatile private var consumer = createConsumer @@ -103,11 +113,13 @@ private[kafka010] case class InternalKafkaConsumer( /** Iterator to the already fetch data */ @volatile private var fetchedData = - ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] + ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]] @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET - @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET - + /** + * The next available offset returned by Kafka after polling. This is the next offset after + * draining `fetchedData`. + */ @volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET /** Create a KafkaConsumer to fetch records for `topicPartition` */ @@ -140,20 +152,7 @@ private[kafka010] case class InternalKafkaConsumer( AvailableOffsetRange(earliestOffset, latestOffset) } - /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. - * - * @param offset the offset to fetch. - * @param untilOffset the max offset to fetch. Exclusive. - * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. - * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at - * offset if available, or throw exception.when `failOnDataLoss` is `false`, - * this method will either return record at offset if available, or return - * the next earliest available record less than untilOffset, or null. It - * will not throw any exception. - */ + /** @see [[KafkaDataConsumer.get]] */ def get( offset: Long, untilOffset: Long, @@ -168,15 +167,25 @@ private[kafka010] case class InternalKafkaConsumer( // we will move to the next available offset within `[offset, untilOffset)` and retry. // If `failOnDataLoss` is `true`, the loop body will be executed only once. var toFetchOffset = offset - var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null + var fetchedRecord: FetchedRecord = null // We want to break out of the while loop on a successful fetch to avoid using "return" // which may causes a NonLocalReturnControl exception when this method is used as a function. 
var isFetchComplete = false while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) { try { - consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) - isFetchComplete = true + fetchedRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) + if (fetchedRecord.record.nonEmpty) { + isFetchComplete = true + } else { + toFetchOffset = fetchedRecord.nextOffsetToFetch + if (toFetchOffset >= untilOffset) { + resetFetchedData() + toFetchOffset = UNKNOWN_OFFSET + } else { + logDebug(s"Skipped offsets [$offset, $toFetchOffset]") + } + } } catch { case e: OffsetOutOfRangeException => // When there is some error thrown, it's better to use a new consumer to drop all cached @@ -185,19 +194,11 @@ private[kafka010] case class InternalKafkaConsumer( resetConsumer() reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e) toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset) - case e: MissingOffsetException => - toFetchOffset = e.nextOffsetToFetch - if (toFetchOffset >= untilOffset) { - resetFetchedData() - toFetchOffset = UNKNOWN_OFFSET - } else { - logDebug(s"Skipped offsets [$offset, $toFetchOffset]") - } } } if (isFetchComplete) { - consumerRecord + fetchedRecord.record.get } else { resetFetchedData() null @@ -262,7 +263,8 @@ private[kafka010] case class InternalKafkaConsumer( } /** - * Get the record for the given offset if available. Otherwise it will either throw error + * Get the fetched record for the given offset if available. If the Otherwise it will either throw + * error * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), * or null. * @@ -273,43 +275,26 @@ private[kafka010] case class InternalKafkaConsumer( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { + failOnDataLoss: Boolean): FetchedRecord = { if (offset != nextOffsetInFetchedData) { // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) + poll(offset, pollTimeoutMs) } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained. if (offset < offsetAfterPoll) { // Offsets in [offset, offsetAfterPoll) are missing. We should skip them. resetFetchedData() - throw new MissingOffsetException(offset, offsetAfterPoll) + return FetchedRecord(None, offsetAfterPoll) } else { - seek(offset) - poll(pollTimeoutMs) + poll(offset, pollTimeoutMs) } } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Three possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. TimeoutException will be thrown. - // - Fetched something but all of them are not valid date messages. In this case, the position - // will be changed and we can use it to determine this case. 
- val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { - throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else if (offsetBeforePoll == offsetAfterPoll) { - throw new TimeoutException( - s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") - } else { - assert(offset <= offsetAfterPoll, - s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") - throw new MissingOffsetException(offset, offsetAfterPoll) - } + assert(offset <= offsetAfterPoll, + s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") + FetchedRecord(None, offsetAfterPoll) } else { val record = fetchedData.next() nextOffsetInFetchedData = record.offset + 1 @@ -318,8 +303,13 @@ private[kafka010] case class InternalKafkaConsumer( if (record.offset > offset) { val range = getAvailableOffsetRange() if (range.earliest <= offset) { - resetFetchedData() - throw new MissingOffsetException(offset, record.offset) + // `offset` is still valid but the corresponding message is invisible. We should skip it + // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of + // `fetchData` can just return `record` directly. + assert(fetchedData.hasPrevious, "fetchedData cannot move back") + fetchedData.previous() + nextOffsetInFetchedData = record.offset + return FetchedRecord(None, record.offset) } // This may happen when some records aged out but their offsets already got verified if (failOnDataLoss) { @@ -332,7 +322,7 @@ private[kafka010] case class InternalKafkaConsumer( null } else { reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") - record + FetchedRecord(Some(record), nextOffsetInFetchedData) } } } else if (record.offset < offset) { @@ -341,7 +331,7 @@ private[kafka010] case class InternalKafkaConsumer( throw new IllegalStateException( s"Tried to fetch $offset but the returned record offset was ${record.offset}") } else { - record + FetchedRecord(Some(record), nextOffsetInFetchedData) } } } @@ -356,7 +346,7 @@ private[kafka010] case class InternalKafkaConsumer( /** Reset the internal pre-fetched data. */ private def resetFetchedData(): Unit = { nextOffsetInFetchedData = UNKNOWN_OFFSET - fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] + fetchedData = ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]] } /** @@ -390,14 +380,40 @@ private[kafka010] case class InternalKafkaConsumer( consumer.seek(topicPartition, offset) } - private def poll(pollTimeoutMs: Long): Unit = { - offsetBeforePoll = consumer.position(topicPartition) + /** + * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`. + * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible + * messages (either transaction messages, or aborted messages when `isolation.level` is + * `read_committed`). + * + * @throws OffsetOutOfRangeException if `offset` is out of range. + * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. 
+ */ + private def poll(offset: Long, pollTimeoutMs: Long): Unit = { + seek(offset) val p = consumer.poll(pollTimeoutMs) val r = p.records(topicPartition) logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") offsetAfterPoll = consumer.position(topicPartition) - logDebug(s"Offset changed from $offsetBeforePoll to $offsetAfterPoll after polling") - fetchedData = r.iterator + logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") + fetchedData = r.listIterator + if (!fetchedData.hasNext) { + // We cannot fetch anything after `poll`. Two possible cases: + // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will + // be thrown. + // - Cannot fetch any data before timeout. `TimeoutException` will be thrown. + // - Fetched something but all of them are not invisible. This is a valid case and let the + // caller handles this. + val range = getAvailableOffsetRange() + if (offset < range.earliest || offset >= range.latest) { + throw new OffsetOutOfRangeException( + Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + } else if (offset == offsetAfterPoll) { + throw new TimeoutException( + s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") + } + } } } From a9b00b4a22f0b6b364cd1b35e2d99923d8b233dc Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 15 Aug 2018 10:57:50 -0700 Subject: [PATCH 04/10] update comment --- .../spark/sql/kafka010/KafkaDataConsumer.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 61bb8c235d3e..1438b459a06f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -263,10 +263,15 @@ private[kafka010] case class InternalKafkaConsumer( } /** - * Get the fetched record for the given offset if available. If the Otherwise it will either throw - * error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the fetched record for the given offset if available. + * + * If the record is invisible (either a transaction message, or an aborted message when the + * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the + * next offset to fetch. + * + * This method also will try the best to detect data loss. If failOnDataLoss` is `false`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this + * method will return `null` if the next available record is within [offset, untilOffset). * * @throws OffsetOutOfRangeException if `offset` is out of range * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. 
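
Background for the behavior the patches up to this point handle: with a transactional producer, Kafka writes commit/abort markers into the partition, and under read_committed the consumer also filters out aborted records, so some offsets exist but are never returned to the application. The standalone Scala sketch below is not part of the patch series; the broker address and topic name are placeholders. It polls such a topic with a plain Kafka consumer and prints how position() can jump past the offset of the last returned record, which is exactly the gap that the poll()/fetchData() changes above detect through the post-poll position.

    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object TransactionalOffsetGapDemo {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
        props.put("group.id", "offset-gap-demo")
        props.put("isolation.level", "read_committed")     // hide aborted records and txn markers
        props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
        props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

        val tp = new TopicPartition("some-transactional-topic", 0) // placeholder topic
        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
        try {
          consumer.assign(Collections.singletonList(tp))
          consumer.seek(tp, 0L)
          val records = consumer.poll(5000L).records(tp)
          val lastReturned = if (records.isEmpty) -1L else records.get(records.size - 1).offset
          // After a transactional commit or abort, position() can be larger than
          // lastReturned + 1: the marker (and any aborted records) occupy offsets that are
          // never handed to the application. This post-poll position is what the patch
          // records as `offsetAfterPoll` to decide which offsets can safely be skipped.
          println(s"last returned offset = $lastReturned, position after poll = ${consumer.position(tp)}")
        } finally {
          consumer.close()
        }
      }
    }
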
From c98bf50dc4d1b88d08f156230f0ca97920028d67 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 21 Aug 2018 16:02:33 -0700 Subject: [PATCH 05/10] reuse FetchedRecord --- .../sql/kafka010/KafkaDataConsumer.scala | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 1438b459a06f..b4f3cbeaa63b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -40,8 +40,8 @@ private[kafka010] sealed trait KafkaDataConsumer { * `read_committed`), it will be skipped and this method will try to fetch next available record * within [offset, untilOffset). * - * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will - * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this + * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this * method will try to fetch next available record within [offset, untilOffset). * * When this method tries to skip offsets due to either invisible messages or data loss and @@ -97,9 +97,17 @@ private[kafka010] case class InternalKafkaConsumer( * instead. */ private case class FetchedRecord( - record: Option[ConsumerRecord[Array[Byte], Array[Byte]]], - nextOffsetToFetch: Long - ) + var record: ConsumerRecord[Array[Byte], Array[Byte]], + var nextOffsetToFetch: Long) { + + def withRecord( + record: ConsumerRecord[Array[Byte], Array[Byte]], + nextOffsetToFetch: Long): FetchedRecord = { + this.record = record + this.nextOffsetToFetch = nextOffsetToFetch + this + } + } private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] @@ -116,6 +124,12 @@ private[kafka010] case class InternalKafkaConsumer( ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]] @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET + /** + * The fetched record returned from the `fetchData` method. This is a reusable private object to + * avoid memory allocation. + */ + private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET) + /** * The next available offset returned by Kafka after polling. This is the next offset after * draining `fetchedData`. @@ -175,7 +189,7 @@ private[kafka010] case class InternalKafkaConsumer( while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) { try { fetchedRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) - if (fetchedRecord.record.nonEmpty) { + if (fetchedRecord.record != null) { isFetchComplete = true } else { toFetchOffset = fetchedRecord.nextOffsetToFetch @@ -198,7 +212,7 @@ private[kafka010] case class InternalKafkaConsumer( } if (isFetchComplete) { - fetchedRecord.record.get + fetchedRecord.record } else { resetFetchedData() null @@ -269,8 +283,8 @@ private[kafka010] case class InternalKafkaConsumer( * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the * next offset to fetch. * - * This method also will try the best to detect data loss. If failOnDataLoss` is `false`, it will - * throw an exception when we detect an unavailable offset. 
If `failOnDataLoss` is `true`, this + * This method also will try the best to detect data loss. If `failOnDataLoss` is true`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this * method will return `null` if the next available record is within [offset, untilOffset). * * @throws OffsetOutOfRangeException if `offset` is out of range @@ -290,7 +304,7 @@ private[kafka010] case class InternalKafkaConsumer( if (offset < offsetAfterPoll) { // Offsets in [offset, offsetAfterPoll) are missing. We should skip them. resetFetchedData() - return FetchedRecord(None, offsetAfterPoll) + return fetchedRecord.withRecord(null, offsetAfterPoll) } else { poll(offset, pollTimeoutMs) } @@ -299,7 +313,7 @@ private[kafka010] case class InternalKafkaConsumer( if (!fetchedData.hasNext()) { assert(offset <= offsetAfterPoll, s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") - FetchedRecord(None, offsetAfterPoll) + FetchedRecord(null, offsetAfterPoll) } else { val record = fetchedData.next() nextOffsetInFetchedData = record.offset + 1 @@ -314,7 +328,7 @@ private[kafka010] case class InternalKafkaConsumer( assert(fetchedData.hasPrevious, "fetchedData cannot move back") fetchedData.previous() nextOffsetInFetchedData = record.offset - return FetchedRecord(None, record.offset) + return fetchedRecord.withRecord(null, record.offset) } // This may happen when some records aged out but their offsets already got verified if (failOnDataLoss) { @@ -327,7 +341,7 @@ private[kafka010] case class InternalKafkaConsumer( null } else { reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") - FetchedRecord(Some(record), nextOffsetInFetchedData) + fetchedRecord.withRecord(record, nextOffsetInFetchedData) } } } else if (record.offset < offset) { @@ -336,7 +350,7 @@ private[kafka010] case class InternalKafkaConsumer( throw new IllegalStateException( s"Tried to fetch $offset but the returned record offset was ${record.offset}") } else { - FetchedRecord(Some(record), nextOffsetInFetchedData) + fetchedRecord.withRecord(record, nextOffsetInFetchedData) } } } From f379d47e30643fe92b751aa7aa374815ac66a55c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 21 Aug 2018 16:05:06 -0700 Subject: [PATCH 06/10] one more place --- .../scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index b4f3cbeaa63b..3fa32760dd10 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -313,7 +313,7 @@ private[kafka010] case class InternalKafkaConsumer( if (!fetchedData.hasNext()) { assert(offset <= offsetAfterPoll, s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") - FetchedRecord(null, offsetAfterPoll) + fetchedRecord.withRecord(null, offsetAfterPoll) } else { val record = fetchedData.next() nextOffsetInFetchedData = record.offset + 1 From a06742fd3d19c3ee6d9c957b446bc5017be009bc Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 21 Aug 2018 16:59:26 -0700 Subject: [PATCH 07/10] Add FetchedData --- .../sql/kafka010/KafkaDataConsumer.scala | 106 ++++++++++++------ 1 file changed, 72 insertions(+), 34 deletions(-) diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 3fa32760dd10..470a789cc356 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -90,6 +90,53 @@ private[kafka010] case class InternalKafkaConsumer( kafkaParams: ju.Map[String, Object]) extends Logging { import InternalKafkaConsumer._ + /** + * The internal object to store the fetched data from Kafka consumer and the next offset to poll. + * + * @param records the pre-fetched Kafka records. + * @param nextOffsetInFetchedData the next offset in `records`. We use this to verify if we should + * check if the pre-fetched data is still valid. + * @param offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to poll + * when `records` is drained. + */ + private case class FetchedData( + private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + var nextOffsetInFetchedData: Long, + var offsetAfterPoll: Long) { + + def withNewPoll( + records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + offsetAfterPoll: Long): FetchedData = { + this.records = records + this.nextOffsetInFetchedData = UNKNOWN_OFFSET + this.offsetAfterPoll = offsetAfterPoll + this + } + + /** Whether there are more elements */ + def hasNext: Boolean = records.hasNext + + /** Move `records` forward and return the next record. */ + def next(): ConsumerRecord[Array[Byte], Array[Byte]] = { + val record = records.next() + nextOffsetInFetchedData = record.offset + 1 + record + } + + /** Move `records` backward and return the previous record. */ + def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = { + assert(records.hasPrevious, "fetchedData cannot move back") + val record = records.previous() + nextOffsetInFetchedData = record.offset + record + } + + /** Reset the internal pre-fetched data. */ + def reset(): Unit = { + records = ju.Collections.emptyListIterator() + } + } + /** * The internal object returned by the `fetchData` method. If `record` is empty, it means it is * invisible (either a transaction message, or an aborted message when the consumer's @@ -119,10 +166,14 @@ private[kafka010] case class InternalKafkaConsumer( /** indicate whether this consumer is going to be stopped in the next release */ @volatile var markedForClose = false - /** Iterator to the already fetch data */ - @volatile private var fetchedData = - ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET + /** + * The fetched data returned from Kafka consumer. This is a reusable private object to avoid + * memory allocation. + */ + private val fetchedData = FetchedData( + ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + UNKNOWN_OFFSET, + UNKNOWN_OFFSET) /** * The fetched record returned from the `fetchData` method. This is a reusable private object to @@ -130,11 +181,6 @@ private[kafka010] case class InternalKafkaConsumer( */ private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET) - /** - * The next available offset returned by Kafka after polling. This is the next offset after - * draining `fetchedData`. 
- */ - @volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET /** Create a KafkaConsumer to fetch records for `topicPartition` */ private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { @@ -175,7 +221,8 @@ private[kafka010] case class InternalKafkaConsumer( ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible { require(offset < untilOffset, s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]") - logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") + logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " + + s"requested $offset") // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is // `false`, first, we will try to fetch the record at `offset`. If no such record exists, then // we will move to the next available offset within `[offset, untilOffset)` and retry. @@ -194,7 +241,7 @@ private[kafka010] case class InternalKafkaConsumer( } else { toFetchOffset = fetchedRecord.nextOffsetToFetch if (toFetchOffset >= untilOffset) { - resetFetchedData() + fetchedData.reset() toFetchOffset = UNKNOWN_OFFSET } else { logDebug(s"Skipped offsets [$offset, $toFetchOffset]") @@ -214,7 +261,7 @@ private[kafka010] case class InternalKafkaConsumer( if (isFetchComplete) { fetchedRecord.record } else { - resetFetchedData() + fetchedData.reset() null } } @@ -295,28 +342,27 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): FetchedRecord = { - if (offset != nextOffsetInFetchedData) { + if (offset != fetchedData.nextOffsetInFetchedData) { // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. poll(offset, pollTimeoutMs) } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained. - if (offset < offsetAfterPoll) { + if (offset < fetchedData.offsetAfterPoll) { // Offsets in [offset, offsetAfterPoll) are missing. We should skip them. - resetFetchedData() - return fetchedRecord.withRecord(null, offsetAfterPoll) + fetchedData.reset() + return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { poll(offset, pollTimeoutMs) } } - if (!fetchedData.hasNext()) { - assert(offset <= offsetAfterPoll, - s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") - fetchedRecord.withRecord(null, offsetAfterPoll) + if (!fetchedData.hasNext) { + assert(offset <= fetchedData.offsetAfterPoll, + s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}") + fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { val record = fetchedData.next() - nextOffsetInFetchedData = record.offset + 1 // In general, Kafka uses the specified offset as the start point, and tries to fetch the next // available offset. Hence we need to handle offset mismatch. if (record.offset > offset) { @@ -325,9 +371,7 @@ private[kafka010] case class InternalKafkaConsumer( // `offset` is still valid but the corresponding message is invisible. We should skip it // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of // `fetchData` can just return `record` directly. 
- assert(fetchedData.hasPrevious, "fetchedData cannot move back") fetchedData.previous() - nextOffsetInFetchedData = record.offset return fetchedRecord.withRecord(null, record.offset) } // This may happen when some records aged out but their offsets already got verified @@ -341,7 +385,7 @@ private[kafka010] case class InternalKafkaConsumer( null } else { reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") - fetchedRecord.withRecord(record, nextOffsetInFetchedData) + fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) } } } else if (record.offset < offset) { @@ -350,7 +394,7 @@ private[kafka010] case class InternalKafkaConsumer( throw new IllegalStateException( s"Tried to fetch $offset but the returned record offset was ${record.offset}") } else { - fetchedRecord.withRecord(record, nextOffsetInFetchedData) + fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) } } } @@ -359,13 +403,7 @@ private[kafka010] case class InternalKafkaConsumer( private def resetConsumer(): Unit = { consumer.close() consumer = createConsumer - resetFetchedData() - } - - /** Reset the internal pre-fetched data. */ - private def resetFetchedData(): Unit = { - nextOffsetInFetchedData = UNKNOWN_OFFSET - fetchedData = ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]] + fetchedData.reset() } /** @@ -414,9 +452,9 @@ private[kafka010] case class InternalKafkaConsumer( val p = consumer.poll(pollTimeoutMs) val r = p.records(topicPartition) logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") - offsetAfterPoll = consumer.position(topicPartition) + val offsetAfterPoll = consumer.position(topicPartition) logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") - fetchedData = r.listIterator + fetchedData.withNewPoll(r.listIterator, offsetAfterPoll) if (!fetchedData.hasNext) { // We cannot fetch anything after `poll`. Two possible cases: // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will From e0d2c4d232bcd6cd6eeb0b5ba6a999b82001b282 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 22 Aug 2018 01:02:43 -0700 Subject: [PATCH 08/10] Address TD's comments --- .../sql/kafka010/KafkaDataConsumer.scala | 89 +++++++++++------- .../kafka010/KafkaContinuousSourceSuite.scala | 26 ++--- .../kafka010/KafkaMicroBatchSourceSuite.scala | 94 ++++++++----------- .../sql/kafka010/KafkaRelationSuite.scala | 12 +-- .../spark/sql/kafka010/KafkaTestUtils.scala | 13 ++- 5 files changed, 118 insertions(+), 116 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index cbe407f4349c..cac83a40599f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -93,52 +93,63 @@ private[kafka010] case class InternalKafkaConsumer( /** * The internal object to store the fetched data from Kafka consumer and the next offset to poll. * - * @param records the pre-fetched Kafka records. - * @param nextOffsetInFetchedData the next offset in `records`. We use this to verify if we should - * check if the pre-fetched data is still valid. - * @param offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to poll - * when `records` is drained. + * @param _records the pre-fetched Kafka records. 
+ * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we + * should check if the pre-fetched data is still valid. + * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to + * poll when `records` is drained. */ private case class FetchedData( - private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], - var nextOffsetInFetchedData: Long, - var offsetAfterPoll: Long) { + private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + private var _nextOffsetInFetchedData: Long, + private var _offsetAfterPoll: Long) { def withNewPoll( records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], offsetAfterPoll: Long): FetchedData = { - this.records = records - this.nextOffsetInFetchedData = UNKNOWN_OFFSET - this.offsetAfterPoll = offsetAfterPoll + this._records = records + this._nextOffsetInFetchedData = UNKNOWN_OFFSET + this._offsetAfterPoll = offsetAfterPoll this } /** Whether there are more elements */ - def hasNext: Boolean = records.hasNext + def hasNext: Boolean = _records.hasNext /** Move `records` forward and return the next record. */ def next(): ConsumerRecord[Array[Byte], Array[Byte]] = { - val record = records.next() - nextOffsetInFetchedData = record.offset + 1 + val record = _records.next() + _nextOffsetInFetchedData = record.offset + 1 record } /** Move `records` backward and return the previous record. */ def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = { - assert(records.hasPrevious, "fetchedData cannot move back") - val record = records.previous() - nextOffsetInFetchedData = record.offset + assert(_records.hasPrevious, "fetchedData cannot move back") + val record = _records.previous() + _nextOffsetInFetchedData = record.offset record } /** Reset the internal pre-fetched data. */ def reset(): Unit = { - records = ju.Collections.emptyListIterator() + _records = ju.Collections.emptyListIterator() } + + /** + * Returns the next offset in `records`. We use this to verify if we should check if the + * pre-fetched data is still valid. + */ + def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData + + /** + * Returns the next offset to poll after draining the pre-fetched records. + */ + def offsetAfterPoll: Long = _offsetAfterPoll } /** - * The internal object returned by the `fetchData` method. If `record` is empty, it means it is + * The internal object returned by the `fetchRecord` method. If `record` is empty, it means it is * invisible (either a transaction message, or an aborted message when the consumer's * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch * instead. @@ -176,7 +187,7 @@ private[kafka010] case class InternalKafkaConsumer( UNKNOWN_OFFSET) /** - * The fetched record returned from the `fetchData` method. This is a reusable private object to + * The fetched record returned from the `fetchRecord` method. This is a reusable private object to * avoid memory allocation. 
*/ private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET) @@ -235,7 +246,7 @@ private[kafka010] case class InternalKafkaConsumer( while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) { try { - fetchedRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) + fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) if (fetchedRecord.record != null) { isFetchComplete = true } else { @@ -337,27 +348,31 @@ private[kafka010] case class InternalKafkaConsumer( * @throws OffsetOutOfRangeException if `offset` is out of range * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. */ - private def fetchData( + private def fetchRecord( offset: Long, untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): FetchedRecord = { if (offset != fetchedData.nextOffsetInFetchedData) { // This is the first fetch, or the fetched data has been reset. - // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - poll(offset, pollTimeoutMs) - } else if (!fetchedData.hasNext) { - // The last pre-fetched data has been drained. + // Fetch records from Kafka and update `fetchedData`. + fetchData(offset, pollTimeoutMs) + } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained. if (offset < fetchedData.offsetAfterPoll) { - // Offsets in [offset, offsetAfterPoll) are missing. We should skip them. + // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask + // the next call to start from `fetchedData.offsetAfterPoll`. fetchedData.reset() return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { - poll(offset, pollTimeoutMs) + // Fetch records from Kafka and update `fetchedData`. + fetchData(offset, pollTimeoutMs) } } if (!fetchedData.hasNext) { + // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still + // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a + // record to ask the next call to start from `fetchedData.offsetAfterPoll`. assert(offset <= fetchedData.offsetAfterPoll, s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}") fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) @@ -370,7 +385,7 @@ private[kafka010] case class InternalKafkaConsumer( if (range.earliest <= offset) { // `offset` is still valid but the corresponding message is invisible. We should skip it // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of - // `fetchData` can just return `record` directly. + // `fetchRecord` can just return `record` directly. fetchedData.previous() return fetchedRecord.withRecord(null, record.offset) } @@ -378,11 +393,13 @@ private[kafka010] case class InternalKafkaConsumer( if (failOnDataLoss) { reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})") // Never happen as "reportDataLoss" will throw an exception - null + throw new IllegalStateException( + "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true") } else { if (record.offset >= untilOffset) { reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)") - null + // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch. 
+ fetchedRecord.withRecord(null, untilOffset) } else { reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) @@ -438,16 +455,16 @@ private[kafka010] case class InternalKafkaConsumer( } /** - * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`. - * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible - * messages (either transaction messages, or aborted messages when `isolation.level` is - * `read_committed`). + * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be + * empty if the Kafka consumer fetches some messages but all of them are not visible messages + * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`). * * @throws OffsetOutOfRangeException if `offset` is out of range. * @throws TimeoutException if the consumer position is not changed after polling. It means the * consumer polls nothing before timeout. */ - private def poll(offset: Long, pollTimeoutMs: Long): Unit = { + private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = { + // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) val p = consumer.poll(pollTimeoutMs) val r = p.records(topicPartition) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 0e7c2b2f98bf..f97809b51208 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -33,13 +33,13 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo withTable(table) { val topic = newTopic() testUtils.createTopic(topic) - val producer = testUtils.createProducer(usingTrascation = true) - try { + testUtils.withTranscationalProducer { producer => val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.isolation.level", "read_committed") + .option("startingOffsets", "earliest") .option("subscribe", topic) .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") @@ -53,7 +53,6 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo .trigger(ContinuousTrigger(100)) .start() try { - producer.initTransactions() producer.beginTransaction() (1 to 5).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() @@ -91,8 +90,6 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo } finally { q.stop() } - } finally { - producer.close() } } } @@ -102,13 +99,13 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo withTable(table) { val topic = newTopic() testUtils.createTopic(topic) - val producer = testUtils.createProducer(usingTrascation = true) - try { + testUtils.withTranscationalProducer { producer => val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.isolation.level", "read_uncommitted") + .option("startingOffsets", "earliest") .option("subscribe", topic) .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") @@ -122,14 +119,13 @@ class 
KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo .trigger(ContinuousTrigger(100)) .start() try { - producer.initTransactions() producer.beginTransaction() (1 to 5).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() } eventually(timeout(streamingTimeout)) { - // Should read all committed messages + // Should read uncommitted messages checkAnswer(spark.table(table), (1 to 5).toDF) } @@ -147,7 +143,7 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo producer.abortTransaction() eventually(timeout(streamingTimeout)) { - // Should not read aborted messages + // Should read aborted messages checkAnswer(spark.table(table), (1 to 10).toDF) } @@ -155,17 +151,21 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo (11 to 15).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() } + + eventually(timeout(streamingTimeout)) { + // Should read all messages including committed, aborted and uncommitted messages + checkAnswer(spark.table(table), (1 to 15).toDF) + } + producer.commitTransaction() eventually(timeout(streamingTimeout)) { - // Should skip aborted messages and read new committed ones. + // Should read all messages including committed and aborted messages checkAnswer(spark.table(table), (1 to 15).toDF) } } finally { q.stop() } - } finally { - producer.close() } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 20aa75db9392..c338061e578f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -166,15 +166,14 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf topic: String, producer: KafkaProducer[String, String])( func: KafkaProducer[String, String] => Unit): AssertOnQuery = { - AssertOnQuery(_ => { + Execute("Run Kafka Producer")(_ => { func(producer) // This is a hack for the race condition that the committed message may be not visible to // consumer for a short time. // Looks like after the following call returns, the consumer can always read the committed // messages. testUtils.getLatestOffsets(Set(topic)) - true - }, "Run Kafka Producer") + }) } } @@ -635,7 +634,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .option("maxOffsetsPerTrigger", 3) .option("subscribe", topic) .option("startingOffsets", "earliest") - // Set a short timeout to make the test fast. When a batch contains no committed date + // Set a short timeout to make the test fast. When a batch doesn't contain any visible data // messages, "poll" will wait until timeout. 
.option("kafkaConsumer.pollTimeoutMs", 5000) val kafka = reader.load() @@ -657,35 +656,32 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { true } - val producer = testUtils.createProducer(usingTrascation = true) - try { - producer.initTransactions() - + // The message values are the same as their offsets to make the test easy to follow + testUtils.withTranscationalProducer { producer => testStream(mapped)( StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, - // 1 from smallest, 1 from middle, 8 from biggest CheckAnswer(), WithKafkaProducer(topic, producer) { producer => // Send 5 messages. They should be visible only after being committed. producer.beginTransaction() - (1 to 5).foreach { i => + (0 to 4).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() } }, AdvanceManualClock(100), waitUntilBatchProcessed, // Should not see any uncommitted messages - CheckAnswer(), + CheckNewAnswer(), WithKafkaProducer(topic, producer) { producer => producer.commitTransaction() }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 3: _*), // offset 0, 1, 2 + CheckNewAnswer(0, 1, 2), // offset 0, 1, 2 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] + CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message] WithKafkaProducer(topic, producer) { producer => // Send 5 messages and abort the transaction. They should not be read. producer.beginTransaction() @@ -696,49 +692,47 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8* + CheckNewAnswer(), // offset: 6*, 7*, 8* AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11* + CheckNewAnswer(), // offset: 9*, 10*, 11* WithKafkaProducer(topic, producer) { producer => // Send 5 messages again. The consumer should skip the above aborted messages and read // them. 
producer.beginTransaction() - (11 to 15).foreach { i => + (12 to 16).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() } producer.commitTransaction() }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14 + CheckNewAnswer(12, 13, 14), // offset: 12, 13, 14 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17* + CheckNewAnswer(15, 16), // offset: 15, 16, 17* WithKafkaProducer(topic, producer) { producer => producer.beginTransaction() - producer.send(new ProducerRecord[String, String](topic, "16")).get() + producer.send(new ProducerRecord[String, String](topic, "18")).get() producer.commitTransaction() producer.beginTransaction() - producer.send(new ProducerRecord[String, String](topic, "17")).get() + producer.send(new ProducerRecord[String, String](topic, "20")).get() producer.commitTransaction() producer.beginTransaction() - producer.send(new ProducerRecord[String, String](topic, "18")).get() - producer.send(new ProducerRecord[String, String](topic, "19")).get() + producer.send(new ProducerRecord[String, String](topic, "22")).get() + producer.send(new ProducerRecord[String, String](topic, "23")).get() producer.commitTransaction() }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20 + CheckNewAnswer(18, 20), // offset: 18, 19*, 20 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23 + CheckNewAnswer(22, 23), // offset: 21*, 22, 23 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24* + CheckNewAnswer() // offset: 24* ) - } finally { - producer.close() } } @@ -761,7 +755,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .option("maxOffsetsPerTrigger", 3) .option("subscribe", topic) .option("startingOffsets", "earliest") - // Set a short timeout to make the test fast. When a batch contains no committed date + // Set a short timeout to make the test fast. When a batch doesn't contain any visible data // messages, "poll" will wait until timeout. .option("kafkaConsumer.pollTimeoutMs", 5000) val kafka = reader.load() @@ -771,7 +765,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { val clock = new StreamManualClock - val waitUntilBatchProcessed = AssertOnQuery { q => + val waitUntilBatchProcessed = Execute { q => eventually(Timeout(streamingTimeout)) { if (!q.exception.isDefined) { assert(clock.isStreamWaitingAt(clock.getTimeMillis())) @@ -780,34 +774,30 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { if (q.exception.isDefined) { throw q.exception.get } - true } - val producer = testUtils.createProducer(usingTrascation = true) - try { - producer.initTransactions() - + // The message values are the same as their offsets to make the test easy to follow + testUtils.withTranscationalProducer { producer => testStream(mapped)( StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, - // 1 from smallest, 1 from middle, 8 from biggest - CheckAnswer(), + CheckNewAnswer(), WithKafkaProducer(topic, producer) { producer => // Send 5 messages. They should be visible only after being committed. 
producer.beginTransaction() - (1 to 5).foreach { i => + (0 to 4).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() } }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 3: _*), // offset 0, 1, 2 + CheckNewAnswer(0, 1, 2), // offset 0, 1, 2 WithKafkaProducer(topic, producer) { producer => producer.commitTransaction() }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] + CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message] WithKafkaProducer(topic, producer) { producer => // Send 5 messages and abort the transaction. They should not be read. producer.beginTransaction() @@ -818,49 +808,47 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 8: _*), // offset: 6, 7, 8 + CheckNewAnswer(6, 7, 8), // offset: 6, 7, 8 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 10: _*), // offset: 9, 10, 11* + CheckNewAnswer(9, 10), // offset: 9, 10, 11* WithKafkaProducer(topic, producer) { producer => // Send 5 messages again. The consumer should skip the above aborted messages and read // them. producer.beginTransaction() - (11 to 15).foreach { i => + (12 to 16).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() } producer.commitTransaction() }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 13: _*), // offset: 12, 13, 14 + CheckNewAnswer(12, 13, 14), // offset: 12, 13, 14 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 15: _*), // offset: 15, 16, 17* + CheckNewAnswer(15, 16), // offset: 15, 16, 17* WithKafkaProducer(topic, producer) { producer => producer.beginTransaction() - producer.send(new ProducerRecord[String, String](topic, "16")).get() + producer.send(new ProducerRecord[String, String](topic, "18")).get() producer.commitTransaction() producer.beginTransaction() - producer.send(new ProducerRecord[String, String](topic, "17")).get() + producer.send(new ProducerRecord[String, String](topic, "20")).get() producer.commitTransaction() producer.beginTransaction() - producer.send(new ProducerRecord[String, String](topic, "18")).get() - producer.send(new ProducerRecord[String, String](topic, "19")).get() + producer.send(new ProducerRecord[String, String](topic, "22")).get() + producer.send(new ProducerRecord[String, String](topic, "23")).get() producer.commitTransaction() }, AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 17: _*), // offset: 18, 19*, 20 + CheckNewAnswer(18, 20), // offset: 18, 19*, 20 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 19: _*), // offset: 21*, 22, 23 + CheckNewAnswer(22, 23), // offset: 21*, 22, 23 AdvanceManualClock(100), waitUntilBatchProcessed, - CheckAnswer(1 to 19: _*) // offset: 24* + CheckNewAnswer() // offset: 24* ) - } finally { - producer.close() } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 945f26179d29..93dba1844628 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -239,8 +239,7 @@ class KafkaRelationSuite extends QueryTest 
with SharedSQLContext with KafkaTest test("read Kafka transactional messages: read_committed") { val topic = newTopic() testUtils.createTopic(topic) - val producer = testUtils.createProducer(usingTrascation = true) - try { + testUtils.withTranscationalProducer { producer => val df = spark .read .format("kafka") @@ -250,7 +249,6 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest .load() .selectExpr("CAST(value AS STRING)") - producer.initTransactions() producer.beginTransaction() (1 to 5).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() @@ -281,16 +279,13 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest // Should skip aborted messages and read new committed ones. checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF) - } finally { - producer.close() } } test("read Kafka transactional messages: read_uncommitted") { val topic = newTopic() testUtils.createTopic(topic) - val producer = testUtils.createProducer(usingTrascation = true) - try { + testUtils.withTranscationalProducer { producer => val df = spark .read .format("kafka") @@ -300,7 +295,6 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest .load() .selectExpr("CAST(value AS STRING)") - producer.initTransactions() producer.beginTransaction() (1 to 5).foreach { i => producer.send(new ProducerRecord[String, String](topic, i.toString)).get() @@ -331,8 +325,6 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest // Should read all messages checkAnswer(df, (1 to 15).map(_.toString).toDF) - } finally { - producer.close() } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 7f4f4922462a..7b742a3ea674 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -347,12 +347,17 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props } - def createProducer(usingTrascation: Boolean): KafkaProducer[String, String] = { + /** Call `f` with a `KafkaProducer` that has initialized transactions. 
*/ + def withTranscationalProducer(f: KafkaProducer[String, String] => Unit): Unit = { val props = producerConfiguration - if (usingTrascation) { - props.put("transactional.id", UUID.randomUUID().toString) + props.put("transactional.id", UUID.randomUUID().toString) + val producer = new KafkaProducer[String, String](props) + try { + producer.initTransactions() + f(producer) + } finally { + producer.close() } - new KafkaProducer[String, String](props) } private def consumerConfiguration: Properties = { From 603e0bc9cc822ec3151159a88a521ac063932f11 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 22 Aug 2018 10:30:19 -0700 Subject: [PATCH 09/10] minor --- .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index c338061e578f..40c4a48b61af 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -644,7 +644,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { val clock = new StreamManualClock - val waitUntilBatchProcessed = AssertOnQuery { q => + // Wait until the manual clock is waiting on further instructions to move forward. Then we can + // ensure all batches we are waiting for have been processed. + val waitUntilBatchProcessed = Execute { q => eventually(Timeout(streamingTimeout)) { if (!q.exception.isDefined) { assert(clock.isStreamWaitingAt(clock.getTimeMillis())) @@ -653,7 +655,6 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { if (q.exception.isDefined) { throw q.exception.get } - true } // The message values are the same as their offsets to make the test easy to follow @@ -765,6 +766,8 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { val clock = new StreamManualClock + // Wait until the manual clock is waiting on further instructions to move forward. Then we can + // ensure all batches we are waiting for have been processed. 
val waitUntilBatchProcessed = Execute { q => eventually(Timeout(streamingTimeout)) { if (!q.exception.isDefined) { From 7a02921950cda865e3cd45f1d1635212c2f707c0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 24 Aug 2018 15:18:07 -0700 Subject: [PATCH 10/10] address --- .../sql/kafka010/KafkaDataConsumer.scala | 14 ++++----- .../kafka010/KafkaMicroBatchReadSupport.scala | 1 - .../kafka010/KafkaMicroBatchSourceSuite.scala | 29 +++++++++---------- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index cac83a40599f..ceb9e318b283 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -395,15 +395,13 @@ private[kafka010] case class InternalKafkaConsumer( // Never happen as "reportDataLoss" will throw an exception throw new IllegalStateException( "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true") + } else if (record.offset >= untilOffset) { + reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)") + // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch. + fetchedRecord.withRecord(null, untilOffset) } else { - if (record.offset >= untilOffset) { - reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)") - // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch. - fetchedRecord.withRecord(null, untilOffset) - } else { - reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") - fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) - } + reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") + fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) } } else if (record.offset < offset) { // This should not happen. 
If it does happen, then we probably misunderstand Kafka internal diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala index b00af2c1d973..70f37e32e78d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala @@ -331,7 +331,6 @@ private[kafka010] case class KafkaMicroBatchPartitionReader( offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer) private val rangeToRead = resolveRange(offsetRange) - private val converter = new KafkaRecordToUnsafeRowConverter private var nextOffset = rangeToRead.fromOffset diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 41297f4e0bbf..eb66ccac744a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -159,13 +159,10 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf s"AddKafkaData(topics = $topics, data = $data, message = $message)" } - object WithKafkaProducer { - def apply( - topic: String, - producer: KafkaProducer[String, String])( - func: KafkaProducer[String, String] => Unit): AssertOnQuery = { + object WithOffsetSync { + def apply(topic: String)(func: () => Unit): StreamAction = { Execute("Run Kafka Producer")(_ => { - func(producer) + func() // This is a hack for the race condition that the committed message may be not visible to // consumer for a short time. // Looks like after the following call returns, the consumer can always read the committed @@ -661,7 +658,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, CheckAnswer(), - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => // Send 5 messages. They should be visible only after being committed. producer.beginTransaction() (0 to 4).foreach { i => @@ -672,7 +669,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { waitUntilBatchProcessed, // Should not see any uncommitted messages CheckNewAnswer(), - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => producer.commitTransaction() }, AdvanceManualClock(100), @@ -681,7 +678,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message] - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => // Send 5 messages and abort the transaction. They should not be read. producer.beginTransaction() (6 to 10).foreach { i => @@ -695,7 +692,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(), // offset: 9*, 10*, 11* - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => // Send 5 messages again. The consumer should skip the above aborted messages and read // them. 
producer.beginTransaction() @@ -710,7 +707,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(15, 16), // offset: 15, 16, 17* - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => producer.beginTransaction() producer.send(new ProducerRecord[String, String](topic, "18")).get() producer.commitTransaction() @@ -783,7 +780,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, CheckNewAnswer(), - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => // Send 5 messages. They should be visible only after being committed. producer.beginTransaction() (0 to 4).foreach { i => @@ -793,13 +790,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(0, 1, 2), // offset 0, 1, 2 - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => producer.commitTransaction() }, AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message] - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => // Send 5 messages and abort the transaction. They should not be read. producer.beginTransaction() (6 to 10).foreach { i => @@ -813,7 +810,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(9, 10), // offset: 9, 10, 11* - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => // Send 5 messages again. The consumer should skip the above aborted messages and read // them. producer.beginTransaction() @@ -828,7 +825,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { AdvanceManualClock(100), waitUntilBatchProcessed, CheckNewAnswer(15, 16), // offset: 15, 16, 17* - WithKafkaProducer(topic, producer) { producer => + WithOffsetSync(topic) { () => producer.beginTransaction() producer.send(new ProducerRecord[String, String](topic, "18")).get() producer.commitTransaction()