@@ -60,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
}.exists { r =>
// Ensure the new topic is present and the old topic is gone.
r.knownPartitions.exists(_.topic == topic2)
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
@@ -119,7 +119,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
}
})
}.distinct

This file was deleted.

@@ -35,15 +35,12 @@ case class DataSourceV2Relation(
options: Map[String, String],
projection: Seq[AttributeReference],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
userSpecifiedSchema: Option[StructType] = None)
extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

import DataSourceV2Relation._

override def simpleString: String = {
s"DataSourceV2Relation(source=${source.name}, " +
s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
}
override def simpleString: String = "RelationV2 " + metadataString

override lazy val schema: StructType = reader.readSchema()

@@ -107,19 +104,36 @@ case class DataSourceV2Relation(
}

/**
* A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
* to the non-streaming relation.
* A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
*
* Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
Contributor

I'm -0 on including this in a PR to improve explain results. This could needlessly cause commit conflicts when maintaining a branch. But, this is small and needs to go in somewhere. I would remove it, though.

Contributor Author

Are you suggesting we should open a JIRA to fix the internal documentation? We usually fix these things in related PRs that touch the class...

Contributor

No, I think it is okay, but I would not add it if it were my commit. It is unlikely that this will cause commit conflicts so it should be fine. I would just prefer to have a docs commit later to keep track of this.

* to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
* after we figure out how to apply operator push-down for streaming data sources.
Contributor Author

Currently the streaming execution creates the reader once. The reader is mutable and contains two kinds of state:

  1. operator push-down states, e.g. the filters being pushed down.
  2. streaming related states, like offsets, kafka connection, etc.

For continuous mode, it's fine. We create the reader, set offsets, construct the plan, get the physical plan, and process. We mutate the reader state at the beginning and never mutate it again.

For micro-batch mode, we have a problem. We create the reader once at the beginning, but then set the reader offsets, construct the plan, and get the physical plan for every batch. This means we apply operator push-down to this reader many times, and data source v2 doesn't define what the behavior should be in this case. Thus we can't apply operator push-down for streaming data sources.

@marmbrus @tdas @zsxwing @jose-torres I have 2 proposals to support operator push down for streaming relation:

  1. Introduce a reset API to DataSourceReader to clear out the operator push-down state. Then we can call reset for every micro-batch and safely apply operator push-down (rough sketch below).
  2. Do plan analysis/optimization/planning only once for micro-batch mode. Theoretically this is not ideal: different micro-batches may have different statistics, so the optimal physical plan differs and we should rerun the planner for each batch. The benefit is that analysis/optimization/planning may be costly, so doing it once mitigates that cost. Adaptive execution can also help, so reusing the same physical plan is not that bad.
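
A rough sketch of what proposal 1 could look like. The reset() method, the trait, and the createReadTasks() signature below are hypothetical simplifications for illustration, not the current API:

```scala
object ResetApiSketch {
  import org.apache.spark.sql.sources.Filter

  // Hypothetical, simplified reader interface, only to illustrate the proposed contract.
  trait HypotheticalDataSourceReader {
    def reset(): Unit                               // proposed API: drop all push-down state
    def pushFilters(filters: Array[Filter]): Array[Filter]
    def createReadTasks(): Seq[AnyRef]              // stand-in for the real factory method
  }

  // Per micro-batch: clear state, re-apply push-down, then create the read tasks.
  def planMicroBatch(
      reader: HypotheticalDataSourceReader,
      filters: Array[Filter]): Seq[AnyRef] = {
    reader.reset()              // makes repeated planning against the same reader well-defined
    reader.pushFilters(filters) // push-down happens only between reset() and createReadTasks()
    reader.createReadTasks()    // after this call, further push-down would be invalid
  }
}
```

With such a contract, re-running analysis/optimization/planning for every micro-batch would no longer leave stale push-down state in the reader.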

Contributor

I guess I had assumed that data source v2 guaranteed it would call all the supported stateful methods during planning, and that the most recent call won.

Proposal 2 I don't think will work, since the planning for each batch is required at a fairly deep level and adaptive execution is disabled for streaming. Proposal 1 sounds fine to me, although I'd like to note that this kinda seems like it's working around an issue with having stateful pushdown methods. From an abstract perspective, they're really just action-at-a-distance parameters for createReadTasks().

Contributor

@rdblue as well.

I agree that we really need to define the contract here, and personally I would really benefit from seeing a life cycle diagram for all of the different pieces of the API. @tdas and @jose-torres made one for just the write side of streaming and it was super useful for me as someone at a distance that wants to understand what was going on.

[screenshot, 2018-02-21: lifecycle diagram for the write side of the streaming data source v2 API]

Something like this diagram that also covered when things are resolved, when pushdown happens, and that shows the differences between read/write, microbatch, batch and continuous would be awesome.

Regarding the actual question, I'm not a huge fan of option 2 as it still seems like an implicit contract with this mutable object (assuming I understand the proposal correctly). Option 1 at least means that we could say, "whenever it's time to do pushdown: call reset(), do pushdown in some defined order, then call createX(). It is invalid to do more pushdown after createX has been called".

Even better than a reset() might be a cleanClone() method that gives you a fresh copy. As I said above, I don't really understand the lifecycle of the API, but given how we reuse query plan fragments I'm really nervous about mutable objects that are embedded in operators.

I also agree with @jose-torres's point that this mechanism looks like action at a distance, but the reset() contract at least localizes it to some degree, and I don't have a better suggestion for a way to support evolvable pushdown.

Contributor Author

We can also ask the implementations to clear out all the state after createX is called; then we don't need to add any new APIs. In any case, there should be a detailed design doc for data source v2 operator push-down; it doesn't block this PR.

Contributor

I agree that a diagram would really help us follow what's happening and the assumptions that are going into these proposals.

I'd also like to see this discussion happen on the dev list, where more people can participate. The streaming API for v2 wasn't really discussed there (unless I missed it) and given these challenges I think we should go back and have a design and discussion on it. This PR probably isn't the right place to get into these details.

*/
case class StreamingDataSourceV2Relation(
output: Seq[AttributeReference],
source: DataSourceV2,
options: Map[String, String],
reader: DataSourceReader)
extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

override def isStreaming: Boolean = true

override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]
override def simpleString: String = "Streaming RelationV2 " + metadataString

override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))

// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: StreamingDataSourceV2Relation =>
output == other.output && reader.getClass == other.reader.getClass && options == other.options
Contributor Author

Now it's exactly the same as before. We should clean it up after we figure out how to push down operators to the streaming relation.

case _ => false
}

override def hashCode(): Int = {
Seq(output, source, options).hashCode()
}

override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types.StructType
@@ -36,10 +37,23 @@ import org.apache.spark.sql.types.StructType
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
@transient options: Map[String, String],
@transient reader: DataSourceReader)
extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {

override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
override def simpleString: String = "ScanV2 " + metadataString

// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: DataSourceV2ScanExec =>
output == other.output && reader.getClass == other.reader.getClass && options == other.options
case _ => false
}

override def hashCode(): Int = {
Seq(output, source, options).hashCode()
}

override def outputPartitioning: physical.Partitioning = reader match {
case s: SupportsReportPartitioning =>
@@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan

object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case relation: DataSourceV2Relation =>
DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
case r: DataSourceV2Relation =>
Contributor

Why rename this variable?

Contributor Author

relation is used many times in the next line; renaming it to r shortens the code below.

Contributor

I don't think this change is necessary and it just makes the patch larger.

Contributor Author

Strictly speaking, if I don't change this line, then I need to split the next line into 2 lines, as I need to pass more parameters, so it's still a 2-line diff.

I think tiny things like this are really not worth pointing out during code review; we should save our effort and focus more on the actual code logic. What do you think?

Contributor

I'm pointing it out because I think it is significant. This is also why I sent a note to the dev list. Changes like this make it much harder to work with Spark because little things change that are not necessary and cause conflicts. That's a problem not just when I'm maintaining a branch, but also when I'm trying to get a PR committed. These changes cause more work because we have to rebase PRs and update for variables that have changed names in the name of style.

In this particular case, I would probably wrap the line that is too long. If you want to rename to fix it, I'd consider that reasonable. But all the other cases that aren't necessary should be reverted.

DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil

case relation: StreamingDataSourceV2Relation =>
DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
case r: StreamingDataSourceV2Relation =>
DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil

case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.util.Utils

/**
* A trait that can be used by data source v2 related query plans(both logical and physical), to
* provide a string format of the data source information for explain.
*/
trait DataSourceV2StringFormat {

/**
* The instance of this data source implementation. Note that we only consider its class in
* equals/hashCode, not the instance itself.
*/
def source: DataSourceV2

/**
* The output of the data source reader, w.r.t. column pruning.
*/
def output: Seq[Attribute]

/**
* The options for this data source reader.
*/
def options: Map[String, String]

/**
* The created data source reader. Here we use it to get the filters that has been pushed down
* so far, itself doesn't take part in the equals/hashCode.
*/
def reader: DataSourceReader

private lazy val filters = reader match {
case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
case s: SupportsPushDownFilters => s.pushedFilters().toSet
case _ => Set.empty
}

private def sourceName: String = source match {
case registered: DataSourceRegister => registered.shortName()
case _ => source.getClass.getSimpleName.stripSuffix("$")
}

def metadataString: String = {
val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]

if (filters.nonEmpty) {
entries += "Pushed Filters" -> filters.mkString("[", ", ", "]")
Contributor

Nit: Does this need "Pushed"?

}

// TODO: we should only display some standard options like path, table, etc.
Contributor Author

As a follow-up, there are two proposals:

  1. Define some standard options and only display those standard options, if they are specified.
  2. Create a new mix-in interface to allow data source implementations to decide which options they want to show during explain (rough sketch below).
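
A rough sketch of what the mix-in in proposal 2 could look like; this interface does not exist, and the name and method are purely illustrative:

```scala
import org.apache.spark.sql.sources.v2.DataSourceV2

// Hypothetical mix-in: a source opts in to showing a curated subset of its options in explain.
trait SupportsExplainOptions { self: DataSourceV2 =>
  /** Returns only the options (e.g. path, table, topic) that are safe and useful to display. */
  def optionsForExplain(options: Map[String, String]): Map[String, String]
}
```

metadataString could then use optionsForExplain for sources that implement it and fall back to showing all options otherwise.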

entries ++= options

val outputStr = Utils.truncatedString(output, "[", ", ", "]")

val entriesStr = if (entries.nonEmpty) {
Utils.truncatedString(entries.map {
case (key, value) => StringUtils.abbreviate(redact(key + ":" + value), 100)
Contributor Author

Now users can match the password with a pattern like password:.+ to redact it.

Contributor

Why not redact the map? That would work with the default redaction pattern. If I understand correctly, this will require customization.

Contributor Author

If you look at the SQLConf.stringRedationPattern, the default pattern is empty. So it needs customization anyway, and this implementation is easier to customize. If we redact the entire map, I'm not sure how to write a regex to precisely match the password part.

Contributor

Yes, part of the problem is that we're using that pattern and not the secret redaction pattern, which defaults to "(?i)secret|password|url|user|username".r: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L328-L335

If you used the map redaction method, it would use the default pattern to match keys that should be redacted: https://github.com/apache/spark/blob/a5a4b83/core/src/main/scala/org/apache/spark/util/Utils.scala#L2723-L2729

}, " (", ", ", ")")
} else {
""
}

s"$sourceName$outputStr$entriesStr"
Contributor

Should there be spaces? I don't see spaces added to the components, so the output might look squished together.

Contributor Author (@cloud-fan, Mar 2, 2018)

outputStr doesn't need a space; we want Relation[a: int] instead of Relation [a: int]. This also matches the data source v1 explain output.

entriesStr has a space. It's added via the parameters of mkString: " (", ", ", ")". This is mostly to avoid an extra space when entriesStr is empty.

}

private def redact(text: String): String = {
Utils.redact(SQLConf.get.stringRedationPattern, text)
}
}
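
To make the formatting and redaction behavior above concrete, here is a small, self-contained sketch of how metadataString assembles the explain string. The source name, attributes, options, and redaction pattern are invented for illustration, and abbreviation of long entries is omitted:

```scala
object MetadataStringSketch {
  def main(args: Array[String]): Unit = {
    val sourceName = "kafka"                          // e.g. from DataSourceRegister.shortName()
    val output     = Seq("key#8", "value#9")          // pruned output attributes
    val filters    = Seq("IsNotNull(value)")          // filters already pushed to the reader
    val options    = Map("subscribe" -> "topic-1", "some.credential" -> "password:secret")

    // Stand-in for SQLConf.get.stringRedationPattern; it is empty by default, so users must
    // configure something like "(?i)password:.+" themselves to hide secrets.
    val redactionPattern = Some("(?i)password:.+".r)
    def redact(text: String): String =
      redactionPattern.map(_.replaceAllIn(text, "*********(redacted)")).getOrElse(text)

    val entries    = Seq("Pushed Filters" -> filters.mkString("[", ", ", "]")) ++ options
    val outputStr  = output.mkString("[", ", ", "]")
    val entriesStr = entries.map { case (k, v) => redact(k + ":" + v) }.mkString(" (", ", ", ")")

    // Prints something like:
    // ScanV2 kafka[key#8, value#9] (Pushed Filters:[IsNotNull(value)], subscribe:topic-1, some.credential:*********(redacted))
    println(s"ScanV2 $sourceName$outputStr$entriesStr")
  }
}
```

With the default (empty) redaction pattern, the credential above would be printed as-is, which is the gap discussed in the comments on this file.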
@@ -20,16 +20,16 @@ package org.apache.spark.sql.execution.streaming
import java.util.Optional

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
import scala.collection.mutable.{Map => MutableMap}

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -52,6 +52,9 @@ class MicroBatchExecution(

@volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty

private val readerToDataSourceMap =
MutableMap.empty[MicroBatchReader, (DataSourceV2, Map[String, String])]

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
@@ -97,6 +100,7 @@ class MicroBatchExecution(
metadataPath,
new DataSourceOptions(options.asJava))
nextSourceId += 1
readerToDataSourceMap(reader) = dataSourceV2 -> options
logInfo(s"Using MicroBatchReader [$reader] from " +
s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
StreamingExecutionRelation(reader, output)(sparkSession)
@@ -419,8 +423,19 @@ class MicroBatchExecution(
toJava(current),
Optional.of(availableV2))
logDebug(s"Retrieving data from $reader: $current -> $availableV2")
Some(reader ->
new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))

val (source, options) = reader match {
// `MemoryStream` is special. It's for test only and doesn't have a `DataSourceV2`
// implementation. We provide a fake one here for explain.
case _: MemoryStream[_] => MemoryStreamDataSource -> Map.empty[String, String]
// Provide a fake value here just in case something went wrong, e.g. the reader gives
// a wrong `equals` implementation.
case _ => readerToDataSourceMap.getOrElse(reader, {
FakeDataSourceV2 -> Map.empty[String, String]
})
}
Some(reader -> StreamingDataSourceV2Relation(
reader.readSchema().toAttributes, source, options, reader))
case _ => None
}
}
@@ -518,3 +533,7 @@ class MicroBatchExecution(
Optional.ofNullable(scalaOption.orNull)
}
}

object MemoryStreamDataSource extends DataSourceV2

object FakeDataSourceV2 extends DataSourceV2