From a5171e68c0d4feb020f4fa49a24e4d54a07547db Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 12 Feb 2018 21:12:22 -0800 Subject: [PATCH 1/8] improve data source v2 explain --- .../kafka010/KafkaContinuousSourceSuite.scala | 2 +- .../sql/kafka010/KafkaContinuousTest.scala | 2 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 2 +- .../v2/DataSourceReaderHolder.scala | 64 ------------- .../v2/DataSourceV2QueryPlan.scala | 96 +++++++++++++++++++ .../datasources/v2/DataSourceV2Relation.scala | 24 +++-- .../datasources/v2/DataSourceV2ScanExec.scala | 6 +- .../datasources/v2/DataSourceV2Strategy.scala | 8 +- .../v2/PushDownOperatorsToDataSource.scala | 2 +- .../streaming/MicroBatchExecution.scala | 47 +++++---- .../continuous/ContinuousExecution.scala | 8 +- .../spark/sql/streaming/StreamSuite.scala | 8 +- .../spark/sql/streaming/StreamTest.scala | 2 +- .../continuous/ContinuousSuite.scala | 11 +-- 14 files changed, 164 insertions(+), 118 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index f679e9bfc045..5785fdaaa7de 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -60,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.logical.collectFirst { - case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r + case StreamingDataSourceV2Relation(_, _, r: KafkaContinuousReader) => r }.exists { r => // Ensure the new topic is present and the old topic is gone. 
r.knownPartitions.exists(_.topic == topic2) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala index 48ac3fc1e8f9..fa9c4a882a21 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala @@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.logical.collectFirst { - case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r + case StreamingDataSourceV2Relation(_, _, r: KafkaContinuousReader) => r }.exists(_.knownPartitions.size == newCount), s"query never reconfigured to $newCount partitions") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 89c9ef4cc73b..66caa8c4182e 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -119,7 +119,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { } ++ (query.get.lastExecution match { case null => Seq() case e => e.logical.collect { - case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader + case StreamingDataSourceV2Relation(_, _, reader: KafkaContinuousReader) => reader } }) }.distinct diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala deleted file mode 100644 index 81219e9771bd..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.v2 - -import java.util.Objects - -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.sources.v2.reader._ - -/** - * A base class for data source reader holder with customized equals/hashCode methods. - */ -trait DataSourceReaderHolder { - - /** - * The output of the data source reader, w.r.t. column pruning. - */ - def output: Seq[Attribute] - - /** - * The held data source reader. - */ - def reader: DataSourceReader - - /** - * The metadata of this data source reader that can be used for equality test. 
- */ - private def metadata: Seq[Any] = { - val filters: Any = reader match { - case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet - case s: SupportsPushDownFilters => s.pushedFilters().toSet - case _ => Nil - } - Seq(output, reader.getClass, filters) - } - - def canEqual(other: Any): Boolean - - override def equals(other: Any): Boolean = other match { - case other: DataSourceReaderHolder => - canEqual(other) && metadata.length == other.metadata.length && - metadata.zip(other.metadata).forall { case (l, r) => l == r } - case _ => false - } - - override def hashCode(): Int = { - metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala new file mode 100644 index 000000000000..a68cf0ae6aa0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.util.Objects + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.DataSourceV2 +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.util.Utils + +/** + * A base class for data source v2 related query plan(both logical and physical). It defines the + * equals/hashCode methods, and provides a string representation of the query plan, according to + * some common information. + */ +trait DataSourceV2QueryPlan { + + /** + * The output of the data source reader, w.r.t. column pruning. + */ + def output: Seq[Attribute] + + /** + * The instance of this data source implementation. Note that we only consider its class in + * equals/hashCode, not the instance itself. + */ + def source: DataSourceV2 + + /** + * The created data source reader. Here we use it to get the filters that has been pushed down + * so far, itself doesn't take part in the equals/hashCode. + */ + def reader: DataSourceReader + + private lazy val filters = reader match { + case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet + case s: SupportsPushDownFilters => s.pushedFilters().toSet + case _ => Set.empty + } + + /** + * The metadata of this data source query plan that can be used for equality check. 
+ */ + private def metadata: Seq[Any] = Seq(output, source.getClass, filters) + + def canEqual(other: Any): Boolean + + override def equals(other: Any): Boolean = other match { + case other: DataSourceV2QueryPlan => canEqual(other) && metadata == other.metadata + case _ => false + } + + override def hashCode(): Int = { + metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b) + } + + def metadataString: String = { + val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)] + if (filters.nonEmpty) entries += "Pushed Filters" -> filters.mkString("[", ", ", "]") + + val outputStr = Utils.truncatedString(output, "[", ", ", "]") + + val entriesStr = if (entries.nonEmpty) { + Utils.truncatedString(entries.map { + case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100) + }, " (", ", ", ")") + } else { + "" + } + + s"${source.getClass.getSimpleName.stripSuffix("$")}$outputStr$entriesStr" + } + + private def redact(text: String): String = { + Utils.redact(SQLConf.get.stringRedationPattern, text) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index a98dd4866f82..88708efecf10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -35,15 +35,14 @@ case class DataSourceV2Relation( options: Map[String, String], projection: Seq[AttributeReference], filters: Option[Seq[Expression]] = None, - userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + userSpecifiedSchema: Option[StructType] = None) + extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan { import DataSourceV2Relation._ - override def simpleString: String = { - s"DataSourceV2Relation(source=${source.name}, " + - s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + - s"filters=[${pushedFilters.mkString(", ")}], options=$options)" - } + override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] + + override def simpleString: String = "Relation " + metadataString override lazy val schema: StructType = reader.readSchema() @@ -107,17 +106,24 @@ case class DataSourceV2Relation( } /** - * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical - * to the non-streaming relation. + * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true. + * + * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan, + * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]] + * after we figure out how to apply operator push-down for streaming data sources.
*/ case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], + source: DataSourceV2, reader: DataSourceReader) - extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { + extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan { + override def isStreaming: Boolean = true override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + override def simpleString: String = "Streaming Relation " + metadataString + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) override def computeStats(): Statistics = reader match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 7d9581be4db8..be82a5918590 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader import org.apache.spark.sql.types.StructType @@ -36,11 +37,14 @@ import org.apache.spark.sql.types.StructType */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], + @transient source: DataSourceV2, @transient reader: DataSourceReader) - extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan { + extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec] + override def simpleString: String = "Scan " + metadataString + override def outputPartitioning: physical.Partitioning = reader match { case s: SupportsReportPartitioning => new DataSourcePartitioning( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index c4e7644683c3..d578f18c5163 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case relation: DataSourceV2Relation => - DataSourceV2ScanExec(relation.output, relation.reader) :: Nil + case r: DataSourceV2Relation => + DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil - case relation: StreamingDataSourceV2Relation => - DataSourceV2ScanExec(relation.output, relation.reader) :: Nil + case r: StreamingDataSourceV2Relation => + DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala index f23d22856724..96eff7193004 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala @@ -26,7 +26,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] { override def apply( plan: LogicalPlan): LogicalPlan = plan transformUp { // PhysicalOperation guarantees that filters are deterministic; no need to check - case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) => + case PhysicalOperation(project, newFilters, relation: DataSourceV2Relation) => // merge the filters val filters = relation.filters match { case Some(existing) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 84655013ba95..650cd209bdc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -20,16 +20,16 @@ package org.apache.spark.sql.execution.streaming import java.util.Optional import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} +import scala.collection.mutable.{Map => MutableMap} import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter} -import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} @@ -52,6 +52,8 @@ class MicroBatchExecution( @volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty + private val readerToDataSourceMap = MutableMap.empty[MicroBatchReader, DataSourceV2] + private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) case OneTimeTrigger => OneTimeExecutor() @@ -77,31 +79,32 @@ class MicroBatchExecution( sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",") val _logicalPlan = analyzedPlan.transform { - case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) => - toExecutionRelationMap.getOrElseUpdate(streamingRelation, { + case s @ StreamingRelation(dsV1, sourceName, output) => + toExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val 
metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - val source = dataSourceV1.createSource(metadataPath) + val source = dsV1.createSource(metadataPath) nextSourceId += 1 - logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]") + logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dsV1]") StreamingExecutionRelation(source, output)(sparkSession) }) - case s @ StreamingRelationV2( - dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if - !disabledSources.contains(dataSourceV2.getClass.getCanonicalName) => + + case s @ StreamingRelationV2(dsV2: MicroBatchReadSupport, sourceName, options, output, _) + if !disabledSources.contains(dsV2.getClass.getCanonicalName) => v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - val reader = dataSourceV2.createMicroBatchReader( + val reader = dsV2.createMicroBatchReader( Optional.empty(), // user specified schema metadataPath, new DataSourceOptions(options.asJava)) nextSourceId += 1 - logInfo(s"Using MicroBatchReader [$reader] from " + - s"DataSourceV2 named '$sourceName' [$dataSourceV2]") + readerToDataSourceMap(reader) = dsV2 + logInfo(s"Using MicroBatchReader [$reader] from DataSourceV2 named '$sourceName' [$dsV2]") StreamingExecutionRelation(reader, output)(sparkSession) }) - case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) => + + case s @ StreamingRelationV2(dsV2, sourceName, _, output, v1Relation) => v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" @@ -111,7 +114,7 @@ class MicroBatchExecution( } val source = v1Relation.get.dataSource.createSource(metadataPath) nextSourceId += 1 - logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]") + logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dsV2]") StreamingExecutionRelation(source, output)(sparkSession) }) } @@ -415,12 +418,14 @@ class MicroBatchExecution( case v1: SerializedOffset => reader.deserializeOffset(v1.json) case v2: OffsetV2 => v2 } - reader.setOffsetRange( - toJava(current), - Optional.of(availableV2)) + reader.setOffsetRange(toJava(current), Optional.of(availableV2)) logDebug(s"Retrieving data from $reader: $current -> $availableV2") - Some(reader -> - new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader)) + Some(reader -> StreamingDataSourceV2Relation( + output = reader.readSchema().toAttributes, + // Provide a fake value here just in case something went wrong, e.g. the reader gives + // a wrong `equals` implementation. 
+ source = readerToDataSourceMap.getOrElse(reader, FakeDataSourceV2), + reader = reader)) case _ => None } } @@ -508,3 +513,5 @@ class MicroBatchExecution( Optional.ofNullable(scalaOption.orNull) } } + +object FakeDataSourceV2 extends DataSourceV2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 2c1d6c509d21..1fe2c8f70c98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2} +import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset} @@ -167,7 +167,7 @@ class ContinuousExecution( var insertedSourceId = 0 val withNewSources = logicalPlan transform { - case ContinuousExecutionRelation(_, _, output) => + case ContinuousExecutionRelation(source, _, output) => val reader = continuousSources(insertedSourceId) insertedSourceId += 1 val newOutput = reader.readSchema().toAttributes @@ -180,7 +180,7 @@ class ContinuousExecution( val loggedOffset = offsets.offsets(0) val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json)) reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull)) - new StreamingDataSourceV2Relation(newOutput, reader) + StreamingDataSourceV2Relation(newOutput, source, reader) } // Rewire the plan to use the new attributes that were returned by the source. @@ -201,7 +201,7 @@ class ContinuousExecution( val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan) val reader = withSink.collect { - case StreamingDataSourceV2Relation(_, r: ContinuousReader) => r + case StreamingDataSourceV2Relation(_, _, r: ContinuousReader) => r }.head reportTimeTaken("queryPlanning") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index d1a04833390f..70eb9f0ac66d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -492,16 +492,16 @@ class StreamSuite extends StreamTest { val explainWithoutExtended = q.explainInternal(false) // `extended = false` only displays the physical plan. 
- assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithoutExtended).size === 0) - assert("DataSourceV2Scan".r.findAllMatchIn(explainWithoutExtended).size === 1) + assert("Streaming Relation".r.findAllMatchIn(explainWithoutExtended).size === 0) + assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithoutExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithoutExtended.contains("StateStoreRestore")) val explainWithExtended = q.explainInternal(true) // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical // plan. - assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithExtended).size === 3) - assert("DataSourceV2Scan".r.findAllMatchIn(explainWithExtended).size === 1) + assert("Streaming Relation".r.findAllMatchIn(explainWithExtended).size === 3) + assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithExtended.contains("StateStoreRestore")) } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 159dd0ecb590..2aaf8a812a82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -605,7 +605,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be plan .collect { case StreamingExecutionRelation(s, _) => s - case StreamingDataSourceV2Relation(_, r) => r + case StreamingDataSourceV2Relation(_, _, r) => r } .zipWithIndex .find(_._1 == source) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 4b4ed82dc652..9ee9aaf87f87 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -17,15 +17,12 @@ package org.apache.spark.sql.streaming.continuous -import java.util.UUID - -import org.apache.spark.{SparkContext, SparkEnv, SparkException} -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart} +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} import org.apache.spark.sql._ -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.test.TestSparkSession @@ -43,7 +40,7 @@ class ContinuousSuiteBase extends StreamTest { case s: ContinuousExecution => assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized") val reader = s.lastExecution.executedPlan.collectFirst { - case DataSourceV2ScanExec(_, r: RateStreamContinuousReader) => r + case DataSourceV2ScanExec(_, _, r: RateStreamContinuousReader) => r }.get val deltaMs = numTriggers * 1000 + 300 From 
dbee2813accd4c8f5937b28eb9142cc6a50f8c6a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 23 Feb 2018 12:31:36 +0800 Subject: [PATCH 2/8] address comments --- .../kafka010/KafkaContinuousSourceSuite.scala | 2 +- .../sql/kafka010/KafkaContinuousTest.scala | 2 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 2 +- .../v2/DataSourceV2QueryPlan.scala | 33 +++++++++++++++---- .../datasources/v2/DataSourceV2Relation.scala | 5 +-- .../datasources/v2/DataSourceV2ScanExec.scala | 3 +- .../datasources/v2/DataSourceV2Strategy.scala | 4 +-- .../streaming/MicroBatchExecution.scala | 22 +++++++++---- .../continuous/ContinuousExecution.scala | 6 ++-- .../spark/sql/streaming/StreamSuite.scala | 12 ++++--- .../spark/sql/streaming/StreamTest.scala | 4 +-- .../continuous/ContinuousSuite.scala | 2 +- 12 files changed, 67 insertions(+), 30 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 5785fdaaa7de..aab8ec42189f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -60,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.logical.collectFirst { - case StreamingDataSourceV2Relation(_, _, r: KafkaContinuousReader) => r + case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r }.exists { r => // Ensure the new topic is present and the old topic is gone. r.knownPartitions.exists(_.topic == topic2) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala index fa9c4a882a21..fa1468a3943c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala @@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.logical.collectFirst { - case StreamingDataSourceV2Relation(_, _, r: KafkaContinuousReader) => r + case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r }.exists(_.knownPartitions.size == newCount), s"query never reconfigured to $newCount partitions") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 66caa8c4182e..4c9e1d9e2fd0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -119,7 +119,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { } ++ (query.get.lastExecution match { case null => Seq() case e => e.logical.collect { - case StreamingDataSourceV2Relation(_, _, reader: KafkaContinuousReader) => reader + case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader } }) }.distinct diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala index a68cf0ae6aa0..85d0dc9ae453 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala @@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.util.Utils @@ -34,16 +35,21 @@ import org.apache.spark.util.Utils */ trait DataSourceV2QueryPlan { + /** + * The instance of this data source implementation. Note that we only consider its class in + * equals/hashCode, not the instance itself. + */ + def source: DataSourceV2 + /** * The output of the data source reader, w.r.t. column pruning. */ def output: Seq[Attribute] /** - * The instance of this data source implementation. Note that we only consider its class in - * equals/hashCode, not the instance itself. + * The options for this data source reader. */ - def source: DataSourceV2 + def options: Map[String, String] /** * The created data source reader. Here we use it to get the filters that has been pushed down @@ -60,7 +66,7 @@ trait DataSourceV2QueryPlan { /** * The metadata of this data source query plan that can be used for equality check. */ - private def metadata: Seq[Any] = Seq(output, source.getClass, filters) + private def metadata: Seq[Any] = Seq(source.getClass, output, options, filters) def canEqual(other: Any): Boolean @@ -73,9 +79,24 @@ trait DataSourceV2QueryPlan { metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b) } + private def sourceName: String = source match { + case registered: DataSourceRegister => registered.shortName() + case _ => source.getClass.getSimpleName.stripSuffix("$") + } + def metadataString: String = { val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)] - if (filters.nonEmpty) entries += "Pushed Filters" -> filters.mkString("[", ", ", "]") + + if (filters.nonEmpty) { + entries += "Pushed Filters" -> filters.mkString("[", ", ", "]") + } + + // TODO: we should only display some standard options like path, table, etc. 
+ if (options.nonEmpty) { + entries += "Options" -> options.map { + case (k, v) => s"$k=$v" + }.mkString("[", ",", "]") + } val outputStr = Utils.truncatedString(output, "[", ", ", "]") @@ -87,7 +108,7 @@ trait DataSourceV2QueryPlan { "" } - s"${source.getClass.getSimpleName.stripSuffix("$")}$outputStr$entriesStr" + s"$sourceName$outputStr$entriesStr" } private def redact(text: String): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 88708efecf10..9070c288f1bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -42,7 +42,7 @@ case class DataSourceV2Relation( override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] - override def simpleString: String = "Relation " + metadataString + override def simpleString: String = "V2Relation " + metadataString override lazy val schema: StructType = reader.readSchema() @@ -115,6 +115,7 @@ case class DataSourceV2Relation( case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], source: DataSourceV2, + options: Map[String, String], reader: DataSourceReader) extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan { @@ -122,7 +123,7 @@ case class StreamingDataSourceV2Relation( override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] - override def simpleString: String = "Streaming Relation " + metadataString + override def simpleString: String = "Streaming V2Relation " + metadataString override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index be82a5918590..85f756e6a711 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -38,12 +38,13 @@ import org.apache.spark.sql.types.StructType case class DataSourceV2ScanExec( output: Seq[AttributeReference], @transient source: DataSourceV2, + @transient options: Map[String, String], @transient reader: DataSourceReader) extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec] - override def simpleString: String = "Scan " + metadataString + override def simpleString: String = "V2Scan " + metadataString override def outputPartitioning: physical.Partitioning = reader match { case s: SupportsReportPartitioning => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index d578f18c5163..1ac9572de641 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,10 +24,10 @@ import org.apache.spark.sql.execution.SparkPlan object DataSourceV2Strategy extends Strategy { override def 
apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: DataSourceV2Relation => - DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil + DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil case r: StreamingDataSourceV2Relation => - DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil + DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 650cd209bdc2..f55e115cba7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -52,7 +52,8 @@ class MicroBatchExecution( @volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty - private val readerToDataSourceMap = MutableMap.empty[MicroBatchReader, DataSourceV2] + private val readerToDataSourceMap = + MutableMap.empty[MicroBatchReader, (DataSourceV2, Map[String, String])] private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) @@ -99,7 +100,7 @@ class MicroBatchExecution( metadataPath, new DataSourceOptions(options.asJava)) nextSourceId += 1 - readerToDataSourceMap(reader) = dsV2 + readerToDataSourceMap(reader) = dsV2 -> options logInfo(s"Using MicroBatchReader [$reader] from DataSourceV2 named '$sourceName' [$dsV2]") StreamingExecutionRelation(reader, output)(sparkSession) }) @@ -420,12 +421,19 @@ class MicroBatchExecution( } reader.setOffsetRange(toJava(current), Optional.of(availableV2)) logDebug(s"Retrieving data from $reader: $current -> $availableV2") - Some(reader -> StreamingDataSourceV2Relation( - output = reader.readSchema().toAttributes, + + val (source, options) = reader match { + // `MemoryStream` is special. It's for test only and doesn't have a `DataSourceV2` + // implementation. We provide a fake one here for explain. + case _: MemoryStream[_] => MemoryStreamDataSource -> Map.empty[String, String] // Provide a fake value here just in case something went wrong, e.g. the reader gives // a wrong `equals` implementation. 
- source = readerToDataSourceMap.getOrElse(reader, FakeDataSourceV2), - reader = reader)) + case _ => readerToDataSourceMap.getOrElse(reader, { + FakeDataSourceV2 -> Map.empty[String, String] + }) + } + Some(reader -> StreamingDataSourceV2Relation( + reader.readSchema().toAttributes, source, options, reader)) case _ => None } } @@ -514,4 +522,6 @@ class MicroBatchExecution( } } +object MemoryStreamDataSource extends DataSourceV2 + object FakeDataSourceV2 extends DataSourceV2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 1fe2c8f70c98..b651ea0783ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -167,7 +167,7 @@ class ContinuousExecution( var insertedSourceId = 0 val withNewSources = logicalPlan transform { - case ContinuousExecutionRelation(source, _, output) => + case ContinuousExecutionRelation(source, options, output) => val reader = continuousSources(insertedSourceId) insertedSourceId += 1 val newOutput = reader.readSchema().toAttributes @@ -180,7 +180,7 @@ class ContinuousExecution( val loggedOffset = offsets.offsets(0) val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json)) reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull)) - StreamingDataSourceV2Relation(newOutput, source, reader) + StreamingDataSourceV2Relation(newOutput, source, options, reader) } // Rewire the plan to use the new attributes that were returned by the source. @@ -201,7 +201,7 @@ class ContinuousExecution( val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan) val reader = withSink.collect { - case StreamingDataSourceV2Relation(_, _, r: ContinuousReader) => r + case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r }.head reportTimeTaken("queryPlanning") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 70eb9f0ac66d..63773e15abda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -492,16 +492,20 @@ class StreamSuite extends StreamTest { val explainWithoutExtended = q.explainInternal(false) // `extended = false` only displays the physical plan. - assert("Streaming Relation".r.findAllMatchIn(explainWithoutExtended).size === 0) - assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithoutExtended).size === 1) + assert("Streaming V2Relation MemoryStreamDataSource".r + .findAllMatchIn(explainWithoutExtended).size === 0) + assert("V2Scan MemoryStreamDataSource".r + .findAllMatchIn(explainWithoutExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithoutExtended.contains("StateStoreRestore")) val explainWithExtended = q.explainInternal(true) // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical // plan. 
- assert("Streaming Relation".r.findAllMatchIn(explainWithExtended).size === 3) - assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithExtended).size === 1) + assert("Streaming V2Relation MemoryStreamDataSource".r + .findAllMatchIn(explainWithExtended).size === 3) + assert("V2Scan MemoryStreamDataSource".r + .findAllMatchIn(explainWithExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithExtended.contains("StateStoreRestore")) } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 2aaf8a812a82..cb499d849d38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -604,8 +604,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be def findSourceIndex(plan: LogicalPlan): Option[Int] = { plan .collect { - case StreamingExecutionRelation(s, _) => s - case StreamingDataSourceV2Relation(_, _, r) => r + case r: StreamingExecutionRelation => r.source + case r: StreamingDataSourceV2Relation => r.reader } .zipWithIndex .find(_._1 == source) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 9ee9aaf87f87..f5884b9c8de1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -40,7 +40,7 @@ class ContinuousSuiteBase extends StreamTest { case s: ContinuousExecution => assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized") val reader = s.lastExecution.executedPlan.collectFirst { - case DataSourceV2ScanExec(_, _, r: RateStreamContinuousReader) => r + case DataSourceV2ScanExec(_, _, _, r: RateStreamContinuousReader) => r }.get val deltaMs = numTriggers * 1000 + 300 From fc29f8f7b5aa1bd33f5b4aa1ffd73bd9d84afc15 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 24 Feb 2018 01:19:52 +0800 Subject: [PATCH 3/8] remove the customer equality --- .../datasources/v2/DataSourceV2QueryPlan.scala | 18 ------------------ .../datasources/v2/DataSourceV2Relation.scala | 4 ---- .../datasources/v2/DataSourceV2ScanExec.scala | 2 -- 3 files changed, 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala index 85d0dc9ae453..5753dccc832e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.Objects - import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.catalyst.expressions.Attribute @@ -63,22 +61,6 @@ trait DataSourceV2QueryPlan { case _ => Set.empty } - /** - * The metadata of this data source query plan that can be used for equality check. 
- */ - private def metadata: Seq[Any] = Seq(source.getClass, output, options, filters) - - def canEqual(other: Any): Boolean - - override def equals(other: Any): Boolean = other match { - case other: DataSourceV2QueryPlan => canEqual(other) && metadata == other.metadata - case _ => false - } - - override def hashCode(): Int = { - metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b) - } - private def sourceName: String = source match { case registered: DataSourceRegister => registered.shortName() case _ => source.getClass.getSimpleName.stripSuffix("$") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 9070c288f1bd..19f654758e13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -40,8 +40,6 @@ case class DataSourceV2Relation( import DataSourceV2Relation._ - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] - override def simpleString: String = "V2Relation " + metadataString override lazy val schema: StructType = reader.readSchema() @@ -121,8 +119,6 @@ case class StreamingDataSourceV2Relation( override def isStreaming: Boolean = true - override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] - override def simpleString: String = "Streaming V2Relation " + metadataString override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 85f756e6a711..65684a3ea92c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -42,8 +42,6 @@ case class DataSourceV2ScanExec( @transient reader: DataSourceReader) extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan { - override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec] - override def simpleString: String = "V2Scan " + metadataString override def outputPartitioning: physical.Partitioning = reader match { From dfe603d80494530554d51295f6d2f2b49a3548a3 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 24 Feb 2018 13:35:27 +0800 Subject: [PATCH 4/8] address comments --- .../datasources/v2/DataSourceV2Relation.scala | 4 +-- .../datasources/v2/DataSourceV2ScanExec.scala | 2 +- .../v2/PushDownOperatorsToDataSource.scala | 2 +- .../streaming/MicroBatchExecution.scala | 30 ++++++++++--------- .../spark/sql/streaming/StreamSuite.scala | 8 ++--- 5 files changed, 24 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 19f654758e13..1048b14c59cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -40,7 +40,7 @@ case class DataSourceV2Relation( import 
DataSourceV2Relation._ - override def simpleString: String = "V2Relation " + metadataString + override def simpleString: String = "RelationV2 " + metadataString override lazy val schema: StructType = reader.readSchema() @@ -119,7 +119,7 @@ case class StreamingDataSourceV2Relation( override def isStreaming: Boolean = true - override def simpleString: String = "Streaming V2Relation " + metadataString + override def simpleString: String = "Streaming RelationV2 " + metadataString override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 65684a3ea92c..110551da7d1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -42,7 +42,7 @@ case class DataSourceV2ScanExec( @transient reader: DataSourceReader) extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan { - override def simpleString: String = "V2Scan " + metadataString + override def simpleString: String = "ScanV2 " + metadataString override def outputPartitioning: physical.Partitioning = reader match { case s: SupportsReportPartitioning => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala index 96eff7193004..f23d22856724 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala @@ -26,7 +26,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] { override def apply( plan: LogicalPlan): LogicalPlan = plan transformUp { // PhysicalOperation guarantees that filters are deterministic; no need to check - case PhysicalOperation(project, newFilters, relation: DataSourceV2Relation) => + case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) => // merge the filters val filters = relation.filters match { case Some(existing) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index f55e115cba7a..06ebe6eb4525 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -80,32 +80,32 @@ class MicroBatchExecution( sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",") val _logicalPlan = analyzedPlan.transform { - case s @ StreamingRelation(dsV1, sourceName, output) => - toExecutionRelationMap.getOrElseUpdate(s, { + case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) => + toExecutionRelationMap.getOrElseUpdate(streamingRelation, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - val source = dsV1.createSource(metadataPath) + val source = dataSourceV1.createSource(metadataPath) nextSourceId += 1 - logInfo(s"Using Source [$source] from DataSourceV1 named 
'$sourceName' [$dsV1]") + logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]") StreamingExecutionRelation(source, output)(sparkSession) }) - - case s @ StreamingRelationV2(dsV2: MicroBatchReadSupport, sourceName, options, output, _) - if !disabledSources.contains(dsV2.getClass.getCanonicalName) => + case s @ StreamingRelationV2( + dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if + !disabledSources.contains(dataSourceV2.getClass.getCanonicalName) => v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - val reader = dsV2.createMicroBatchReader( + val reader = dataSourceV2.createMicroBatchReader( Optional.empty(), // user specified schema metadataPath, new DataSourceOptions(options.asJava)) nextSourceId += 1 - readerToDataSourceMap(reader) = dsV2 -> options - logInfo(s"Using MicroBatchReader [$reader] from DataSourceV2 named '$sourceName' [$dsV2]") + readerToDataSourceMap(reader) = dataSourceV2 -> options + logInfo(s"Using MicroBatchReader [$reader] from " + + s"DataSourceV2 named '$sourceName' [$dataSourceV2]") StreamingExecutionRelation(reader, output)(sparkSession) }) - - case s @ StreamingRelationV2(dsV2, sourceName, _, output, v1Relation) => + case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) => v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" @@ -115,7 +115,7 @@ class MicroBatchExecution( } val source = v1Relation.get.dataSource.createSource(metadataPath) nextSourceId += 1 - logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dsV2]") + logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]") StreamingExecutionRelation(source, output)(sparkSession) }) } @@ -419,7 +419,9 @@ class MicroBatchExecution( case v1: SerializedOffset => reader.deserializeOffset(v1.json) case v2: OffsetV2 => v2 } - reader.setOffsetRange(toJava(current), Optional.of(availableV2)) + reader.setOffsetRange( + toJava(current), + Optional.of(availableV2)) logDebug(s"Retrieving data from $reader: $current -> $availableV2") val (source, options) = reader match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 63773e15abda..c1ec1eba69fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -492,9 +492,9 @@ class StreamSuite extends StreamTest { val explainWithoutExtended = q.explainInternal(false) // `extended = false` only displays the physical plan. - assert("Streaming V2Relation MemoryStreamDataSource".r + assert("Streaming RelationV2 MemoryStreamDataSource".r .findAllMatchIn(explainWithoutExtended).size === 0) - assert("V2Scan MemoryStreamDataSource".r + assert("ScanV2 MemoryStreamDataSource".r .findAllMatchIn(explainWithoutExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithoutExtended.contains("StateStoreRestore")) @@ -502,9 +502,9 @@ class StreamSuite extends StreamTest { val explainWithExtended = q.explainInternal(true) // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical // plan. 
- assert("Streaming V2Relation MemoryStreamDataSource".r + assert("Streaming RelationV2 MemoryStreamDataSource".r .findAllMatchIn(explainWithExtended).size === 3) - assert("V2Scan MemoryStreamDataSource".r + assert("ScanV2 MemoryStreamDataSource".r .findAllMatchIn(explainWithExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithExtended.contains("StateStoreRestore")) From 8c5b934c98485154f711d975864479761f01b481 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 27 Feb 2018 11:03:16 +0800 Subject: [PATCH 5/8] address comment --- .../datasources/v2/DataSourceV2Relation.scala | 14 ++++++++++++-- .../datasources/v2/DataSourceV2ScanExec.scala | 12 +++++++++++- ...ryPlan.scala => DataSourceV2StringFormat.scala} | 13 ++++--------- 3 files changed, 27 insertions(+), 12 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{DataSourceV2QueryPlan.scala => DataSourceV2StringFormat.scala} (88%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index ff5798c74c92..c0e4dad2ec08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -36,7 +36,7 @@ case class DataSourceV2Relation( projection: Seq[AttributeReference], filters: Option[Seq[Expression]] = None, userSpecifiedSchema: Option[StructType] = None) - extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan { + extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat { import DataSourceV2Relation._ @@ -115,7 +115,7 @@ case class StreamingDataSourceV2Relation( source: DataSourceV2, options: Map[String, String], reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan { + extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat { override def isStreaming: Boolean = true @@ -123,6 +123,16 @@ case class StreamingDataSourceV2Relation( override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) + override def equals(other: Any): Boolean = other match { + case other: StreamingDataSourceV2Relation => + output == other.output && source == other.source && options == other.options + case _ => false + } + + override def hashCode(): Int = { + Seq(output, source, options).hashCode() + } + override def computeStats(): Statistics = reader match { case r: SupportsReportStatistics => Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 110551da7d1d..e28793488742 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -40,10 +40,20 @@ case class DataSourceV2ScanExec( @transient source: DataSourceV2, @transient options: Map[String, String], @transient reader: DataSourceReader) - extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan { + extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { 
override def simpleString: String = "ScanV2 " + metadataString + override def equals(other: Any): Boolean = other match { + case other: StreamingDataSourceV2Relation => + output == other.output && source == other.source && options == other.options + case _ => false + } + + override def hashCode(): Int = { + Seq(output, source, options).hashCode() + } + override def outputPartitioning: physical.Partitioning = reader match { case s: SupportsReportPartitioning => new DataSourcePartitioning( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala similarity index 88% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala index 5753dccc832e..8e77f5209eba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala @@ -27,11 +27,10 @@ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.util.Utils /** - * A base class for data source v2 related query plan(both logical and physical). It defines the - * equals/hashCode methods, and provides a string representation of the query plan, according to - * some common information. + * A trait that can be used by data source v2 related query plans(both logical and physical), to + * provide a string format of the data source information for explain. */ -trait DataSourceV2QueryPlan { +trait DataSourceV2StringFormat { /** * The instance of this data source implementation. Note that we only consider its class in @@ -74,11 +73,7 @@ trait DataSourceV2QueryPlan { } // TODO: we should only display some standard options like path, table, etc. - if (options.nonEmpty) { - entries += "Options" -> options.map { - case (k, v) => s"$k=$v" - }.mkString("[", ",", "]") - } + entries ++= options val outputStr = Utils.truncatedString(output, "[", ", ", "]") From c5af52ea185e6f94f64096a4937f462db47a4fc5 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 28 Feb 2018 12:51:47 +0800 Subject: [PATCH 6/8] address comments --- .../sql/execution/datasources/v2/DataSourceV2Relation.scala | 3 ++- .../sql/execution/datasources/v2/DataSourceV2ScanExec.scala | 5 +++-- .../execution/datasources/v2/DataSourceV2StringFormat.scala | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index c0e4dad2ec08..2b282ffae239 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -123,9 +123,10 @@ case class StreamingDataSourceV2Relation( override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) + // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
override def equals(other: Any): Boolean = other match { case other: StreamingDataSourceV2Relation => - output == other.output && source == other.source && options == other.options + output == other.output && reader.getClass == other.reader.getClass && options == other.options case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index e28793488742..cb691ba29707 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -44,9 +44,10 @@ case class DataSourceV2ScanExec( override def simpleString: String = "ScanV2 " + metadataString + // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { - case other: StreamingDataSourceV2Relation => - output == other.output && source == other.source && options == other.options + case other: DataSourceV2ScanExec => + output == other.output && reader.getClass == other.reader.getClass && options == other.options case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala index 8e77f5209eba..bdf951a0a0c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala @@ -79,7 +79,7 @@ trait DataSourceV2StringFormat { val entriesStr = if (entries.nonEmpty) { Utils.truncatedString(entries.map { - case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100) + case (key, value) => StringUtils.abbreviate(redact(key + ":" + value), 100) }, " (", ", ", ")") } else { "" From 6fe76817032cb9b6bac47f14b79d7a4041e286dd Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 1 Mar 2018 13:49:24 +0800 Subject: [PATCH 7/8] fix redact --- .../datasources/v2/DataSourceV2StringFormat.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala index bdf951a0a0c3..5cc94e2fedaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala @@ -73,13 +73,17 @@ trait DataSourceV2StringFormat { } // TODO: we should only display some standard options like path, table, etc. 
- entries ++= options + if (options.nonEmpty) { + entries += "Options" -> Utils.redact(options).map { + case (k, v) => s"$k=$v" + }.mkString("[", ",", "]") + } val outputStr = Utils.truncatedString(output, "[", ", ", "]") val entriesStr = if (entries.nonEmpty) { Utils.truncatedString(entries.map { - case (key, value) => StringUtils.abbreviate(redact(key + ":" + value), 100) + case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100) }, " (", ", ", ")") } else { "" From 1164eecb55d2120bc71c059ea6944ea38d13d300 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 2 Mar 2018 13:33:31 +0800 Subject: [PATCH 8/8] clean up --- .../datasources/v2/DataSourceV2StringFormat.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala index 5cc94e2fedaa..aed55a429bfd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala @@ -69,7 +69,7 @@ trait DataSourceV2StringFormat { val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)] if (filters.nonEmpty) { - entries += "Pushed Filters" -> filters.mkString("[", ", ", "]") + entries += "Filters" -> filters.mkString("[", ", ", "]") } // TODO: we should only display some standard options like path, table, etc. @@ -83,7 +83,7 @@ trait DataSourceV2StringFormat { val entriesStr = if (entries.nonEmpty) { Utils.truncatedString(entries.map { - case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100) + case (key, value) => key + ": " + StringUtils.abbreviate(value, 100) }, " (", ", ", ")") } else { "" @@ -91,8 +91,4 @@ trait DataSourceV2StringFormat { s"$sourceName$outputStr$entriesStr" } - - private def redact(text: String): String = { - Utils.redact(SQLConf.get.stringRedationPattern, text) - } }
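
Note on the final explain format: after the last commit in this series, the string built for a v2 relation or scan consists of the source's short name, the (truncated) output attributes, a "Filters" entry with the pushed filters, and an "Options" entry in which the whole option map is redacted via Utils.redact and each entry value is abbreviated to 100 characters. The sketch below is a minimal, dependency-free approximation of that logic, not the Spark trait itself: the redaction regex, the 100-character cap, and the names ExplainStringSketch, metadataString and abbreviate are illustrative stand-ins.

object ExplainStringSketch {
  // Which option values get masked: those whose key matches this pattern. In Spark,
  // Utils.redact derives this from the configured redaction regex; here it is hard-coded.
  private val redactionPattern = "(?i)secret|password".r
  private val maxFieldLength = 100

  private def redact(options: Map[String, String]): Map[String, String] =
    options.map { case (k, v) =>
      if (redactionPattern.findFirstIn(k).isDefined) k -> "*********(redacted)" else k -> v
    }

  // Stand-in for StringUtils.abbreviate(value, 100) from Apache Commons Lang.
  private def abbreviate(s: String): String =
    if (s.length <= maxFieldLength) s else s.take(maxFieldLength - 3) + "..."

  def metadataString(
      sourceName: String,
      output: Seq[String],
      pushedFilters: Seq[String],
      options: Map[String, String]): String = {
    val entries = Seq.newBuilder[(String, String)]
    if (pushedFilters.nonEmpty) {
      entries += "Filters" -> pushedFilters.mkString("[", ", ", "]")
    }
    if (options.nonEmpty) {
      // Redact the option map as a whole, then render it as [k1=v1,k2=v2].
      val rendered = redact(options).map { case (k, v) => s"$k=$v" }.mkString("[", ",", "]")
      entries += "Options" -> rendered
    }
    val outputStr = output.mkString("[", ", ", "]")
    val es = entries.result()
    val entriesStr = if (es.isEmpty) {
      ""
    } else {
      es.map { case (k, v) => k + ": " + abbreviate(v) }.mkString(" (", ", ", ")")
    }
    s"$sourceName$outputStr$entriesStr"
  }
}

Fed with, say, metadataString("kafka", Seq("key#0", "value#1"), Seq("isnotnull(key#0)"), Map("kafka.bootstrap.servers" -> "host:9092", "password" -> "pw")), the sketch returns kafka[key#0, value#1] (Filters: [isnotnull(key#0)], Options: [kafka.bootstrap.servers=host:9092,password=*********(redacted)]), which is the general shape the renamed "RelationV2" and "ScanV2" nodes now show in explain output.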
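
Note on node equality: the convention the later commits converge on for DataSourceV2ScanExec and StreamingDataSourceV2Relation is that two nodes compare equal when they have the same output attributes, were created by the same reader class (not the same reader instance), and carry the same options. Below is a stripped-down illustration of that contract using placeholder types (ScanNodeSketch, FakeReaderA, FakeReaderB) rather than the Spark classes; the hashCode here hashes the same fields as equals, whereas the patch itself hashes Seq(output, source, options).

object EqualityContractSketch {
  // Two throwaway reader classes; only their runtime class matters for the comparison.
  final class FakeReaderA
  final class FakeReaderB

  // Placeholder for a v2 scan node: equality is defined on output + reader class + options,
  // mirroring the equals overrides added in this series.
  case class ScanNodeSketch(output: Seq[String], options: Map[String, String], reader: AnyRef) {
    override def equals(other: Any): Boolean = other match {
      case o: ScanNodeSketch =>
        output == o.output && reader.getClass == o.reader.getClass && options == o.options
      case _ => false
    }
    override def hashCode(): Int = Seq(output, reader.getClass, options).hashCode()
  }

  def main(args: Array[String]): Unit = {
    // Different reader *instances* of the same class compare equal, so re-creating a
    // reader does not change node identity as long as output and options are unchanged.
    val a = ScanNodeSketch(Seq("value#0"), Map("path" -> "/tmp/t"), new FakeReaderA)
    val b = ScanNodeSketch(Seq("value#0"), Map("path" -> "/tmp/t"), new FakeReaderA)
    assert(a == b && a.hashCode == b.hashCode)
    assert(a != ScanNodeSketch(Seq("value#0"), Map("path" -> "/tmp/t"), new FakeReaderB))
  }
}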
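
Note on the offset handling touched in the MicroBatchExecution hunk: an offset restored from the offset log arrives as SerializedOffset (raw JSON) and has to be re-parsed by the reader's deserializeOffset before setOffsetRange is called, while an offset reported during the current run is already a typed OffsetV2. A standalone sketch of that normalization step follows, with simplified stand-in types (OffsetLike, SerializedOffsetLike, LongOffsetLike) instead of the Spark ones.

object OffsetNormalizationSketch {
  sealed trait OffsetLike { def json: String }
  final case class SerializedOffsetLike(json: String) extends OffsetLike
  final case class LongOffsetLike(value: Long) extends OffsetLike {
    override def json: String = value.toString
  }

  // Stand-in for reader.deserializeOffset(json).
  def deserialize(json: String): OffsetLike = LongOffsetLike(json.trim.toLong)

  def normalize(available: OffsetLike): OffsetLike = available match {
    case s: SerializedOffsetLike => deserialize(s.json) // restored from the offset log
    case o                       => o                   // already a typed v2 offset
  }

  def main(args: Array[String]): Unit = {
    assert(normalize(SerializedOffsetLike("42")) == LongOffsetLike(42L))
    assert(normalize(LongOffsetLike(7L)) == LongOffsetLike(7L))
  }
}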