naming updates
cloud-fan committed Sep 13, 2017
commit abcc606e006e9975d1507eed379a48a3134165ad
@@ -17,21 +17,11 @@

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

/**
* The main interface for data source v2 implementations. Users can mix in more interfaces to
* implement more functions other than just scan.
* The base interface for data source v2 implementations.
*
* Note that this is an empty interface. Data source implementations should mix in at least one of
* the plug-in interfaces like `ReadSupport`; otherwise it's just a dummy data source that is
Contributor: use an actual link ...
* neither readable nor writable.
*/
public interface DataSourceV2 {

/**
* Create a `DataSourceV2Reader` to scan the data for this data source.
*
* @param options the options for this data source reader, which is an immutable case-insensitive
* string-to-string map.
* @return a reader that implements the actual read logic.
*/
DataSourceV2Reader createReader(DataSourceV2Options options);
}
public interface DataSourceV2 {}
@@ -23,8 +23,8 @@
import java.util.Optional;

/**
* An immutable case-insensitive string-to-string map, which is used to represent data source
* options.
* An immutable string-to-string map in which keys are case-insensitive. This is used to represent
* data source options.
*/
public class DataSourceV2Options {
Contributor: add a simple test suite for this
private final Map<String, String> keyLowerCasedMap;
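A quick sketch of the simple test the comment above asks for. It assumes a `get(String)` accessor returning `java.util.Optional` (hinted at by the `Optional` import above); only the `Map`-based constructor is visible in this diff, so treat the accessor as an assumption.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;

public class DataSourceV2OptionsSketch {
  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("PATH", "/tmp/data");

    // The Map-based constructor is the one DataFrameReader uses later in this diff.
    DataSourceV2Options options = new DataSourceV2Options(raw);

    // Keys are compared case-insensitively, so lookups with any casing should hit.
    assert options.get("path").isPresent();
    assert "/tmp/data".equals(options.get("Path").get());
  }
}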
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

/**
* A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading
@rxin (Contributor), Sep 14, 2017: Users -> data source implementers. Actually a better one is "Data sources can implement".
* ability and scan the data from the data source.
*/
public interface ReadSupport {

/**
* Creates a `DataSourceV2Reader` to scan the data for this data source.
*
* @param options the options for this data source reader, which is an immutable case-insensitive
* string-to-string map.
* @return a reader that implements the actual read logic.
*/
DataSourceV2Reader createReader(DataSourceV2Options options);
}
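For illustration, a minimal read-only source built from the empty `DataSourceV2` marker plus `ReadSupport` could look like the sketch below. It assumes `DataSourceV2Reader` declares `readSchema()` and `createReadTasks()` as referenced elsewhere in this diff; the class name and schema are illustrative, not part of this commit.

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.StructType;

public class SimpleReadOnlySource implements DataSourceV2, ReadSupport {

  @Override
  public DataSourceV2Reader createReader(DataSourceV2Options options) {
    // Options carry user-supplied settings (e.g. a path); this sketch ignores them.
    return new DataSourceV2Reader() {
      @Override
      public StructType readSchema() {
        // A fixed two-column schema, mirroring the test sources later in this PR.
        return new StructType().add("i", "int").add("j", "int");
      }

      @Override
      public List<ReadTask<Row>> createReadTasks() {
        // A real source would return one ReadTask per input split; empty keeps the sketch short.
        return Collections.emptyList();
      }
    };
  }
}

Spark instantiates such a class reflectively via `DataSource.lookupDataSource`, as the `DataFrameReader` change further down shows.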
@@ -21,11 +21,14 @@
import org.apache.spark.sql.types.StructType;

/**
* A variant of `DataSourceV2` which requires users to provide a schema when reading data. A data
* source can inherit both `DataSourceV2` and `SchemaRequiredDataSourceV2` if it supports both schema
* inference and user-specified schemas.
* A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data reading
* ability and scan the data from the data source.
*
* This is a variant of `ReadSupport` that accepts user-specified schema when reading data. A data
* source can implement both `ReadSupport` and `ReadSupportWithSchema` if it supports both schema
* inference and user-specified schema.
*/
public interface SchemaRequiredDataSourceV2 {
public interface ReadSupportWithSchema {
Contributor: I still find ReadSupport vs ReadSupportWithSchema pretty confusing. But let's address that separately.

/**
* Create a `DataSourceV2Reader` to scan the data for this data source.
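A schema-required source would look roughly like the sketch below. The two-argument `createReader(schema, options)` signature is inferred from the `DataFrameReader` change further down and is not fully shown in this hunk, so treat it as an assumption; the class name is illustrative.

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.StructType;

public class SchemaRequiredSource implements DataSourceV2, ReadSupportWithSchema {

  @Override
  public DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options) {
    // The user-specified schema becomes the reader's schema; a real source would
    // validate it against the underlying data before returning the reader.
    return new DataSourceV2Reader() {
      @Override
      public StructType readSchema() {
        return schema;
      }

      @Override
      public List<ReadTask<Row>> createReadTasks() {
        return Collections.emptyList();
      }
    };
  }
}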
@@ -28,13 +28,15 @@
* this data source reader.
*
* There are mainly 3 kinds of query optimizations:
* 1. push operators downward to the data source, e.g., column pruning, filter push down, etc.
* 2. propagate information upward to Spark, e.g., report statistics, report ordering, etc.
* 3. special scans like columnar scan, unsafe row scan, etc. Note that a data source reader can
* implement at most one special scan.
* 1. Operator push-down. E.g., filter push-down, required columns push-down (aka column
* pruning), etc. These push-down interfaces are named like `SupportsPushDownXXX`.
* 2. Information reporting. E.g., statistics reporting, ordering reporting, etc. These
* reporting interfaces are named like `SupportsReportingXXX`.
* 3. Special scan. E.g., columnar scan, unsafe row scan, etc. Note that a data source reader can
* implement at most one special scan. These scan interfaces are named like `SupportsScanXXX`.
*
* Spark first applies all operator push-down optimizations which this data source supports. Then
* Spark collects information this data source provides for further optimizations. Finally Spark
* Spark first applies all operator push-down optimizations that this data source supports. Then
* Spark collects the information this data source reports for further optimizations. Finally Spark
* issues the scan request and does the actual data reading.
Contributor Author: TODO: this is not true now, as we push down operators at the planning phase. We need to do some refactor and move it to the optimizing phase.
Member: This would be really nice imho.
*/
public interface DataSourceV2Reader {
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.upward;
package org.apache.spark.sql.sources.v2.reader;

import java.util.OptionalLong;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.downward;
package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
@@ -24,14 +24,14 @@
/**
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
* arbitrary expressions as predicates to the data source. This is an experimental and unstable
* interface
* interface as `Expression` is not public and may get changed in future Spark versions.
*
* Note that, if users implement both this interface and `FilterPushDownSupport`, Spark will ignore
* `FilterPushDownSupport` and only process this interface.
* Note that, if users implement both this interface and `SupportsPushDownFilters`, Spark will
* ignore `SupportsPushDownFilters` and only process this interface.
*/
@Experimental
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {
public interface SupportsPushDownCatalystFilters {

/**
* Pushes down filters, and returns unsupported filters.
@@ -15,18 +15,18 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.downward;
package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.sources.Filter;

/**
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
* filters to the data source and reduce the size of the data to be read.
*
* Note that, if users implement both this interface and `CatalystFilterPushDownSupport`, Spark
* will ignore this interface and only process `CatalystFilterPushDownSupport`.
* Note that, if users implement both this interface and `SupportsPushDownCatalystFilters`, Spark
* will ignore this interface and only process `SupportsPushDownCatalystFilters`.
*/
public interface FilterPushDownSupport {
public interface SupportsPushDownFilters {

/**
* Pushes down filters, and returns unsupported filters.
@@ -15,22 +15,25 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.downward;
package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.types.StructType;

/**
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to only read the
* required columns/nested fields during scan.
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
* required columns and only read these columns during scan.
*/
public interface ColumnPruningSupport {
public interface SupportsPushDownRequiredColumns {

/**
* Apply column pruning w.r.t. the given requiredSchema.
* Applies column pruning w.r.t. the given requiredSchema.
*
* Implementation should try its best to prune the unnecessary columns/nested fields, but it's
* Implementation should try its best to prune the unnecessary columns or nested fields, but it's
* also OK to do the pruning partially, e.g., a data source may not be able to prune nested
* fields, and only prune top-level columns.
*
* Note that, data source implementations should update `DataSourceV2Reader.readSchema` after
* applying column pruning.
*/
void pruneColumns(StructType requiredSchema);
Contributor: link this to readSchema function
}
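A short sketch of a reader that accepts column pruning and keeps `readSchema` in sync, as the note above requires; the starting schema and class name are illustrative, not part of this commit.

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

public class PrunableReader implements DataSourceV2Reader, SupportsPushDownRequiredColumns {

  // Start from the full schema; Spark may narrow it through pruneColumns.
  private StructType schema = new StructType().add("i", "int").add("j", "int");

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // Remember the pruned schema so readSchema() reports exactly what will be produced.
    this.schema = requiredSchema;
  }

  @Override
  public StructType readSchema() {
    return schema;
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    // Read tasks would only emit the pruned columns; the actual read logic is elided.
    return Collections.emptyList();
  }
}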
@@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.upward;
package org.apache.spark.sql.sources.v2.reader;

/**
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to report
* statistics to Spark.
*/
public interface StatisticsSupport {
public interface SupportsReportStatistics {
Statistics getStatistics();
Member: Will the returned stats be adjusted by the data sources based on the operator push-down?
Contributor Author: It should, but we need some refactor on optimizer, see #19136 (comment)
}
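A sketch of a reader reporting statistics. Only `sizeInBytes()` returning `OptionalLong` is exercised in this diff (see `DataSourceV2Relation` below); the `numRows()` method on the returned object is an assumption, not something this PR shows, and all values are illustrative.

import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.types.StructType;

public class StatisticsReportingReader implements DataSourceV2Reader, SupportsReportStatistics {

  @Override
  public Statistics getStatistics() {
    return new Statistics() {
      @Override
      public OptionalLong sizeInBytes() {
        // A known on-disk size lets Spark plan better than its default size estimate.
        return OptionalLong.of(10L * 1024 * 1024);
      }

      // Assumed companion method; only sizeInBytes() appears in this diff.
      public OptionalLong numRows() {
        return OptionalLong.of(100_000L);
      }
    };
  }

  @Override
  public StructType readSchema() {
    return new StructType().add("i", "int").add("j", "int");
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    return Collections.emptyList();
  }
}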
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.scan;
package org.apache.spark.sql.sources.v2.reader;

import java.util.List;

@@ -28,18 +28,16 @@

/**
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to output
* unsafe rows directly and avoid the row copy at Spark side.
*
* Note that, this is an experimental and unstable interface, as `UnsafeRow` is not public and
* may get changed in future Spark versions.
* unsafe rows directly and avoid the row copy at Spark side. This is an experimental and unstable
* interface, as `UnsafeRow` is not public and may get changed in future Spark versions.
*/
@Experimental
@InterfaceStability.Unstable
public interface UnsafeRowScan extends DataSourceV2Reader {
public interface SupportsScanUnsafeRow extends DataSourceV2Reader {

@Override
default List<ReadTask<Row>> createReadTasks() {
throw new IllegalStateException("createReadTasks should not be called with UnsafeRowScan.");
throw new IllegalStateException("createReadTasks should not be called with SupportsScanUnsafeRow.");
}

/**
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, SchemaRequiredDataSourceV2}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

@@ -183,31 +183,29 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}

val cls = DataSource.lookupDataSource(source)
val isDataSourceV2 = classOf[DataSourceV2].isAssignableFrom(cls) ||
classOf[SchemaRequiredDataSourceV2].isAssignableFrom(cls)
if (isDataSourceV2) {
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val dataSource = cls.newInstance()
val options = new DataSourceV2Options(extraOptions.asJava)

val reader = (cls.newInstance(), userSpecifiedSchema) match {
case (ds: SchemaRequiredDataSourceV2, Some(schema)) =>
case (ds: ReadSupportWithSchema, Some(schema)) =>
ds.createReader(schema, options)

case (ds: DataSourceV2, None) =>
case (ds: ReadSupport, None) =>
ds.createReader(options)

case (_: SchemaRequiredDataSourceV2, None) =>
case (_: ReadSupportWithSchema, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $dataSource.")

case (ds: DataSourceV2, Some(schema)) =>
case (ds: ReadSupport, Some(schema)) =>
val reader = ds.createReader(options)
if (reader.readSchema() != schema) {
throw new AnalysisException(s"$ds does not allow user-specified schemas.")
}
reader

case _ =>
throw new AnalysisException(s"$cls is not a valid Spark SQL Data Source.")
throw new AnalysisException(s"$cls does not support data reading.")
}

Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
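For context, this is the code path an end user exercises through the public `DataFrameReader` API; the sketch below uses only that public API, and the data source class names are hypothetical placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class DataSourceV2ReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
      .master("local[2]")
      .appName("data-source-v2-read")
      .getOrCreate();

    // A ReadSupport source infers its own schema.
    Dataset<Row> inferred = spark.read()
      .format("com.example.SimpleReadOnlySource")   // hypothetical DataSourceV2 + ReadSupport class
      .load();

    // A ReadSupportWithSchema source requires a user-specified schema,
    // otherwise DataFrameReader raises the AnalysisException shown above.
    Dataset<Row> withSchema = spark.read()
      .format("com.example.SchemaRequiredSource")   // hypothetical ReadSupportWithSchema class
      .schema(new StructType().add("i", "int").add("j", "int"))
      .load();

    inferred.show();
    withSchema.show();
  }
}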
@@ -19,15 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, SupportsReportStatistics}

case class DataSourceV2Relation(
output: Seq[AttributeReference],
reader: DataSourceV2Reader) extends LeafNode {

override def computeStats(): Statistics = reader match {
case r: StatisticsSupport =>
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
@@ -26,8 +26,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataSourceV2Reader, ReadTask}
import org.apache.spark.sql.sources.v2.reader.scan.UnsafeRowScan
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType

case class DataSourceV2ScanExec(
@@ -52,7 +52,7 @@

override protected def doExecute(): RDD[InternalRow] = {
val readTasks: java.util.List[ReadTask[UnsafeRow]] = reader match {
case r: UnsafeRowScan => r.createUnsafeRowReadTasks()
case r: SupportsScanUnsafeRow => r.createUnsafeRowReadTasks()
case _ =>
reader.createReadTasks().asScala.map {
new RowToUnsafeRowReadTask(_, reader.readSchema()): ReadTask[UnsafeRow]
@@ -24,17 +24,17 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport}
import org.apache.spark.sql.sources.v2.reader._

object DataSourceV2Strategy extends Strategy {
// TODO: write path
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) =>
val stayUpFilters: Seq[Expression] = reader match {
case r: CatalystFilterPushDownSupport =>
case r: SupportsPushDownCatalystFilters =>
r.pushCatalystFilters(filters.toArray)

case r: FilterPushDownSupport =>
case r: SupportsPushDownFilters =>
// A map from original Catalyst expressions to corresponding translated data source
// filters. If a predicate is not in this map, it means it cannot be pushed down.
val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
@@ -65,7 +65,7 @@ object DataSourceV2Strategy extends Strategy {
// TODO: nested fields pruning
val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
reader match {
case r: ColumnPruningSupport =>
case r: SupportsPushDownRequiredColumns =>
r.pruneColumns(requiredColumns.toStructType)
case _ =>
}
@@ -26,16 +26,13 @@
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.downward.ColumnPruningSupport;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.downward.FilterPushDownSupport;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;

public class JavaAdvancedDataSourceV2 implements DataSourceV2 {
public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {

class Reader implements DataSourceV2Reader, ColumnPruningSupport, FilterPushDownSupport {
class Reader implements DataSourceV2Reader, SupportsPushDownRequiredColumns, SupportsPushDownFilters {
private StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
private Filter[] filters = new Filter[0];
