From 6d474a1c2689342b55d4c7d75304c3c424b8e1ec Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 12 Jul 2017 02:28:16 +0800 Subject: [PATCH 1/4] working prototype --- .../sql/sources/v2/BucketingSupport.java | 28 ++ .../spark/sql/sources/v2/DataSourceV2.java | 43 +++ .../v2/DataSourceV2SchemaProvider.java | 42 +++ .../sql/sources/v2/PartitioningSupport.java | 26 ++ .../sql/sources/v2/WritableDataSourceV2.java | 38 +++ .../reader/CatalystFilterPushDownSupport.java | 40 +++ .../v2/reader/ColumnPruningSupport.java | 32 +++ .../v2/reader/ColumnarReadSupport.java | 42 +++ .../sources/v2/reader/DataSourceV2Reader.java | 66 +++++ .../v2/reader/FilterPushDownSupport.java | 36 +++ .../reader/HashPartitionPushDownSupport.java | 41 +++ .../v2/reader/LimitPushDownSupport.java | 30 ++ .../spark/sql/sources/v2/reader/ReadTask.java | 57 ++++ .../v2/reader/RowToUnsafeRowReadTask.java | 62 +++++ .../v2/reader/SamplePushDownSupport.java | 32 +++ .../sql/sources/v2/reader/SortPushDown.java | 30 ++ .../sources/v2/reader/StatisticsSupport.java | 27 ++ .../sources/v2/writer/DataSourceV2Writer.java | 42 +++ .../sql/sources/v2/writer/WriteTask.java | 66 +++++ .../v2/writer/WriterCommitMessage.java | 30 ++ .../apache/spark/sql/DataFrameReader.scala | 44 ++- .../sql/execution/ColumnarBatchScan.scala | 2 - .../spark/sql/execution/SparkPlanner.scala | 2 + .../datasources/v2/DataSourceRDD.scala | 74 +++++ .../datasources/v2/DataSourceV2Relation.scala | 32 +++ .../datasources/v2/DataSourceV2ScanExec.scala | 88 ++++++ .../datasources/v2/DataSourceV2Strategy.scala | 89 ++++++ .../sources/v2/JavaAdvancedDataSourceV2.java | 130 +++++++++ .../sql/sources/v2/JavaBatchDataSourceV2.java | 103 +++++++ .../sources/v2/JavaSimpleDataSourceV2.java | 82 ++++++ .../sources/v2/JavaUnsafeRowDataSourceV2.java | 92 +++++++ .../sql/sources/v2/DataSourceV2Suite.scala | 260 ++++++++++++++++++ 32 files changed, 1799 insertions(+), 9 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/BucketingSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2SchemaProvider.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/PartitioningSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/WritableDataSourceV2.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnPruningSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/HashPartitionPushDownSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/LimitPushDownSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SamplePushDownSupport.java create mode 100644 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteTask.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BucketingSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BucketingSupport.java new file mode 100644 index 000000000000..f66ea10278fd --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BucketingSupport.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +/** + * A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this + * interface to specify the bucket/sort columns of the reader/writer, to improve performance. + */ +public interface BucketingSupport { + void setBucketColumns(String[] bucketColumns, int numBuckets); + + void setSortColumns(String[] sortColumns); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java new file mode 100644 index 000000000000..a504b25a439f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +/** + * The main interface and minimal requirement for data source v2 implementations. Users can mix in + * more interfaces to implement more functions other than just scan. + */ +public interface DataSourceV2 { + + /** + * The main entrance for read interface. + * + * @param schema the full schema of this data source reader. Full schema usually maps to the + * physical schema of the underlying storage of this data source reader, e.g. + * parquet files, JDBC tables, etc, while this reader may not read data with full + * schema, as column pruning or other optimizations may happen. + * @param options the options for this data source reader, which is case insensitive. + * @return a reader that implements the actual read logic. + */ + DataSourceV2Reader createReader( + StructType schema, + CaseInsensitiveMap options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2SchemaProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2SchemaProvider.java new file mode 100644 index 000000000000..8d61b38edf56 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2SchemaProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.types.StructType; + +/** + * A mix in interface for `DataSourceV2`. Users can implement this interface to provide schema + * inference ability when scanning data. + */ +public interface DataSourceV2SchemaProvider { + /** + * Return the inferred schema of this data source given these options. + */ + StructType inferSchema(CaseInsensitiveMap options); + + /** + * Whether or not this data source can accept user specified schema. 
When Spark scans a data + * source, users can specify the schema to avoid expensive schema inference. However, some data + * sources may have to infer the schema and reject any user-specified schema; such sources can override + * this method to achieve this. + */ + default boolean acceptsUserDefinedSchema() { + return true; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/PartitioningSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/PartitioningSupport.java new file mode 100644 index 000000000000..b39d839eaacb --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/PartitioningSupport.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +/** + * A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this + * interface to specify the partition columns of the reader/writer, to improve performance. + */ +public interface PartitioningSupport { + void setPartitionColumns(String[] partitionColumns); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WritableDataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WritableDataSourceV2.java new file mode 100644 index 000000000000..e3d46907ebc2 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WritableDataSourceV2.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; + +/** + * A mix in interface for `DataSourceV2`. Users can implement this interface to provide data writing + * ability with job-level transactions. + */ +public interface WritableDataSourceV2 extends DataSourceV2 { + + /** + * The main entrance for write interface. + * + * @param mode the save mode, which can be append, overwrite, etc. 
+ * @param options the options for this data source writer. + * @return a writer that implements the actual write logic. + */ + DataSourceV2Writer createWriter(SaveMode mode, CaseInsensitiveMap options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java new file mode 100644 index 000000000000..f9bda2ced3ac --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.expressions.Expression; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * arbitrary expressions as predicates to the data source. + */ +@Experimental +@InterfaceStability.Unstable +public interface CatalystFilterPushDownSupport { + /** + * Push down one filter, returns true if this filter can be pushed down to this data source, + * false otherwise. This method might be called many times if more than one filter need to be + * pushed down. + * + * TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which + * returns unsupported filters. + */ + boolean pushDownCatalystFilter(Expression filter); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnPruningSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnPruningSupport.java new file mode 100644 index 000000000000..6f0cb8cd4a19 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnPruningSupport.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.types.StructType; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to only read + * required columns/nested fields during scan. + */ +public interface ColumnPruningSupport { + /** + * Returns true if the implementation can apply this column pruning optimization, so that we can + * reduce the data size to be read at the very beginning. + */ + boolean pruneColumns(StructType requiredSchema); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java new file mode 100644 index 000000000000..62b3a5172331 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.util.List; + +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to provide + * columnar read ability for better performance. + */ +public interface ColumnarReadSupport { + /** + * Similar to `DataSourceV2Reader.createReadTasks`, but returns data in columnar format. + */ + List<ReadTask<ColumnarBatch>> createColumnarReadTasks(); + + /** + * A safety door for the columnar reader. It's possible that the implementation can only support + * columnar reads for certain columns; users can override this method to fall back to the + * normal read path under some conditions. + */ + default boolean supportsColumnarReads() { + return true; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java new file mode 100644 index 000000000000..eeebacb5d0d7 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; + +/** + * The main interface and minimal requirement for a data source reader. Implementations should + * at least implement the full scan logic; users can mix in more interfaces to implement scan + * optimizations like column pruning, filter push down, etc. + */ +public abstract class DataSourceV2Reader { + + /** + * The actual schema of this data source reader, which may be different from the physical schema + * of the underlying storage, as column pruning or other optimizations may happen. + */ + public abstract StructType readSchema(); + + /** + * The actual read logic should be implemented here. This may not be a full scan as optimizations + * may have already been applied on this reader. Implementations should return a list of + * read tasks; each task is responsible for outputting data for one RDD partition, which means + * the number of tasks returned here will be the same as the number of RDD partitions this scan + * outputs. + */ + // TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row. + public abstract List<ReadTask<Row>> createReadTasks(); + + /** + * Inside Spark, the input rows will be converted to `UnsafeRow`s before processing. To avoid + * this conversion, implementations can override this method and output `UnsafeRow`s directly. + * Note that this is an experimental and unstable interface, as `UnsafeRow` is not public and + * may get changed in future Spark versions. + */ + @Experimental + @InterfaceStability.Unstable + public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() { + StructType schema = readSchema(); + return createReadTasks().stream() + .map(rowGenerator -> new RowToUnsafeRowReadTask(rowGenerator, schema)) + .collect(Collectors.toList()); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java new file mode 100644 index 000000000000..9a65ac22f37f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.sources.Filter; + +/** + * A mix in interface for `DataSourceV2Reader`. 
Users can implement this interface to push down + * filters to the data source and reduce the size of the data to be read. + */ +public interface FilterPushDownSupport { + /** + * Pushes down one filter, and returns true if this filter can be pushed down to this data source, + * false otherwise. This method might be called many times if more than one filter needs to be + * pushed down. + * + * TODO: we can also make it `Filter[] pushDownFilters(Filter[] filters)` which + * returns unsupported filters. + */ + boolean pushDownFilter(Filter filter); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/HashPartitionPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/HashPartitionPushDownSupport.java new file mode 100644 index 000000000000..1fb349951162 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/HashPartitionPushDownSupport.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-partition + * the data and avoid a shuffle on the Spark side. + * + * Note that this interface is marked as unstable, as the implementation needs to be consistent + * with the Spark SQL shuffle hash function, which is internal and may get changed over different + * Spark versions. + */ +@Experimental +@InterfaceStability.Unstable +public interface HashPartitionPushDownSupport { + /** + * Returns true if the implementation can handle this hash partitioning requirement and save a + * shuffle on the Spark side. The hash function is defined as: constructing a + * {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} with the values of the given + * partition columns, and calling its `hashCode()` method. + */ + boolean pushDownHashPartition(String[] partitionColumns); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/LimitPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/LimitPushDownSupport.java new file mode 100644 index 000000000000..32ce32a5d338 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/LimitPushDownSupport.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * the "limit" operation to the data source and reduce the size of the data to be read. + */ +public interface LimitPushDownSupport { + /** + * Returns true if the implementation can handle the limit operation, so that we can reduce + * the data size to be read at the very beginning. + */ + boolean pushDownLimit(int limit); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java new file mode 100644 index 000000000000..309520847fe4 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Serializable; + +/** + * A read task returned by a data source reader; it is responsible for outputting data for one RDD + * partition. + */ +public interface ReadTask<T> extends Serializable { + /** + * The preferred locations where this read task can run faster, but Spark can't guarantee that this + * task will always run on these locations. Implementations should make sure that it can + * be run on any location. + */ + default String[] preferredLocations() { + return new String[0]; + } + + /** + * This method will be called before running this read task; users can override this method + * and put initialization logic here. + */ + default void open() {} + + /** + * Proceeds to the next record, and returns false if there are no more records. + */ + boolean next(); + + /** + * Returns the current record. This method should return the same value until `next` is called. + */ + T get(); + + /** + * This method will be called after finishing this read task; users can override this method + * and put clean-up logic here. 
+ */ + default void close() {} +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java new file mode 100644 index 000000000000..860720e958bd --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; + +public class RowToUnsafeRowReadTask implements ReadTask<UnsafeRow> { + private final ReadTask<Row> rowGenerator; + private final StructType schema; + + private ExpressionEncoder<Row> encoder; + + public RowToUnsafeRowReadTask(ReadTask<Row> rowGenerator, StructType schema) { + this.rowGenerator = rowGenerator; + this.schema = schema; + } + + @Override + public String[] preferredLocations() { + return rowGenerator.preferredLocations(); + } + + @Override + public void open() { + rowGenerator.open(); + encoder = RowEncoder.apply(schema); + } + + @Override + public boolean next() { + return rowGenerator.next(); + } + + @Override + public UnsafeRow get() { + return (UnsafeRow) encoder.toRow(rowGenerator.get()); + } + + @Override + public void close() { + rowGenerator.close(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SamplePushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SamplePushDownSupport.java new file mode 100644 index 000000000000..15bf1c492a3d --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SamplePushDownSupport.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * "sample" operation to the data source and reduce the size of the data to be read. + */ +public interface SamplePushDownSupport { + /** + * Returns true if the implementation can handle this sample operation, so that we can reduce + * the data size to be read at the very beginning. + * + * TODO: shall we add more parameters like seed, lowerBound, upperBound, etc.? + */ + boolean pushDownSample(double fraction); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java new file mode 100644 index 000000000000..851319d23164 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-sort + * the data and avoid sorting at Spark side. + */ +public interface SortPushDown { + /** + * Returns true if the implementation can handle this sorting requirement and save a sort + * operation at Spark side. + */ + boolean pushDownSort(String[] sortingColumns); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java new file mode 100644 index 000000000000..2bbac4130715 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to propagate + * statistics like sizeInBytes, to Spark. 
+ */ +public interface StatisticsSupport { + // todo: shall we add more statistics? what do we want? + long sizeInBytes(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java new file mode 100644 index 000000000000..3cab49e2cf33 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +/** + * A writer that is responsible for writing data to a writable data source. + * The writing procedure is: + * 1. create the write task, serialize and send it to all the partitions of the input data (RDD). + * 2. for each partition, write the data with the writer. + * 3. for each partition, if all data are written successfully, call writer.commit. + * 4. for each partition, if an exception happens during the writing, call writer.abort. + * TODO: shall we introduce a retry mechanism instead of calling `abort` immediately when + * failure happens? + * 5. wait until all the writers are finished, i.e., either commit or abort. + * 6. if all partitions are written successfully, call WritableDataSourceV2.commit. + * 7. if some partitions failed and aborted, call WritableDataSourceV2.abort. + * + * Note that the implementations are responsible for correctly implementing transactions by overriding + * the `commit` and `abort` methods of the writer and write task. + */ +public interface DataSourceV2Writer { + WriteTask createWriteTask(); + + void commit(WriterCommitMessage[] messages); + + void abort(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteTask.java new file mode 100644 index 000000000000..263041c5a43a --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteTask.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import java.io.Serializable; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; + +/** + * A write task that is responsible for writing the data for each input partition. + */ +public abstract class WriteTask implements Serializable { + private ExpressionEncoder<Row> encoder; + + /** + * This method will be called before writing data; users can override this method and put + * initialization logic here. Note that, when overriding this method, you have to call + * `super.initialize` to correctly do some internal setup for this write task. + */ + public void initialize(StructType schema) { + encoder = RowEncoder.apply(schema); + } + + /** + * The actual write logic should be implemented here. To correctly implement transactions, + * implementations should stage the writing or have a way to roll back. + */ + public abstract void write(Row row); + + /** + * Inside Spark, the data is `UnsafeRow` and will be converted to `Row` before sending it out of + * Spark and writing to the data source. To avoid this conversion, implementations can override + * this method and write `UnsafeRow`s directly. Note that this is an experimental and unstable + * interface, as `UnsafeRow` is not public and may get changed in future Spark versions. + */ + @Experimental + @InterfaceStability.Unstable + public void write(UnsafeRow row) { + write(encoder.fromRow(row)); + } + + public abstract WriterCommitMessage commit(); + + public abstract void abort(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java new file mode 100644 index 000000000000..6a07c3cd2e01 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import java.io.Serializable; + +/** + * A message returned by `WriteTask.commit`, which will be serialized and sent to the + * driver side, and passed to `DataSourceV2Writer.commit`. 
+ * + * Note that this is an empty interface, concrete `WriteTask` implementations can return any kind of + * `WriterCommitMessage`, its corresponding `DataSourceV2Writer` should cast these messages to + * concrete type in its `commit` method. + */ +public interface WriterCommitMessage extends Serializable {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 10b28ce812af..2a1c94254aa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -27,12 +27,15 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser} import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2SchemaProvider} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -181,13 +184,40 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } - sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = paths, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap).resolveRelation()) + val cls = DataSource.lookupDataSource(source) + if (classOf[DataSourceV2].isAssignableFrom(cls)) { + // TODO: merge this into `DataSource` + val dataSource = cls.newInstance().asInstanceOf[DataSourceV2] + val options = CaseInsensitiveMap(extraOptions.toMap) + + val schema = dataSource match { + case ds: DataSourceV2SchemaProvider if ds.acceptsUserDefinedSchema => + userSpecifiedSchema.getOrElse(ds.inferSchema(options)) + + case ds: DataSourceV2SchemaProvider => + val inferredSchema = ds.inferSchema(options) + if (userSpecifiedSchema.isDefined && userSpecifiedSchema.get != inferredSchema) { + throw new AnalysisException(s"$ds does not allow user-specified schemas.") + } + inferredSchema + + case _ => + userSpecifiedSchema.getOrElse { + throw new AnalysisException(s"A schema needs to be specified when using $dataSource.") + } + } + + val reader = dataSource.createReader(schema, options) + Dataset.ofRows(sparkSession, DataSourceV2Relation(schema.toAttributes, reader)) + } else { + sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = paths, + userSpecifiedSchema = userSpecifiedSchema, + className = source, + options = extraOptions.toMap).resolveRelation()) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 74a47da2deef..3713e94c087e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -31,8 +31,6 @@ 
import org.apache.spark.sql.types.DataType */ private[sql] trait ColumnarBatchScan extends CodegenSupport { - val inMemoryTableScan: InMemoryTableScanExec = null - override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 4e718d609c92..b143d44eae17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy import org.apache.spark.sql.internal.SQLConf class SparkPlanner( @@ -35,6 +36,7 @@ class SparkPlanner( def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( + DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala new file mode 100644 index 000000000000..5b32ddbe9b04 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.reflect.ClassTag + +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.sources.v2.reader.ReadTask + +class DataSourceRDDPartition[T : ClassTag](val index: Int, val generator: ReadTask[T]) + extends Partition with Serializable + +class DataSourceRDD[T: ClassTag]( + sc: SparkContext, + @transient private val generators: java.util.List[ReadTask[T]]) + extends RDD[T](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { + var index = 0 + val iter = generators.iterator() + val res = new Array[Partition](generators.size()) + while (iter.hasNext) { + res(index) = new DataSourceRDDPartition[T](index, iter.next()) + index += 1 + } + res + } + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + val generator = split.asInstanceOf[DataSourceRDDPartition[T]].generator + context.addTaskCompletionListener(_ => generator.close()) + val iter = new Iterator[T] { + generator.open() + + private[this] var valuePrepared = false + + override def hasNext: Boolean = { + if (!valuePrepared) { + valuePrepared = generator.next() + } + valuePrepared + } + + override def next(): T = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + valuePrepared = false + generator.get() + } + } + new InterruptibleIterator(context, iter) + } + + override def getPreferredLocations(split: Partition): Seq[String] = { + split.asInstanceOf[DataSourceRDDPartition[T]].generator.preferredLocations() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala new file mode 100644 index 000000000000..9ce2a5ef71f2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, StatisticsSupport} + +case class DataSourceV2Relation( + output: Seq[AttributeReference], + reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { + case r: StatisticsSupport => Statistics(sizeInBytes = r.sizeInBytes()) + case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala new file mode 100644 index 000000000000..d6260b4bed66 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, ExpressionSet} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} +import org.apache.spark.sql.sources.v2.reader.{ColumnarReadSupport, DataSourceV2Reader} + +case class DataSourceV2ScanExec( + fullOutput: Array[AttributeReference], + @transient reader: DataSourceV2Reader, + // TODO: these 3 parameters are only used to determine the equality of the scan node, however, + // the reader also have this information, and ideally we can just rely on the equality of the + // reader. The only concern is, the reader implementation is outside of Spark and we have no + // control. 
+ requiredColumnsIndex: Seq[Int], + @transient filters: ExpressionSet, + hashPartitionKeys: Seq[String]) extends LeafExecNode with ColumnarBatchScan { + + def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput) + + override protected def doExecute(): RDD[InternalRow] = reader match { + case r: ColumnarReadSupport if r.supportsColumnarReads() => + WholeStageCodegenExec(this).execute() + case _ => + val numOutputRows = longMetric("numOutputRows") + inputRDD.map { r => + numOutputRows += 1 + r + } + } + + private lazy val inputRDD: RDD[InternalRow] = { + reader match { + case r: ColumnarReadSupport if r.supportsColumnarReads() => + new DataSourceRDD(sparkContext, r.createColumnarReadTasks()) + .asInstanceOf[RDD[InternalRow]] + case _ => + new DataSourceRDD(sparkContext, reader.createUnsafeRowReadTasks()) + .asInstanceOf[RDD[InternalRow]] + } + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) + + override protected def doProduce(ctx: CodegenContext): String = { + if (reader.isInstanceOf[ColumnarReadSupport]) { + return super.doProduce(ctx) + } + val numOutputRows = metricTerm(ctx, "numOutputRows") + // PhysicalRDD always just has one input + val input = ctx.freshName("input") + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + val exprRows = output.zipWithIndex.map{ case (a, i) => + BoundReference(i, a.dataType, a.nullable) + } + val row = ctx.freshName("row") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columnsRowInput = exprRows.map(_.genCode(ctx)) + s""" + |while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${consume(ctx, columnsRowInput, row).trim} + | if (shouldStop()) return; + |} + """.stripMargin + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala new file mode 100644 index 000000000000..f583dceaba0b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.v2.reader.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} + +object DataSourceV2Strategy extends Strategy { + // TODO: write path + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) => + val attrMap = AttributeMap(output.zip(output)) + + val projectSet = AttributeSet(projects.flatMap(_.references)) + val filterSet = AttributeSet(filters.flatMap(_.references)) + + // Match original case of attributes. + val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap) + val supportColumnPruning = reader match { + case r: ColumnPruningSupport => + r.pruneColumns(requiredColumns.toStructType) + case _ => false + } + + val stayUpFilters = ListBuffer.empty[Expression] + reader match { + case r: CatalystFilterPushDownSupport => + for (filter <- filters) { + if (!r.pushDownCatalystFilter(filter)) { + stayUpFilters += filter + } + } + case r: FilterPushDownSupport => + for (filter <- filters) { + val publicFilter = DataSourceStrategy.translateFilter(filter) + if (publicFilter.isEmpty) { + stayUpFilters += filter + } else if (!r.pushDownFilter(publicFilter.get)) { + stayUpFilters += filter + } + } + case _ => + } + + val scan = DataSourceV2ScanExec( + output.toArray, + reader, + if (supportColumnPruning) requiredColumns.map(output.indexOf) else output.indices, + ExpressionSet(filters), + Nil) + + val filterCondition = filters.reduceLeftOption(And) + val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + + val withProject = if (projects == withFilter.output) { + withFilter + } else { + ProjectExec(projects, withFilter) + } + + // TODO: support hash partitioning push down. + + withProject :: Nil + + case _ => Nil + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java new file mode 100644 index 000000000000..d8b7a7246374 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.*; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThan; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ColumnPruningSupport; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.FilterPushDownSupport; +import org.apache.spark.sql.types.StructType; + +public class JavaAdvancedDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader implements ColumnPruningSupport, FilterPushDownSupport { + private StructType requiredSchema; + private List filters = new LinkedList<>(); + + Reader(StructType schema) { + this.requiredSchema = schema; + } + + @Override + public StructType readSchema() { + return requiredSchema; + } + + @Override + public boolean pruneColumns(StructType requiredSchema) { + this.requiredSchema = requiredSchema; + return true; + } + + @Override + public boolean pushDownFilter(Filter filter) { + this.filters.add(filter); + return true; + } + + @Override + public List> createReadTasks() { + List> res = new ArrayList<>(); + + Integer lowerBound = null; + for (Filter filter : filters) { + if (filter instanceof GreaterThan) { + GreaterThan f = (GreaterThan) filter; + if ("i".equals(f.attribute()) && f.value() instanceof Integer) { + lowerBound = (Integer) f.value(); + break; + } + } + } + + if (lowerBound == null) { + res.add(new JavaAdvancedReadTask(0, 5, requiredSchema)); + res.add(new JavaAdvancedReadTask(5, 10, requiredSchema)); + } else if (lowerBound < 4) { + res.add(new JavaAdvancedReadTask(lowerBound + 1, 5, requiredSchema)); + res.add(new JavaAdvancedReadTask(5, 10, requiredSchema)); + } else if (lowerBound < 9) { + res.add(new JavaAdvancedReadTask(lowerBound + 1, 10, requiredSchema)); + } + + return res; + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaAdvancedReadTask implements ReadTask { + private int start; + private int end; + private StructType requiredSchema; + + public JavaAdvancedReadTask(int start, int end, StructType requiredSchema) { + this.start = start - 1; + this.end = end; + this.requiredSchema = requiredSchema; + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public Row get() { + Object[] values = new Object[requiredSchema.size()]; + for (int i = 0; i < values.length; i++) { + if ("i".equals(requiredSchema.apply(i).name())) { + values[i] = start; + } else if ("j".equals(requiredSchema.apply(i).name())) { + values[i] = -start; + } + } + return new GenericRow(values); + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java new file mode 100644 index 000000000000..9b4fd76a802e --- /dev/null +++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ColumnarReadSupport; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +public class JavaBatchDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader implements ColumnarReadSupport { + private final StructType schema; + + Reader(StructType schema) { + this.schema = schema; + } + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createReadTasks() { + throw new RuntimeException("not implemented"); + } + + @Override + public List> createColumnarReadTasks() { + return java.util.Arrays.asList(new JavaColumnarReadTask(schema)); + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaColumnarReadTask implements ReadTask { + private final StructType schema; + + public JavaColumnarReadTask(StructType schema) { + this.schema = schema; + } + + private ColumnarBatch batch = null; + private int currentBatch = 0; + + @Override + public boolean next() { + currentBatch += 1; + return currentBatch <= 2; + } + + @Override + public ColumnarBatch get() { + if (batch == null) { + batch = ColumnarBatch.allocate(schema); + } + batch.reset(); + if (currentBatch == 1) { + for (int i = 0; i < 5; i++) { + batch.column(0).putInt(i, i); + batch.column(1).putInt(i, -i); + } + } else { + for (int i = 0; i < 5; i++) { + batch.column(0).putInt(i, i + 5); + batch.column(1).putInt(i, -(i + 5)); + } + } + batch.setNumRows(5); + return batch; + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java new file mode 100644 index 000000000000..7a564a014c2e --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +public class JavaSimpleDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader { + private final StructType schema; + + Reader(StructType schema) { + this.schema = schema; + } + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createReadTasks() { + return java.util.Arrays.asList( + new JavaSimpleReadTask(0, 5), + new JavaSimpleReadTask(5, 10)); + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaSimpleReadTask implements ReadTask { + private int start; + private int end; + + public JavaSimpleReadTask(int start, int end) { + this.start = start - 1; + this.end = end; + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public Row get() { + return new GenericRow(new Object[] {start, -start}); + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java new file mode 100644 index 000000000000..f573933ed399 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader { + private final StructType schema; + + Reader(StructType schema) { + this.schema = schema; + } + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createReadTasks() { + throw new IllegalStateException(); + } + + @Override + public List> createUnsafeRowReadTasks() { + return java.util.Arrays.asList( + new JavaUnsafeRowReadTask(0, 5), + new JavaUnsafeRowReadTask(5, 10)); + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaUnsafeRowReadTask implements ReadTask { + private int start; + private int end; + private UnsafeRow row; + + public JavaUnsafeRowReadTask(int start, int end) { + this.start = start - 1; + this.end = end; + this.row = new UnsafeRow(2); + row.pointTo(new byte[8 * 3], 8 * 3); + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public UnsafeRow get() { + row.setInt(0, start); + row.setInt(1, -start); + return row; + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala new file mode 100644 index 000000000000..9c227c349f5b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -0,0 +1,260 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.spark.sql.sources.v2 + +import java.util.{ArrayList, List => JList} + +import scala.collection.mutable.ListBuffer + +import test.org.apache.spark.sql.sources.v2._ + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.execution.vectorized.ColumnarBatch +import org.apache.spark.sql.sources.{Filter, GreaterThan} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType + +class DataSourceV2Suite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("simplest implementation") { + Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } + + test("advanced implementation") { + Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 3), (4 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j).filter('i > 6), (7 until 10).map(i => Row(-i))) + checkAnswer(df.select('i).filter('i > 10), Nil) + } + } + } + + test("unsafe row implementation") { + Seq(classOf[UnsafeRowDataSourceV2], classOf[JavaUnsafeRowDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } + + test("batched implementation") { + Seq(classOf[BatchDataSourceV2], classOf[JavaBatchDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } +} + +class SimpleDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(val readSchema: StructType) extends DataSourceV2Reader { + override def createReadTasks(): JList[ReadTask[Row]] = { + java.util.Arrays.asList(new SimpleReadTask(0, 5), new SimpleReadTask(5, 10)) + } + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] { + private var current = start - 1 + override def next(): Boolean = { + current += 1 + current < end + } + override def get(): Row = Row(current, -current) +} + + + +class AdvancedDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(fullSchema: StructType) extends DataSourceV2Reader + with ColumnPruningSupport with FilterPushDownSupport { + + var requiredSchema = fullSchema + val filters = ListBuffer.empty[Filter] + + override def 
pruneColumns(requiredSchema: StructType): Boolean = { + this.requiredSchema = requiredSchema + true + } + + override def pushDownFilter(filter: Filter): Boolean = { + this.filters += filter + true + } + + override def readSchema(): StructType = { + requiredSchema + } + + override def createReadTasks(): JList[ReadTask[Row]] = { + val lowerBound = filters.collect { + case GreaterThan("i", v: Int) => v + }.headOption + + val res = new ArrayList[ReadTask[Row]] + + if (lowerBound.isEmpty) { + res.add(new AdvancedReadTask(0, 5, requiredSchema)) + res.add(new AdvancedReadTask(5, 10, requiredSchema)) + } else if (lowerBound.get < 4) { + res.add(new AdvancedReadTask(lowerBound.get + 1, 5, requiredSchema)) + res.add(new AdvancedReadTask(5, 10, requiredSchema)) + } else if (lowerBound.get < 9) { + res.add(new AdvancedReadTask(lowerBound.get + 1, 10, requiredSchema)) + } + + res + } + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class AdvancedReadTask(start: Int, end: Int, requiredSchema: StructType) + extends ReadTask[Row] { + + private var current = start - 1 + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): Row = { + val values = requiredSchema.map(_.name).map { + case "i" => current + case "j" => -current + } + Row.fromSeq(values) + } +} + + + +class UnsafeRowDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(val readSchema: StructType) extends DataSourceV2Reader { + override def createUnsafeRowReadTasks(): JList[ReadTask[UnsafeRow]] = { + java.util.Arrays.asList(new UnsafeRowReadTask(0, 5), new UnsafeRowReadTask(5, 10)) + } + + override def createReadTasks(): JList[ReadTask[Row]] = throw new IllegalStateException() + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class UnsafeRowReadTask(start: Int, end: Int) extends ReadTask[UnsafeRow] { + private val row = new UnsafeRow(2) + row.pointTo(new Array[Byte](8 * 3), 8 * 3) + + private var current = start - 1 + override def next(): Boolean = { + current += 1 + current < end + } + override def get(): UnsafeRow = { + row.setInt(0, current) + row.setInt(1, -current) + row + } +} + + + +class BatchDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(val readSchema: StructType) extends DataSourceV2Reader with ColumnarReadSupport { + override def createReadTasks(): JList[ReadTask[Row]] = { + throw new NotImplementedError() + } + + override def createColumnarReadTasks(): JList[ReadTask[ColumnarBatch]] = { + java.util.Arrays.asList(new ColumnarReadTask(readSchema)) + } + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class ColumnarReadTask(schema: StructType) extends ReadTask[ColumnarBatch] { + private lazy val batch = ColumnarBatch.allocate(schema) + private var currentBatch = 0 + + override def next(): Boolean = { + currentBatch += 1 + currentBatch <= 2 + } + + override 
def get(): ColumnarBatch = { + batch.reset() + if (currentBatch == 1) { + batch.column(0).putInts(0, 5, 0.until(5).toArray, 0) + batch.column(1).putInts(0, 5, 0.until(5).map(i => -i).toArray, 0) + } else { + batch.column(0).putInts(0, 5, 5.until(10).toArray, 0) + batch.column(1).putInts(0, 5, 5.until(10).map(i => -i).toArray, 0) + } + batch.setNumRows(5) + batch + } +} From 341f821574126d930d0c42f46fc71d0b1cd4a027 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 18 Aug 2017 02:08:13 +0800 Subject: [PATCH 2/4] replace HashPartitionPushDown with ClusteringPushDown --- ...rt.java => ClusteringPushDownSupport.java} | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{HashPartitionPushDownSupport.java => ClusteringPushDownSupport.java} (54%) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/HashPartitionPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java similarity index 54% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/HashPartitionPushDownSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java index 1fb349951162..6251682d52ce 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/HashPartitionPushDownSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java @@ -17,25 +17,16 @@ package org.apache.spark.sql.sources.v2.reader; -import org.apache.spark.annotation.Experimental; -import org.apache.spark.annotation.InterfaceStability; /** - * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-partition + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-clustering * the data and avoid shuffle at Spark side. - * - * Note that this interface is marked as unstable, as the implementation needs to be consistent - * with the Spark SQL shuffle hash function, which is internal and may get changed over different - * Spark versions. */ -@Experimental -@InterfaceStability.Unstable -public interface HashPartitionPushDownSupport { +public interface ClusteringPushDownSupport { /** - * Returns true if the implementation can handle this hash partitioning requirement and save a - * shuffle at Spark side. The hash function is defined as: constructing a - * {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} with the values of the given - * partition columns, and call its `hashCode()` method. + * Returns true if the implementation can handle this clustering requirement and save a shuffle + * at Spark side. Clustering means, if two records have same values for the given clustering + * columns, they must be produced by the same read task. 
*/ - boolean pushDownHashPartition(String[] partitionColumns); + boolean pushDownClustering(String[] clusteringColumns); } From 75b1115ebc2cc6abc58ea5d5299dc2c3e9aa5768 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 22 Aug 2017 23:25:23 +0800 Subject: [PATCH 3/4] improve clustering push down --- .../v2/reader/ClusteringPushDownSupport.java | 14 +++++-- .../sql/sources/v2/reader/Partitioning.java | 38 ++++++++++++++++++ .../v2/reader/SparkHashPartitioning.java | 40 +++++++++++++++++++ 3 files changed, 88 insertions(+), 4 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SparkHashPartitioning.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java index 6251682d52ce..662650b02bb4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java @@ -24,9 +24,15 @@ */ public interface ClusteringPushDownSupport { /** - * Returns true if the implementation can handle this clustering requirement and save a shuffle - * at Spark side. Clustering means, if two records have same values for the given clustering - * columns, they must be produced by the same read task. + * Returns a non-null `Partitioning` if the implementation can handle this clustering requirement + * and save a shuffle at Spark side. Clustering means, if two records have same values for the + * given clustering columns, they must be produced by the same read task. */ - boolean pushDownClustering(String[] clusteringColumns); + Partitioning pushDownClustering(String[] clusteringColumns); + + /** + * Cancel this clustering push down. This will be called if Spark finds out that we can't avoid + * the shuffle after we push down the clustering. + */ + void cancel(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java new file mode 100644 index 000000000000..76c5240087dc --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * An interface to describe how the data is partitioned. This is returned by + * `ClusteringPushDownSupport.pushDownClustering`, and data source implementations must guarantee + * that the returned partitioning satisfies the clustering requirement. 
+ */ +public interface Partitioning { + /** + * Returns true if this partitioning is compatible with the other partitioning. Think about + * joining 2 data sources, even if both these 2 data sources satisfy the clustering requirement, + * we still can not join them because they are uncompatible, e.g. different number of partitions, + * different partitioner, etc. + */ + boolean compatibleWith(Partitioning other); + + /** + * The number of read tasks that will be used to produce data. + */ + int numPartitions(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SparkHashPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SparkHashPartitioning.java new file mode 100644 index 000000000000..e3615d5bfe10 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SparkHashPartitioning.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +public class SparkHashPartitioning implements Partitioning { + private int numPartitions; + + public SparkHashPartitioning(int numPartitions) { + this.numPartitions = numPartitions; + } + + @Override + public boolean compatibleWith(Partitioning other) { + if (other instanceof SparkHashPartitioning) { + return this.numPartitions() == other.numPartitions(); + } else { + return other.compatibleWith(this); + } + } + + @Override + public int numPartitions() { + return numPartitions; + } +} From a29031db10b4c0a2dd87d7f033068feb0f64d14a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 31 Aug 2017 21:23:42 +0800 Subject: [PATCH 4/4] address comments --- .../reader/CatalystFilterPushDownSupport.java | 9 +-- .../v2/reader/ColumnarReadSupport.java | 3 + .../sql/sources/v2/reader/DataReader.java | 27 ++++++++ .../sources/v2/reader/DataSourceV2Reader.java | 53 +++++++++++++++- .../v2/reader/FilterPushDownSupport.java | 9 +-- .../spark/sql/sources/v2/reader/ReadTask.java | 26 +------- .../v2/reader/RowToUnsafeRowReadTask.java | 62 ------------------- .../sql/sources/v2/reader/SortPushDown.java | 2 +- ...kHashPartitioning.java => Statistics.java} | 24 ++----- .../sources/v2/reader/StatisticsSupport.java | 3 +- .../distribution/ClusteredDistribution.java | 34 ++++++++++ .../v2/reader/distribution/Distribution.java | 25 ++++++++ .../DistributionSupport.java} | 25 ++++---- .../reader/distribution/NoPartitioning.java | 17 +++++ .../{ => distribution}/Partitioning.java | 13 ++-- .../v2/{ => writer}/BucketingSupport.java | 8 +-- .../v2/{ => writer}/PartitioningSupport.java | 6 +- .../datasources/v2/DataSourceV2Strategy.scala | 42 +++++++------ 18 files changed, 217 insertions(+), 171 deletions(-) create mode 100644 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{SparkHashPartitioning.java => Statistics.java} (63%) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/ClusteredDistribution.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Distribution.java rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{ClusteringPushDownSupport.java => distribution/DistributionSupport.java} (56%) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/NoPartitioning.java rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/{ => distribution}/Partitioning.java (58%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/{ => writer}/BucketingSupport.java (75%) rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/{ => writer}/PartitioningSupport.java (79%) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java index f9bda2ced3ac..6f07e83a1fbf 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java @@ -29,12 +29,7 @@ @InterfaceStability.Unstable public interface CatalystFilterPushDownSupport { /** - * Push down one filter, returns true if this filter can be pushed down to this data source, - * false otherwise. This method might be called many times if more than one filter need to be - * pushed down. - * - * TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which - * returns unsupported filters. + * Push down filters, returns unsupported filters. */ - boolean pushDownCatalystFilter(Expression filter); + Expression[] pushDownCatalystFilters(Expression[] filters); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java index 62b3a5172331..b1e5cb85ec8b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java @@ -35,6 +35,9 @@ public interface ColumnarReadSupport { * A safety door for columnar reader. It's possible that the implementation can only support * columnar reads for some certain columns, users can overwrite this method to fallback to * normal read path under some conditions. + * + * Note that, if the implementation always return true here, then he can throw exception in + * the row based `DataSourceV2Reader.createReadTasks`, as it will never be called. 
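[Editor's illustration] A minimal, hypothetical Scala sketch (not part of this patch) of the safety door described above: the reader mixes in ColumnarReadSupport but only claims columnar support when every requested column is an integer, so Spark falls back to the row-based read path otherwise. The class name and the empty task lists are invented for illustration.

import java.util.{Collections, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
import org.apache.spark.sql.sources.v2.reader.{ColumnarReadSupport, DataSourceV2Reader, ReadTask}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Hypothetical reader that can only vectorize integer columns.
class MaybeColumnarReader(schema: StructType) extends DataSourceV2Reader with ColumnarReadSupport {
  override def readSchema(): StructType = schema

  // Row-based path, used whenever supportsColumnarReads() returns false.
  override def createReadTasks(): JList[ReadTask[Row]] =
    Collections.emptyList[ReadTask[Row]]()  // a real source would return its row-based tasks here

  // The "safety door": claim columnar reads only when every requested column is an int.
  override def supportsColumnarReads(): Boolean =
    schema.fields.forall(_.dataType == IntegerType)

  override def createColumnarReadTasks(): JList[ReadTask[ColumnarBatch]] =
    Collections.emptyList[ReadTask[ColumnarBatch]]()  // a real source would return batch tasks here
}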
*/ default boolean supportsColumnarReads() { return true; diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java new file mode 100644 index 000000000000..c4cb98b8a1f6 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Closeable; +import java.util.Iterator; + +/** + * A data reader returned by a read task and is responsible for outputting data for an RDD + * partition. + */ +public interface DataReader extends Iterator, Closeable {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java index eeebacb5d0d7..4a795136905e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java @@ -17,12 +17,15 @@ package org.apache.spark.sql.sources.v2.reader; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.types.StructType; @@ -47,13 +50,16 @@ public abstract class DataSourceV2Reader { * output. */ // TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row. - public abstract List> createReadTasks(); + protected abstract List> createReadTasks(); /** * Inside Spark, the input rows will be converted to `UnsafeRow`s before processing. To avoid * this conversion, implementations can overwrite this method and output `UnsafeRow`s directly. * Note that, this is an experimental and unstable interface, as `UnsafeRow` is not public and * may get changed in future Spark versions. + * + * Note that, if the implement overwrites this method, he should also overwrite `createReadTasks` + * to throw exception, as it will never be called. 
*/ @Experimental @InterfaceStability.Unstable @@ -64,3 +70,48 @@ public List> createUnsafeRowReadTasks() { .collect(Collectors.toList()); } } + +class RowToUnsafeRowReadTask implements ReadTask { + private final ReadTask rowReadTask; + private final StructType schema; + + RowToUnsafeRowReadTask(ReadTask rowReadTask, StructType schema) { + this.rowReadTask = rowReadTask; + this.schema = schema; + } + + @Override + public String[] preferredLocations() { + return rowReadTask.preferredLocations(); + } + + @Override + public DataReader getReader() { + return new RowToUnsafeDataReader(rowReadTask.getReader(), RowEncoder.apply(schema)); + } +} + +class RowToUnsafeDataReader implements DataReader { + private final DataReader rowReader; + private final ExpressionEncoder encoder; + + RowToUnsafeDataReader(DataReader rowReader, ExpressionEncoder encoder) { + this.rowReader = rowReader; + this.encoder = encoder; + } + + @Override + public boolean hasNext() { + return rowReader.hasNext(); + } + + @Override + public UnsafeRow next() { + return (UnsafeRow) encoder.toRow(rowReader.next()); + } + + @Override + public void close() throws IOException { + rowReader.close(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java index 9a65ac22f37f..dabb77438f39 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java @@ -25,12 +25,7 @@ */ public interface FilterPushDownSupport { /** - * Push down one filter, returns true if this filter can be pushed down to this data source, - * false otherwise. This method might be called many times if more than one filter need to be - * pushed down. - * - * TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which - * returns unsupported filters. + * Push down filters, returns unsupported filters. */ - boolean pushDownFilter(Filter filter); + Filter[] pushDownFilters(Filter[] filters); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java index 309520847fe4..8dfc46bd9fc8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java @@ -20,8 +20,8 @@ import java.io.Serializable; /** - * A read task returned by a data source reader and is responsible for outputting data for an RDD - * partition. + * A read task returned by a data source reader and is responsible to create the data reader. + * The relationship between `ReadTask` and `DataReader` is similar to `Iterable` and `Iterator`. */ public interface ReadTask extends Serializable { /** @@ -33,25 +33,5 @@ default String[] preferredLocations() { return new String[0]; } - /** - * This method will be called before running this read task, users can overwrite this method - * and put initialization logic here. - */ - default void open() {} - - /** - * Proceed to next record, returns false if there is no more records. - */ - boolean next(); - - /** - * Return the current record. This method should return same value until `next` is called. - */ - T get(); - - /** - * This method will be called after finishing this read task, users can overwrite this method - * and put clean up logic here. 
- */ - default void close() {} + DataReader getReader(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java deleted file mode 100644 index 860720e958bd..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/RowToUnsafeRowReadTask.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; -import org.apache.spark.sql.catalyst.encoders.RowEncoder; -import org.apache.spark.sql.catalyst.expressions.UnsafeRow; -import org.apache.spark.sql.types.StructType; - -public class RowToUnsafeRowReadTask implements ReadTask { - private final ReadTask rowGenerator; - private final StructType schema; - - private ExpressionEncoder encoder; - - public RowToUnsafeRowReadTask(ReadTask rowGenerator, StructType schema) { - this.rowGenerator = rowGenerator; - this.schema = schema; - } - - @Override - public String[] preferredLocations() { - return rowGenerator.preferredLocations(); - } - - @Override - public void open() { - rowGenerator.open(); - encoder = RowEncoder.apply(schema); - } - - @Override - public boolean next() { - return rowGenerator.next(); - } - - @Override - public UnsafeRow get() { - return (UnsafeRow) encoder.toRow(rowGenerator.get()); - } - - @Override - public void close() { - rowGenerator.close(); - } -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java index 851319d23164..6415d183d64f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java @@ -26,5 +26,5 @@ public interface SortPushDown { * Returns true if the implementation can handle this sorting requirement and save a sort * operation at Spark side. 
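[Editor's illustration] To make the new factory-style contract concrete, here is a small, hypothetical Scala sketch (not part of this patch) in the spirit of the SimpleReadTask test source earlier in this patch, rewritten so that the ReadTask only creates the reader and the DataReader owns iteration and cleanup. The names are invented for illustration.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}

// The task is the serializable recipe; each getReader() call yields a fresh reader.
class RangeReadTask(start: Int, end: Int) extends ReadTask[Row] {
  override def getReader(): DataReader[Row] = new RangeDataReader(start, end)
}

// The reader holds the iteration state and any resources, released in close().
class RangeDataReader(start: Int, end: Int) extends DataReader[Row] {
  private var current = start

  override def hasNext(): Boolean = current < end

  override def next(): Row = {
    val row = Row(current, -current)
    current += 1
    row
  }

  override def close(): Unit = {}  // nothing to release in this in-memory example
}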
*/ - boolean pushDownSort(String[] sortingColumns); + boolean pushDownSort(String[] sortingColumns, boolean asc, boolean nullFirst); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SparkHashPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java similarity index 63% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SparkHashPartitioning.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java index e3615d5bfe10..a4da0d11bea0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SparkHashPartitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java @@ -17,24 +17,10 @@ package org.apache.spark.sql.sources.v2.reader; -public class SparkHashPartitioning implements Partitioning { - private int numPartitions; +import java.util.OptionalLong; - public SparkHashPartitioning(int numPartitions) { - this.numPartitions = numPartitions; - } - - @Override - public boolean compatibleWith(Partitioning other) { - if (other instanceof SparkHashPartitioning) { - return this.numPartitions() == other.numPartitions(); - } else { - return other.compatibleWith(this); - } - } - - @Override - public int numPartitions() { - return numPartitions; - } +public interface Statistics { + OptionalLong getSize(); + OptionalLong getRows(); + OptionalLong getDistinctValues(String columnName); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java index 2bbac4130715..b70e9ce62d1e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java @@ -22,6 +22,5 @@ * statistics like sizeInBytes, to Spark. */ public interface StatisticsSupport { - // todo: shall we add more statistics? what do we want? - long sizeInBytes(); + Statistics getStatistics(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/ClusteredDistribution.java new file mode 100644 index 000000000000..098557e9fefe --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/ClusteredDistribution.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
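[Editor's illustration] For the reworked statistics API above, a possible Scala sketch (illustrative only, not part of this patch) of a source that knows its size and row count up front but tracks no per-column distinct counts; the numbers and names are made up.

import java.util.OptionalLong

import org.apache.spark.sql.sources.v2.reader.{Statistics, StatisticsSupport}

// Hypothetical statistics for a source with a known size and row count.
class KnownSizeStatistics(sizeInBytes: Long, rowCount: Long) extends Statistics {
  override def getSize(): OptionalLong = OptionalLong.of(sizeInBytes)
  override def getRows(): OptionalLong = OptionalLong.of(rowCount)
  // Per-column statistics are not tracked, so distinct counts are reported as unknown.
  override def getDistinctValues(columnName: String): OptionalLong = OptionalLong.empty()
}

// Mixed into a DataSourceV2Reader implementation so the optimizer can use the estimates.
trait ReportsKnownSize extends StatisticsSupport {
  override def getStatistics(): Statistics = new KnownSizeStatistics(10L * 1024 * 1024, 100000L)
}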
+ */ + +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * Represents a distribution where records that share the same values for the `clusteringColumns` + * will be co-located, which means, they will be produced by the same `ReadTask`. + */ +public class ClusteredDistribution { + private String[] clusteringColumns; + + public ClusteredDistribution(String[] clusteringColumns) { + this.clusteringColumns = clusteringColumns; + } + + public String[] getClusteringColumns() { + return clusteringColumns; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Distribution.java new file mode 100644 index 000000000000..4486f5268e7e --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Distribution.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * Specifies how data should be distributed when a query is executed in parallel on many machines. + * + * Current implementations: `ClusteredDistribution`. + */ +public interface Distribution {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/DistributionSupport.java similarity index 56% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/DistributionSupport.java index 662650b02bb4..272e1167933b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteringPushDownSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/DistributionSupport.java @@ -15,24 +15,21 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader; - +package org.apache.spark.sql.sources.v2.reader.distribution; /** - * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-clustering - * the data and avoid shuffle at Spark side. + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to report the + * output partitioning, to avoid shuffle at Spark side if the output partitioning can satisfy the + * distribution requirement. */ -public interface ClusteringPushDownSupport { +public interface DistributionSupport { /** - * Returns a non-null `Partitioning` if the implementation can handle this clustering requirement - * and save a shuffle at Spark side. 
Clustering means, if two records have same values for the - * given clustering columns, they must be produced by the same read task. + * Returns an array of partitionings this data source can output. Spark will pick one partitioning + * that can avoid shuffle, and call `pickPartitioning` to notify the data source which + * partitioning was picked. Note that, if none of the partitions can help to avoid shuffle, + * `NoPartitioning` will be passed to `pickPartitioning`. */ - Partitioning pushDownClustering(String[] clusteringColumns); + Partitioning[] getPartitionings(); - /** - * Cancel this clustering push down. This will be called if Spark finds out that we can't avoid - * the shuffle after we push down the clustering. - */ - void cancel(); + void pickPartitioning(Partitioning p); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/NoPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/NoPartitioning.java new file mode 100644 index 000000000000..fb338eb871c4 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/NoPartitioning.java @@ -0,0 +1,17 @@ +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * Represents a partitioning that guarantees nothing, which means it can not satisfy any + * distributions. + */ +public class NoPartitioning implements Partitioning { + @Override + public boolean satisfies(Distribution distribution) { + return false; + } + + @Override + public int numPartitions() { + return 0; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Partitioning.java similarity index 58% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Partitioning.java index 76c5240087dc..115d19ee8fc6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Partitioning.java @@ -15,21 +15,16 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2.reader; +package org.apache.spark.sql.sources.v2.reader.distribution; /** - * An interface to describe how the data is partitioned. This is returned by - * `ClusteringPushDownSupport.pushDownClustering`, and data source implementations must guarantee - * that the returned partitioning satisfies the clustering requirement. + * An interface to describe how the data is physically partitioned. */ public interface Partitioning { /** - * Returns true if this partitioning is compatible with the other partitioning. Think about - * joining 2 data sources, even if both these 2 data sources satisfy the clustering requirement, - * we still can not join them because they are uncompatible, e.g. different number of partitions, - * different partitioner, etc. + * Returns true if this partitioning can satisfy the given distribution requirement. */ - boolean compatibleWith(Partitioning other); + boolean satisfies(Distribution distribution); /** * The number of read tasks that will be used to produce data. 
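[Editor's illustration] Putting the distribution API together, here is a hypothetical Scala sketch (not part of this patch) of a source that is physically bucketed by a `key` column: it exposes one Partitioning that satisfies a ClusteredDistribution over `key`, and degrades gracefully if Spark picks NoPartitioning. It assumes ClusteredDistribution implements Distribution, as its documentation suggests; all other names are invented.

import org.apache.spark.sql.sources.v2.reader.distribution.{ClusteredDistribution, Distribution, DistributionSupport, NoPartitioning, Partitioning}

// A partitioning that co-locates records by the "key" column across a fixed number of read tasks.
class KeyBucketedPartitioning(numBuckets: Int) extends Partitioning {
  override def satisfies(distribution: Distribution): Boolean = distribution match {
    case c: ClusteredDistribution => c.getClusteringColumns.sameElements(Array("key"))
    case _ => false
  }

  override def numPartitions(): Int = numBuckets
}

// Mixed into a DataSourceV2Reader implementation.
trait KeyBucketedReadSupport extends DistributionSupport {
  private var picked: Option[Partitioning] = None

  override def getPartitionings(): Array[Partitioning] = Array(new KeyBucketedPartitioning(8))

  // Spark reports back which partitioning it planned around, possibly NoPartitioning.
  override def pickPartitioning(p: Partitioning): Unit = p match {
    case _: NoPartitioning => picked = None   // plan unconstrained read tasks
    case other => picked = Some(other)        // honor this layout when creating read tasks
  }
}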
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BucketingSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BucketingSupport.java similarity index 75% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/BucketingSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BucketingSupport.java index f66ea10278fd..639959242639 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BucketingSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BucketingSupport.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2; +package org.apache.spark.sql.sources.v2.writer; /** - * A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this - * interface to specify the bucket/sort columns of the reader/writer, to improve performance. + * A mix in interface for `DataSourceV2Writer`. Users can implement this interface to allow Spark + * to specify bucket/sort columns for writing data into this data source. */ public interface BucketingSupport { void setBucketColumns(String[] bucketColumns, int numBuckets); - void setSortColumns(String[] sortColumns); + void setSortColumns(String[] sortColumns, boolean asc, boolean nullFirst); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/PartitioningSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/PartitioningSupport.java similarity index 79% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/PartitioningSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/PartitioningSupport.java index b39d839eaacb..bc20c2ede5c8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/PartitioningSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/PartitioningSupport.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2; +package org.apache.spark.sql.sources.v2.writer; /** - * A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this - * interface to specify the partition columns of the reader/writer, to improve performance. + * A mix in interface for `DataSourceV2Writer`. Users can implement this interface to allow Spark + * to specify partition columns for writing data into this data source. 
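[Editor's illustration] For the relocated writer-side mixins, a brief hypothetical Scala sketch (not part of this patch) of a writer that simply records the layout Spark asks for; a real implementation would also extend DataSourceV2Writer and apply these settings when laying out output files. The class and field names are invented.

import org.apache.spark.sql.sources.v2.writer.{BucketingSupport, PartitioningSupport}

// Remembers the requested layout before any data is written.
class LayoutAwareWriter extends BucketingSupport with PartitioningSupport {
  private var partitionCols: Array[String] = Array.empty
  private var bucketCols: Array[String] = Array.empty
  private var numBuckets: Int = 0
  private var sortCols: Array[String] = Array.empty
  private var sortAsc: Boolean = true
  private var sortNullFirst: Boolean = true

  override def setPartitionColumns(partitionColumns: Array[String]): Unit =
    partitionCols = partitionColumns

  override def setBucketColumns(bucketColumns: Array[String], buckets: Int): Unit = {
    bucketCols = bucketColumns
    numBuckets = buckets
  }

  override def setSortColumns(sortColumns: Array[String], asc: Boolean, nullFirst: Boolean): Unit = {
    sortCols = sortColumns
    sortAsc = asc
    sortNullFirst = nullFirst
  }
}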
*/ public interface PartitioningSupport { void setPartitionColumns(String[] partitionColumns); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index f583dceaba0b..1826f64f25d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.execution.datasources.v2 -import scala.collection.mutable.ListBuffer - import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.sources.v2.reader.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} object DataSourceV2Strategy extends Strategy { @@ -44,24 +43,29 @@ object DataSourceV2Strategy extends Strategy { case _ => false } - val stayUpFilters = ListBuffer.empty[Expression] - reader match { + val stayUpFilters: Seq[Expression] = reader match { case r: CatalystFilterPushDownSupport => - for (filter <- filters) { - if (!r.pushDownCatalystFilter(filter)) { - stayUpFilters += filter - } - } + r.pushDownCatalystFilters(filters.toArray) case r: FilterPushDownSupport => - for (filter <- filters) { - val publicFilter = DataSourceStrategy.translateFilter(filter) - if (publicFilter.isEmpty) { - stayUpFilters += filter - } else if (!r.pushDownFilter(publicFilter.get)) { - stayUpFilters += filter - } - } - case _ => + // A map from original Catalyst expressions to corresponding translated data source filters. + // If a predicate is not in this map, it means it cannot be pushed down. + val translatedMap: Map[Expression, Filter] = filters.flatMap { p => + DataSourceStrategy.translateFilter(p).map(f => p -> f) + }.toMap + + // Catalyst predicate expressions that cannot be converted to data source filters. + val nonconvertiblePredicates = filters.filterNot(translatedMap.contains) + + // Data source filters that cannot be pushed down. An unhandled filter means + // the data source cannot guarantee the rows returned can pass the filter. + // As a result we must return it so Spark can plan an extra filter operator. + val unhandledFilters = r.pushDownFilters(translatedMap.values.toArray).toSet + val unhandledPredicates = translatedMap.filter { case (_, f) => + unhandledFilters.contains(f) + }.keys + + nonconvertiblePredicates ++ unhandledPredicates + case _ => filters } val scan = DataSourceV2ScanExec( @@ -71,7 +75,7 @@ object DataSourceV2Strategy extends Strategy { ExpressionSet(filters), Nil) - val filterCondition = filters.reduceLeftOption(And) + val filterCondition = stayUpFilters.reduceLeftOption(And) val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) val withProject = if (projects == withFilter.output) {
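[Editor's illustration] To connect the planner change above with the reader API, here is a small, hypothetical Scala sketch (not part of this patch) of a FilterPushDownSupport mixin that handles only GreaterThan filters on column `i`, in the spirit of the AdvancedDataSourceV2 test source, and returns everything else as unhandled so that Spark keeps a FilterExec over the remaining predicates, as in the strategy above. The trait name is invented.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.sources.v2.reader.FilterPushDownSupport

// Accepts GreaterThan("i", _) filters; the rest are reported back as unhandled.
trait GreaterThanOnIPushDown extends FilterPushDownSupport {
  protected val pushedFilters = ArrayBuffer.empty[Filter]

  override def pushDownFilters(filters: Array[Filter]): Array[Filter] = {
    val (handled, unhandled) = filters.partition {
      case GreaterThan("i", _) => true
      case _ => false
    }
    pushedFilters ++= handled
    unhandled  // Spark will evaluate these with a FilterExec on top of the scan
  }
}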