From 93c7bd93f5dbec41a0fd4d6b5ef0bfe0bfdc235c Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 27 Aug 2018 10:03:17 -0700 Subject: [PATCH] fix flake --- .../sql/kafka010/KafkaContinuousSourceSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 321665042b8eb..5d68a14326c00 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.kafka010 import org.apache.spark.sql.Dataset -import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec import org.apache.spark.sql.streaming.Trigger // Run tests in KafkaSourceSuiteBase in continuous execution mode. @@ -60,10 +60,10 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { testUtils.createTopic(topic2, partitions = 5) eventually(timeout(streamingTimeout)) { assert( - query.lastExecution.logical.collectFirst { - case r: StreamingDataSourceV2Relation - if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] => - r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig] + query.lastExecution.executedPlan.collectFirst { + case scan: DataSourceV2ScanExec + if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => + scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] }.exists { config => // Ensure the new topic is present and the old topic is gone. config.knownPartitions.exists(_.topic == topic2)