diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 321665042b8eb..5d68a14326c00 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.kafka010 import org.apache.spark.sql.Dataset -import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec import org.apache.spark.sql.streaming.Trigger // Run tests in KafkaSourceSuiteBase in continuous execution mode. @@ -60,10 +60,10 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { testUtils.createTopic(topic2, partitions = 5) eventually(timeout(streamingTimeout)) { assert( - query.lastExecution.logical.collectFirst { - case r: StreamingDataSourceV2Relation - if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] => - r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig] + query.lastExecution.executedPlan.collectFirst { + case scan: DataSourceV2ScanExec + if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => + scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] }.exists { config => // Ensure the new topic is present and the old topic is gone. config.knownPartitions.exists(_.topic == topic2)