diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java new file mode 100644 index 000000000000..a504b25a439f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +/** + * The main interface and minimal requirement for data source v2 implementations. Users can mix in + * more interfaces to implement more functions other than just scan. + */ +public interface DataSourceV2 { + + /** + * The main entrance for read interface. + * + * @param schema the full schema of this data source reader. Full schema usually maps to the + * physical schema of the underlying storage of this data source reader, e.g. + * parquet files, JDBC tables, etc, while this reader may not read data with full + * schema, as column pruning or other optimizations may happen. + * @param options the options for this data source reader, which is case insensitive. + * @return a reader that implements the actual read logic. + */ + DataSourceV2Reader createReader( + StructType schema, + CaseInsensitiveMap options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2SchemaProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2SchemaProvider.java new file mode 100644 index 000000000000..8d61b38edf56 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2SchemaProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.types.StructType; + +/** + * A mix in interface for `DataSourceV2`. Users can implement this interface to provide schema + * inference ability when scanning data. + */ +public interface DataSourceV2SchemaProvider { + /** + * Return the inferred schema of this data source given these options. + */ + StructType inferSchema(CaseInsensitiveMap options); + + /** + * Whether or not this data source can accept user specified schema. When Spark scans a data + * source, users can specify the schema to avoid expensive schema inference. However some data + * sources may have to infer the schema and reject any user specified schemas, they can overwrite + * this method to achieve this. + */ + default boolean acceptsUserDefinedSchema() { + return true; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WritableDataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WritableDataSourceV2.java new file mode 100644 index 000000000000..e3d46907ebc2 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WritableDataSourceV2.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; + +/** + * A mix in interface for `DataSourceV2`. Users can implement this interface to provide data writing + * ability with job-level transaction. + */ +public interface WritableDataSourceV2 extends DataSourceV2 { + + /** + * The main entrance for write interface. + * + * @param mode the save move, can be append, overwrite, etc. + * @param options the options for this data source writer. + * @return a writer that implements the actual write logic. + */ + DataSourceV2Writer createWriter(SaveMode mode, CaseInsensitiveMap options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java new file mode 100644 index 000000000000..6f07e83a1fbf --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/CatalystFilterPushDownSupport.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
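Taken together, `DataSourceV2` and `DataSourceV2SchemaProvider` above are all a minimal read-only source needs to implement. The sketch below is illustrative only: the class names and the fixed schema are made up for this example, the actual scan is elided, and it assumes the generic parameters stripped from the listing (e.g. `List<ReadTask<Row>>`) as suggested by the `Row` import in `DataSourceV2Reader`.

    import java.util.Collections;
    import java.util.List;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider;
    import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
    import org.apache.spark.sql.sources.v2.reader.ReadTask;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical source: the schema is fixed, so inferSchema ignores the options.
    public class SimpleDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider {

      @Override
      public StructType inferSchema(CaseInsensitiveMap options) {
        return new StructType().add("i", "int").add("j", "int");
      }

      @Override
      public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) {
        return new SimpleReader(schema);
      }

      // Minimal reader: a full scan that produces no partitions. A real implementation
      // would return one ReadTask per split of the underlying storage.
      static class SimpleReader extends DataSourceV2Reader {
        private final StructType schema;

        SimpleReader(StructType schema) {
          this.schema = schema;
        }

        @Override
        public StructType readSchema() {
          return schema;
        }

        @Override
        protected List<ReadTask<Row>> createReadTasks() {
          return Collections.emptyList();
        }
      }
    }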
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.expressions.Expression; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * arbitrary expressions as predicates to the data source. + */ +@Experimental +@InterfaceStability.Unstable +public interface CatalystFilterPushDownSupport { + /** + * Push down filters, returns unsupported filters. + */ + Expression[] pushDownCatalystFilters(Expression[] filters); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnPruningSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnPruningSupport.java new file mode 100644 index 000000000000..6f0cb8cd4a19 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnPruningSupport.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.types.StructType; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to only read + * required columns/nested fields during scan. + */ +public interface ColumnPruningSupport { + /** + * Returns true if the implementation can apple this column pruning optimization, so that we can + * reduce the data size to be read at the very beginning. + */ + boolean pruneColumns(StructType requiredSchema); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java new file mode 100644 index 000000000000..b1e5cb85ec8b --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ColumnarReadSupport.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.util.List; + +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to provide + * columnar read ability for better performance. + */ +public interface ColumnarReadSupport { + /** + * Similar to `DataSourceV2Reader.createReadTasks`, but return data in columnar format. + */ + List> createColumnarReadTasks(); + + /** + * A safety door for columnar reader. It's possible that the implementation can only support + * columnar reads for some certain columns, users can overwrite this method to fallback to + * normal read path under some conditions. + * + * Note that, if the implementation always return true here, then he can throw exception in + * the row based `DataSourceV2Reader.createReadTasks`, as it will never be called. + */ + default boolean supportsColumnarReads() { + return true; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java new file mode 100644 index 000000000000..c4cb98b8a1f6 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Closeable; +import java.util.Iterator; + +/** + * A data reader returned by a read task and is responsible for outputting data for an RDD + * partition. + */ +public interface DataReader extends Iterator, Closeable {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java new file mode 100644 index 000000000000..4a795136905e --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; + +/** + * The main interface and minimal requirement for a data source reader. The implementations should + * at least implement the full scan logic, users can mix in more interfaces to implement scan + * optimizations like column pruning, filter push down, etc. + */ +public abstract class DataSourceV2Reader { + + /** + * The actual schema of this data source reader, which may be different from the physical schema + * of the underlying storage, as column pruning or other optimizations may happen. + */ + public abstract StructType readSchema(); + + /** + * The actual read logic should be implemented here. This may not be a full scan as optimizations + * may have already been applied on this reader. Implementations should return a list of + * read tasks, each task is responsible to output data for one RDD partition, which means + * the number of tasks returned here will be same as the number of RDD partitions this scan + * output. + */ + // TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row. + protected abstract List> createReadTasks(); + + /** + * Inside Spark, the input rows will be converted to `UnsafeRow`s before processing. To avoid + * this conversion, implementations can overwrite this method and output `UnsafeRow`s directly. + * Note that, this is an experimental and unstable interface, as `UnsafeRow` is not public and + * may get changed in future Spark versions. + * + * Note that, if the implement overwrites this method, he should also overwrite `createReadTasks` + * to throw exception, as it will never be called. 
+ */ + @Experimental + @InterfaceStability.Unstable + public List> createUnsafeRowReadTasks() { + StructType schema = readSchema(); + return createReadTasks().stream() + .map(rowGenerator -> new RowToUnsafeRowReadTask(rowGenerator, schema)) + .collect(Collectors.toList()); + } +} + +class RowToUnsafeRowReadTask implements ReadTask { + private final ReadTask rowReadTask; + private final StructType schema; + + RowToUnsafeRowReadTask(ReadTask rowReadTask, StructType schema) { + this.rowReadTask = rowReadTask; + this.schema = schema; + } + + @Override + public String[] preferredLocations() { + return rowReadTask.preferredLocations(); + } + + @Override + public DataReader getReader() { + return new RowToUnsafeDataReader(rowReadTask.getReader(), RowEncoder.apply(schema)); + } +} + +class RowToUnsafeDataReader implements DataReader { + private final DataReader rowReader; + private final ExpressionEncoder encoder; + + RowToUnsafeDataReader(DataReader rowReader, ExpressionEncoder encoder) { + this.rowReader = rowReader; + this.encoder = encoder; + } + + @Override + public boolean hasNext() { + return rowReader.hasNext(); + } + + @Override + public UnsafeRow next() { + return (UnsafeRow) encoder.toRow(rowReader.next()); + } + + @Override + public void close() throws IOException { + rowReader.close(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java new file mode 100644 index 000000000000..dabb77438f39 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/FilterPushDownSupport.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.sources.Filter; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * filters to the data source and reduce the size of the data to be read. + */ +public interface FilterPushDownSupport { + /** + * Push down filters, returns unsupported filters. + */ + Filter[] pushDownFilters(Filter[] filters); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/LimitPushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/LimitPushDownSupport.java new file mode 100644 index 000000000000..32ce32a5d338 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/LimitPushDownSupport.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
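As a sketch of how these mix-in interfaces compose with `DataSourceV2Reader` (the class name and column names are hypothetical, and the actual scan is elided): a reader can record the pruned schema from `pruneColumns` and keep only the filters it can evaluate from `pushDownFilters`, returning the rest so that Spark still plans a `Filter` operator on top.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.sources.Filter;
    import org.apache.spark.sql.sources.GreaterThan;
    import org.apache.spark.sql.sources.v2.reader.ColumnPruningSupport;
    import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
    import org.apache.spark.sql.sources.v2.reader.FilterPushDownSupport;
    import org.apache.spark.sql.sources.v2.reader.ReadTask;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical reader that supports column pruning and can evaluate GreaterThan filters.
    public class AdvancedReader extends DataSourceV2Reader
        implements ColumnPruningSupport, FilterPushDownSupport {

      private StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
      private final List<Filter> pushedFilters = new ArrayList<>();

      @Override
      public StructType readSchema() {
        // Reflects pruning: only the columns Spark asked for are produced.
        return requiredSchema;
      }

      @Override
      public boolean pruneColumns(StructType requiredSchema) {
        this.requiredSchema = requiredSchema;
        return true;
      }

      @Override
      public Filter[] pushDownFilters(Filter[] filters) {
        List<Filter> unsupported = new ArrayList<>();
        for (Filter f : filters) {
          if (f instanceof GreaterThan) {
            pushedFilters.add(f);   // evaluated inside the scan
          } else {
            unsupported.add(f);     // returned so Spark keeps a Filter operator
          }
        }
        return unsupported.toArray(new Filter[0]);
      }

      @Override
      protected List<ReadTask<Row>> createReadTasks() {
        // The actual scan, honouring requiredSchema and pushedFilters, is elided here.
        return Collections.emptyList();
      }
    }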
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * "limit" operation to the data source and reduce the size of the data to be read. + */ +public interface LimitPushDownSupport { + /** + * Returns true if the implementation can handle the limit operation, so that we can reduce + * the data size to be read at the very beginning. + */ + boolean pushDownLimit(int limit); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java new file mode 100644 index 000000000000..8dfc46bd9fc8 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Serializable; + +/** + * A read task returned by a data source reader and is responsible to create the data reader. + * The relationship between `ReadTask` and `DataReader` is similar to `Iterable` and `Iterator`. + */ +public interface ReadTask extends Serializable { + /** + * The preferred locations for this read task to run faster, but Spark can't guarantee that this + * task will always run on these locations. Implementations should make sure that it can + * be run on any location. + */ + default String[] preferredLocations() { + return new String[0]; + } + + DataReader getReader(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SamplePushDownSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SamplePushDownSupport.java new file mode 100644 index 000000000000..15bf1c492a3d --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SamplePushDownSupport.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
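For illustration, a `ReadTask`/`DataReader` pair might look like the following hypothetical sketch, where each task owns one integer range and its reader iterates over it (the names and the range-based split are made up; real tasks would typically wrap a file split or a table shard).

    import java.io.IOException;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.sources.v2.reader.DataReader;
    import org.apache.spark.sql.sources.v2.reader.ReadTask;

    // One task per range; the task is serialized to executors, where getReader() is called.
    public class RangeReadTask implements ReadTask<Row> {
      private final int start;
      private final int end;

      public RangeReadTask(int start, int end) {
        this.start = start;
        this.end = end;
      }

      @Override
      public DataReader<Row> getReader() {
        return new RangeDataReader(start, end);
      }
    }

    class RangeDataReader implements DataReader<Row> {
      private int current;
      private final int end;

      RangeDataReader(int start, int end) {
        this.current = start;
        this.end = end;
      }

      @Override
      public boolean hasNext() {
        return current < end;
      }

      @Override
      public Row next() {
        return RowFactory.create(current++);
      }

      @Override
      public void close() throws IOException {
        // nothing to release for this in-memory example
      }
    }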
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down + * "sample" operation to the data source and reduce the size of the data to be read. + */ +public interface SamplePushDownSupport { + /** + * Returns true if the implementation can handle this sample operation, so that we can reduce + * the data size to be read at the very beginning. + * + * TODO: shall we add more parameters like seed, lowerBound, upperBound, etc.? + */ + boolean pushDownSample(double fraction); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java new file mode 100644 index 000000000000..6415d183d64f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SortPushDown.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-sort + * the data and avoid sorting at Spark side. + */ +public interface SortPushDown { + /** + * Returns true if the implementation can handle this sorting requirement and save a sort + * operation at Spark side. + */ + boolean pushDownSort(String[] sortingColumns, boolean asc, boolean nullFirst); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java new file mode 100644 index 000000000000..a4da0d11bea0 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.util.OptionalLong; + +public interface Statistics { + OptionalLong getSize(); + OptionalLong getRows(); + OptionalLong getDistinctValues(String columnName); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java new file mode 100644 index 000000000000..b70e9ce62d1e --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/StatisticsSupport.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to propagate + * statistics like sizeInBytes, to Spark. + */ +public interface StatisticsSupport { + Statistics getStatistics(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/ClusteredDistribution.java new file mode 100644 index 000000000000..098557e9fefe --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/ClusteredDistribution.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
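As an illustration (the numbers are hypothetical), a source that knows its footprint up front could report statistics like this, leaving the per-column statistic empty:

    import java.util.OptionalLong;

    import org.apache.spark.sql.sources.v2.reader.Statistics;
    import org.apache.spark.sql.sources.v2.reader.StatisticsSupport;

    // Hypothetical statistics for a source with a known size and row count.
    public class KnownStatistics implements Statistics {
      @Override
      public OptionalLong getSize() {
        return OptionalLong.of(10L * 1024 * 1024);   // ~10 MB of data
      }

      @Override
      public OptionalLong getRows() {
        return OptionalLong.of(100_000L);
      }

      @Override
      public OptionalLong getDistinctValues(String columnName) {
        return OptionalLong.empty();                 // unknown; Spark falls back to defaults
      }
    }

A reader then simply adds `implements StatisticsSupport` and returns an instance like this from `getStatistics()`, letting Spark size joins and broadcasts more accurately.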
+ */ + +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * Represents a distribution where records that share the same values for the `clusteringColumns` + * will be co-located, which means, they will be produced by the same `ReadTask`. + */ +public class ClusteredDistribution { + private String[] clusteringColumns; + + public ClusteredDistribution(String[] clusteringColumns) { + this.clusteringColumns = clusteringColumns; + } + + public String[] getClusteringColumns() { + return clusteringColumns; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Distribution.java new file mode 100644 index 000000000000..4486f5268e7e --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Distribution.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * Specifies how data should be distributed when a query is executed in parallel on many machines. + * + * Current implementations: `ClusteredDistribution`. + */ +public interface Distribution {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/DistributionSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/DistributionSupport.java new file mode 100644 index 000000000000..272e1167933b --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/DistributionSupport.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * A mix in interface for `DataSourceV2Reader`. Users can implement this interface to report the + * output partitioning, to avoid shuffle at Spark side if the output partitioning can satisfy the + * distribution requirement. 
+ */ +public interface DistributionSupport { + /** + * Returns an array of partitionings this data source can output. Spark will pick one partitioning + * that can avoid shuffle, and call `pickPartitioning` to notify the data source which + * partitioning was picked. Note that, if none of the partitions can help to avoid shuffle, + * `NoPartitioning` will be passed to `pickPartitioning`. + */ + Partitioning[] getPartitionings(); + + void pickPartitioning(Partitioning p); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/NoPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/NoPartitioning.java new file mode 100644 index 000000000000..fb338eb871c4 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/NoPartitioning.java @@ -0,0 +1,17 @@ +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * Represents a partitioning that guarantees nothing, which means it can not satisfy any + * distributions. + */ +public class NoPartitioning implements Partitioning { + @Override + public boolean satisfies(Distribution distribution) { + return false; + } + + @Override + public int numPartitions() { + return 0; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Partitioning.java new file mode 100644 index 000000000000..115d19ee8fc6 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/distribution/Partitioning.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.distribution; + +/** + * An interface to describe how the data is physically partitioned. + */ +public interface Partitioning { + /** + * Returns true if this partitioning can satisfy the given distribution requirement. + */ + boolean satisfies(Distribution distribution); + + /** + * The number of read tasks that will be used to produce data. + */ + int numPartitions(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BucketingSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BucketingSupport.java new file mode 100644 index 000000000000..639959242639 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BucketingSupport.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
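To make the contract concrete, here is a hypothetical hash partitioning that satisfies a `ClusteredDistribution` over the same single column (the column name and partition count are illustrative):

    import org.apache.spark.sql.sources.v2.reader.distribution.ClusteredDistribution;
    import org.apache.spark.sql.sources.v2.reader.distribution.Distribution;
    import org.apache.spark.sql.sources.v2.reader.distribution.Partitioning;

    // Data is hash-partitioned by one column into a fixed number of read tasks.
    public class HashPartitioning implements Partitioning {
      private final String column;
      private final int numPartitions;

      public HashPartitioning(String column, int numPartitions) {
        this.column = column;
        this.numPartitions = numPartitions;
      }

      @Override
      public boolean satisfies(Distribution distribution) {
        // Records sharing the same value of `column` are already in the same task,
        // so a clustering that uses only `column` is satisfied.
        if (distribution instanceof ClusteredDistribution) {
          String[] clustering = ((ClusteredDistribution) distribution).getClusteringColumns();
          return clustering.length == 1 && clustering[0].equals(column);
        }
        return false;
      }

      @Override
      public int numPartitions() {
        return numPartitions;
      }
    }

A reader implementing `DistributionSupport` would return something like `new Partitioning[]{ new HashPartitioning("i", 4) }` from `getPartitionings()` and remember whichever partitioning Spark passes back to `pickPartitioning`.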
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +/** + * A mix in interface for `DataSourceV2Writer`. Users can implement this interface to allow Spark + * to specify bucket/sort columns for writing data into this data source. + */ +public interface BucketingSupport { + void setBucketColumns(String[] bucketColumns, int numBuckets); + + void setSortColumns(String[] sortColumns, boolean asc, boolean nullFirst); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java new file mode 100644 index 000000000000..3cab49e2cf33 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +/** + * A writer that is responsible for writing data to a writable data source. + * The writing procedure is: + * 1. create the write task, serialize and send it to all the partitions of the input data(RDD). + * 2. for each partition, write the data with the writer. + * 3. for each partition, if all data are written successfully, call writer.commit. + * 4. for each partition, if exception happens during the writing, call writer.abort. + * TODO: shall we introduce a retry mechanism instead of calling `abort` immediately when + * failure happens? + * 5. wait until all the writers are finished, i.e., either commit or abort. + * 6. if all partitions are written successfully, call WritableDataSourceV2.commit. + * 7. if some partitions failed and aborted, call WritableDataSourceV2.abort. + * + * Note that, the implementations are responsible to correctly implement transaction by overwriting + * the `commit` and `abort` methods of the writer and write task. 
+ */ +public interface DataSourceV2Writer { + WriteTask createWriteTask(); + + void commit(WriterCommitMessage[] messages); + + void abort(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/PartitioningSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/PartitioningSupport.java new file mode 100644 index 000000000000..bc20c2ede5c8 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/PartitioningSupport.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +/** + * A mix in interface for `DataSourceV2Writer`. Users can implement this interface to allow Spark + * to specify partition columns for writing data into this data source. + */ +public interface PartitioningSupport { + void setPartitionColumns(String[] partitionColumns); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteTask.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteTask.java new file mode 100644 index 000000000000..263041c5a43a --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteTask.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import java.io.Serializable; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; + +/** + * A write task that is responsible for writing the data for each input partition. 
+ */ +public abstract class WriteTask implements Serializable { + private ExpressionEncoder encoder; + + /** + * This method will be called before writing data, users can overwrite this method and put + * initialization logic here. Note that, when overwriting this method, you have to call + * `super.initialize` to correctly do some internal setup for this write task. + */ + public void initialize(StructType schema) { + encoder = RowEncoder.apply(schema); + } + + /** + * The actual write logic should be implemented here. To correctly implement transaction, + * implementations should stage the writing or have a way to rollback. + */ + public abstract void write(Row row); + + /** + * Inside Spark, the data is `UnsafeRow` and will be converted to `Row` before sending it out of + * Spark and writing to data source. To avoid this conversion, implementations can overwrite + * this method and writes `UnsafeRow`s directly. Note that, this is an experimental and unstable + * interface, as `UnsafeRow` is not public and may get changed in future Spark versions. + */ + @Experimental + @InterfaceStability.Unstable + public void write(UnsafeRow row) { + write(encoder.fromRow(row)); + } + + public abstract WriterCommitMessage commit(); + + public abstract void abort(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java new file mode 100644 index 000000000000..6a07c3cd2e01 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import java.io.Serializable; + +/** + * A message returned by `WriteTask.commit`, which will be serialized and sent to the + * driver side, and pass to `DataSourceV2Writer.commit`. + * + * Note that this is an empty interface, concrete `WriteTask` implementations can return any kind of + * `WriterCommitMessage`, its corresponding `DataSourceV2Writer` should cast these messages to + * concrete type in its `commit` method. 
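To illustrate the write protocol end to end, here is a hypothetical writer that stages each partition's output and only publishes it on the job-level commit. All class names and the staging scheme are made up for the example; a real implementation would stage data in its own storage and make it visible atomically in `commit`.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
    import org.apache.spark.sql.sources.v2.writer.WriteTask;
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

    public class BufferingWriter implements DataSourceV2Writer {

      @Override
      public WriteTask createWriteTask() {
        return new BufferingWriteTask();
      }

      @Override
      public void commit(WriterCommitMessage[] messages) {
        // All partitions succeeded: publish every staged output described by the messages.
        for (WriterCommitMessage m : messages) {
          StagedOutput staged = (StagedOutput) m;  // cast to the concrete message type
          publish(staged.path);                    // e.g. move the staged file into place
        }
      }

      @Override
      public void abort() {
        // Some partition failed and was aborted: clean up whatever was staged.
      }

      private void publish(String path) {
        // hypothetical: rename/register `path` so readers can see it
      }
    }

    // Sent from executors to the driver; must be serializable.
    class StagedOutput implements WriterCommitMessage {
      final String path;
      StagedOutput(String path) { this.path = path; }
    }

    class BufferingWriteTask extends WriteTask {
      private final List<Row> buffer = new ArrayList<>();

      @Override
      public void write(Row row) {
        buffer.add(row);  // stage in memory; a real task would write to a temporary location
      }

      @Override
      public WriterCommitMessage commit() {
        String stagedPath = "/tmp/staging-" + hashCode();  // hypothetical staging location
        // flush `buffer` to stagedPath here
        return new StagedOutput(stagedPath);
      }

      @Override
      public void abort() {
        buffer.clear();  // discard staged data so nothing becomes visible
      }
    }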
+ */ +public interface WriterCommitMessage extends Serializable {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 10b28ce812af..2a1c94254aa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -27,12 +27,15 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser} import org.apache.spark.sql.execution.datasources.csv._ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2SchemaProvider} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -181,13 +184,40 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { "read files of Hive data source directly.") } - sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = paths, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap).resolveRelation()) + val cls = DataSource.lookupDataSource(source) + if (classOf[DataSourceV2].isAssignableFrom(cls)) { + // TODO: merge this into `DataSource` + val dataSource = cls.newInstance().asInstanceOf[DataSourceV2] + val options = CaseInsensitiveMap(extraOptions.toMap) + + val schema = dataSource match { + case ds: DataSourceV2SchemaProvider if ds.acceptsUserDefinedSchema => + userSpecifiedSchema.getOrElse(ds.inferSchema(options)) + + case ds: DataSourceV2SchemaProvider => + val inferredSchema = ds.inferSchema(options) + if (userSpecifiedSchema.isDefined && userSpecifiedSchema.get != inferredSchema) { + throw new AnalysisException(s"$ds does not allow user-specified schemas.") + } + inferredSchema + + case _ => + userSpecifiedSchema.getOrElse { + throw new AnalysisException(s"A schema needs to be specified when using $dataSource.") + } + } + + val reader = dataSource.createReader(schema, options) + Dataset.ofRows(sparkSession, DataSourceV2Relation(schema.toAttributes, reader)) + } else { + sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = paths, + userSpecifiedSchema = userSpecifiedSchema, + className = source, + options = extraOptions.toMap).resolveRelation()) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 74a47da2deef..3713e94c087e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -31,8 +31,6 @@ import org.apache.spark.sql.types.DataType */ private[sql] trait ColumnarBatchScan extends CodegenSupport { - val inMemoryTableScan: InMemoryTableScanExec = null - override lazy val metrics = Map( "numOutputRows" -> 
SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 4e718d609c92..b143d44eae17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy import org.apache.spark.sql.internal.SQLConf class SparkPlanner( @@ -35,6 +36,7 @@ class SparkPlanner( def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( + DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala new file mode 100644 index 000000000000..5b32ddbe9b04 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
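With the `DataFrameReader` change above, an end user reaches a V2 source through the same `spark.read` API as before. A minimal usage sketch, assuming the hypothetical source class from the earlier example:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadV2Example {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("v2-demo").getOrCreate();

        // format() takes the fully qualified class name of a DataSourceV2 implementation;
        // the schema comes from DataSourceV2SchemaProvider, or from .schema(...) if given.
        Dataset<Row> df = spark.read()
            .format("com.example.SimpleDataSourceV2")   // hypothetical source class
            .option("path", "/data/simple")             // delivered via the CaseInsensitiveMap
            .load();

        // Column pruning and filter push-down are applied by DataSourceV2Strategy
        // if the reader implements the corresponding mix-in interfaces.
        df.filter("i > 10").select("i").show();
        spark.stop();
      }
    }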
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.reflect.ClassTag + +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.sources.v2.reader.ReadTask + +class DataSourceRDDPartition[T : ClassTag](val index: Int, val generator: ReadTask[T]) + extends Partition with Serializable + +class DataSourceRDD[T: ClassTag]( + sc: SparkContext, + @transient private val generators: java.util.List[ReadTask[T]]) + extends RDD[T](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { + var index = 0 + val iter = generators.iterator() + val res = new Array[Partition](generators.size()) + while (iter.hasNext) { + res(index) = new DataSourceRDDPartition[T](index, iter.next()) + index += 1 + } + res + } + + override def compute(split: Partition, context: TaskContext): Iterator[T] = { + val generator = split.asInstanceOf[DataSourceRDDPartition[T]].generator + context.addTaskCompletionListener(_ => generator.close()) + val iter = new Iterator[T] { + generator.open() + + private[this] var valuePrepared = false + + override def hasNext: Boolean = { + if (!valuePrepared) { + valuePrepared = generator.next() + } + valuePrepared + } + + override def next(): T = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + valuePrepared = false + generator.get() + } + } + new InterruptibleIterator(context, iter) + } + + override def getPreferredLocations(split: Partition): Seq[String] = { + split.asInstanceOf[DataSourceRDDPartition[T]].generator.preferredLocations() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala new file mode 100644 index 000000000000..9ce2a5ef71f2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, StatisticsSupport} + +case class DataSourceV2Relation( + output: Seq[AttributeReference], + reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { + case r: StatisticsSupport => Statistics(sizeInBytes = r.sizeInBytes()) + case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala new file mode 100644 index 000000000000..d6260b4bed66 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, ExpressionSet} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} +import org.apache.spark.sql.sources.v2.reader.{ColumnarReadSupport, DataSourceV2Reader} + +case class DataSourceV2ScanExec( + fullOutput: Array[AttributeReference], + @transient reader: DataSourceV2Reader, + // TODO: these 3 parameters are only used to determine the equality of the scan node, however, + // the reader also have this information, and ideally we can just rely on the equality of the + // reader. The only concern is, the reader implementation is outside of Spark and we have no + // control. 
+ requiredColumnsIndex: Seq[Int], + @transient filters: ExpressionSet, + hashPartitionKeys: Seq[String]) extends LeafExecNode with ColumnarBatchScan { + + def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput) + + override protected def doExecute(): RDD[InternalRow] = reader match { + case r: ColumnarReadSupport if r.supportsColumnarReads() => + WholeStageCodegenExec(this).execute() + case _ => + val numOutputRows = longMetric("numOutputRows") + inputRDD.map { r => + numOutputRows += 1 + r + } + } + + private lazy val inputRDD: RDD[InternalRow] = { + reader match { + case r: ColumnarReadSupport if r.supportsColumnarReads() => + new DataSourceRDD(sparkContext, r.createColumnarReadTasks()) + .asInstanceOf[RDD[InternalRow]] + case _ => + new DataSourceRDD(sparkContext, reader.createUnsafeRowReadTasks()) + .asInstanceOf[RDD[InternalRow]] + } + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) + + override protected def doProduce(ctx: CodegenContext): String = { + if (reader.isInstanceOf[ColumnarReadSupport]) { + return super.doProduce(ctx) + } + val numOutputRows = metricTerm(ctx, "numOutputRows") + // PhysicalRDD always just has one input + val input = ctx.freshName("input") + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + val exprRows = output.zipWithIndex.map{ case (a, i) => + BoundReference(i, a.dataType, a.nullable) + } + val row = ctx.freshName("row") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columnsRowInput = exprRows.map(_.genCode(ctx)) + s""" + |while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${consume(ctx, columnsRowInput, row).trim} + | if (shouldStop()) return; + |} + """.stripMargin + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala new file mode 100644 index 000000000000..1826f64f25d2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} + +object DataSourceV2Strategy extends Strategy { + // TODO: write path + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) => + val attrMap = AttributeMap(output.zip(output)) + + val projectSet = AttributeSet(projects.flatMap(_.references)) + val filterSet = AttributeSet(filters.flatMap(_.references)) + + // Match original case of attributes. + val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap) + val supportColumnPruning = reader match { + case r: ColumnPruningSupport => + r.pruneColumns(requiredColumns.toStructType) + case _ => false + } + + val stayUpFilters: Seq[Expression] = reader match { + case r: CatalystFilterPushDownSupport => + r.pushDownCatalystFilters(filters.toArray) + case r: FilterPushDownSupport => + // A map from original Catalyst expressions to corresponding translated data source filters. + // If a predicate is not in this map, it means it cannot be pushed down. + val translatedMap: Map[Expression, Filter] = filters.flatMap { p => + DataSourceStrategy.translateFilter(p).map(f => p -> f) + }.toMap + + // Catalyst predicate expressions that cannot be converted to data source filters. + val nonconvertiblePredicates = filters.filterNot(translatedMap.contains) + + // Data source filters that cannot be pushed down. An unhandled filter means + // the data source cannot guarantee the rows returned can pass the filter. + // As a result we must return it so Spark can plan an extra filter operator. + val unhandledFilters = r.pushDownFilters(translatedMap.values.toArray).toSet + val unhandledPredicates = translatedMap.filter { case (_, f) => + unhandledFilters.contains(f) + }.keys + + nonconvertiblePredicates ++ unhandledPredicates + case _ => filters + } + + val scan = DataSourceV2ScanExec( + output.toArray, + reader, + if (supportColumnPruning) requiredColumns.map(output.indexOf) else output.indices, + ExpressionSet(filters), + Nil) + + val filterCondition = stayUpFilters.reduceLeftOption(And) + val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + + val withProject = if (projects == withFilter.output) { + withFilter + } else { + ProjectExec(projects, withFilter) + } + + // TODO: support hash partitioning push down. + + withProject :: Nil + + case _ => Nil + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java new file mode 100644 index 000000000000..d8b7a7246374 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.*; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThan; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ColumnPruningSupport; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.sources.v2.reader.FilterPushDownSupport; +import org.apache.spark.sql.types.StructType; + +public class JavaAdvancedDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader implements ColumnPruningSupport, FilterPushDownSupport { + private StructType requiredSchema; + private List filters = new LinkedList<>(); + + Reader(StructType schema) { + this.requiredSchema = schema; + } + + @Override + public StructType readSchema() { + return requiredSchema; + } + + @Override + public boolean pruneColumns(StructType requiredSchema) { + this.requiredSchema = requiredSchema; + return true; + } + + @Override + public boolean pushDownFilter(Filter filter) { + this.filters.add(filter); + return true; + } + + @Override + public List> createReadTasks() { + List> res = new ArrayList<>(); + + Integer lowerBound = null; + for (Filter filter : filters) { + if (filter instanceof GreaterThan) { + GreaterThan f = (GreaterThan) filter; + if ("i".equals(f.attribute()) && f.value() instanceof Integer) { + lowerBound = (Integer) f.value(); + break; + } + } + } + + if (lowerBound == null) { + res.add(new JavaAdvancedReadTask(0, 5, requiredSchema)); + res.add(new JavaAdvancedReadTask(5, 10, requiredSchema)); + } else if (lowerBound < 4) { + res.add(new JavaAdvancedReadTask(lowerBound + 1, 5, requiredSchema)); + res.add(new JavaAdvancedReadTask(5, 10, requiredSchema)); + } else if (lowerBound < 9) { + res.add(new JavaAdvancedReadTask(lowerBound + 1, 10, requiredSchema)); + } + + return res; + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaAdvancedReadTask implements ReadTask { + private int start; + private int end; + private StructType requiredSchema; + + public JavaAdvancedReadTask(int start, int end, StructType requiredSchema) { + this.start = start - 1; + this.end = end; + this.requiredSchema = requiredSchema; + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public Row get() { + Object[] values = new 
Object[requiredSchema.size()]; + for (int i = 0; i < values.length; i++) { + if ("i".equals(requiredSchema.apply(i).name())) { + values[i] = start; + } else if ("j".equals(requiredSchema.apply(i).name())) { + values[i] = -start; + } + } + return new GenericRow(values); + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java new file mode 100644 index 000000000000..9b4fd76a802e --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ColumnarReadSupport; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +public class JavaBatchDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader implements ColumnarReadSupport { + private final StructType schema; + + Reader(StructType schema) { + this.schema = schema; + } + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createReadTasks() { + throw new RuntimeException("not implemented"); + } + + @Override + public List> createColumnarReadTasks() { + return java.util.Arrays.asList(new JavaColumnarReadTask(schema)); + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaColumnarReadTask implements ReadTask { + private final StructType schema; + + public JavaColumnarReadTask(StructType schema) { + this.schema = schema; + } + + private ColumnarBatch batch = null; + private int currentBatch = 0; + + @Override + public boolean next() { + currentBatch += 1; + return currentBatch <= 2; + } + + @Override + public ColumnarBatch get() { + if (batch == null) { + batch = ColumnarBatch.allocate(schema); + } + batch.reset(); + if (currentBatch == 1) { + for (int i = 0; i < 5; i++) { + batch.column(0).putInt(i, i); + batch.column(1).putInt(i, -i); + } + } else { + for (int i = 0; i < 5; i++) { + 
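+        // Second batch: row slots 0..4 hold column i = 5..9 and column j = -5..-9.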
batch.column(0).putInt(i, i + 5); + batch.column(1).putInt(i, -(i + 5)); + } + } + batch.setNumRows(5); + return batch; + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java new file mode 100644 index 000000000000..7a564a014c2e --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +public class JavaSimpleDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader { + private final StructType schema; + + Reader(StructType schema) { + this.schema = schema; + } + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createReadTasks() { + return java.util.Arrays.asList( + new JavaSimpleReadTask(0, 5), + new JavaSimpleReadTask(5, 10)); + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaSimpleReadTask implements ReadTask { + private int start; + private int end; + + public JavaSimpleReadTask(int start, int end) { + this.start = start - 1; + this.end = end; + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public Row get() { + return new GenericRow(new Object[] {start, -start}); + } +} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java new file mode 100644 index 000000000000..f573933ed399 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider; +import org.apache.spark.sql.sources.v2.reader.ReadTask; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, DataSourceV2SchemaProvider { + class Reader extends DataSourceV2Reader { + private final StructType schema; + + Reader(StructType schema) { + this.schema = schema; + } + + @Override + public StructType readSchema() { + return schema; + } + + @Override + public List> createReadTasks() { + throw new IllegalStateException(); + } + + @Override + public List> createUnsafeRowReadTasks() { + return java.util.Arrays.asList( + new JavaUnsafeRowReadTask(0, 5), + new JavaUnsafeRowReadTask(5, 10)); + } + } + + @Override + public StructType inferSchema(CaseInsensitiveMap options) { + return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap options) { + return new Reader(schema); + } +} + +class JavaUnsafeRowReadTask implements ReadTask { + private int start; + private int end; + private UnsafeRow row; + + public JavaUnsafeRowReadTask(int start, int end) { + this.start = start - 1; + this.end = end; + this.row = new UnsafeRow(2); + row.pointTo(new byte[8 * 3], 8 * 3); + } + + @Override + public boolean next() { + start += 1; + return start < end; + } + + @Override + public UnsafeRow get() { + row.setInt(0, start); + row.setInt(1, -start); + return row; + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala new file mode 100644 index 000000000000..9c227c349f5b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -0,0 +1,260 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.sources.v2 + +import java.util.{ArrayList, List => JList} + +import scala.collection.mutable.ListBuffer + +import test.org.apache.spark.sql.sources.v2._ + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.execution.vectorized.ColumnarBatch +import org.apache.spark.sql.sources.{Filter, GreaterThan} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType + +class DataSourceV2Suite extends QueryTest with SharedSQLContext { + import testImplicits._ + + test("simplest implementation") { + Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } + + test("advanced implementation") { + Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 3), (4 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j).filter('i > 6), (7 until 10).map(i => Row(-i))) + checkAnswer(df.select('i).filter('i > 10), Nil) + } + } + } + + test("unsafe row implementation") { + Seq(classOf[UnsafeRowDataSourceV2], classOf[JavaUnsafeRowDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } + + test("batched implementation") { + Seq(classOf[BatchDataSourceV2], classOf[JavaBatchDataSourceV2]).foreach { cls => + withClue(cls.getName) { + val df = spark.read.format(cls.getName).load() + checkAnswer(df, (0 until 10).map(i => Row(i, -i))) + checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) + checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) + } + } + } +} + +class SimpleDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(val readSchema: StructType) extends DataSourceV2Reader { + override def createReadTasks(): JList[ReadTask[Row]] = { + java.util.Arrays.asList(new SimpleReadTask(0, 5), new SimpleReadTask(5, 10)) + } + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] { + private var current = start - 1 + override def next(): Boolean = { + current += 1 + current < end + } + override def get(): Row = Row(current, -current) +} + + + +class AdvancedDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(fullSchema: StructType) extends DataSourceV2Reader + with ColumnPruningSupport with FilterPushDownSupport { + + var 
requiredSchema = fullSchema + val filters = ListBuffer.empty[Filter] + + override def pruneColumns(requiredSchema: StructType): Boolean = { + this.requiredSchema = requiredSchema + true + } + + override def pushDownFilter(filter: Filter): Boolean = { + this.filters += filter + true + } + + override def readSchema(): StructType = { + requiredSchema + } + + override def createReadTasks(): JList[ReadTask[Row]] = { + val lowerBound = filters.collect { + case GreaterThan("i", v: Int) => v + }.headOption + + val res = new ArrayList[ReadTask[Row]] + + if (lowerBound.isEmpty) { + res.add(new AdvancedReadTask(0, 5, requiredSchema)) + res.add(new AdvancedReadTask(5, 10, requiredSchema)) + } else if (lowerBound.get < 4) { + res.add(new AdvancedReadTask(lowerBound.get + 1, 5, requiredSchema)) + res.add(new AdvancedReadTask(5, 10, requiredSchema)) + } else if (lowerBound.get < 9) { + res.add(new AdvancedReadTask(lowerBound.get + 1, 10, requiredSchema)) + } + + res + } + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class AdvancedReadTask(start: Int, end: Int, requiredSchema: StructType) + extends ReadTask[Row] { + + private var current = start - 1 + override def next(): Boolean = { + current += 1 + current < end + } + + override def get(): Row = { + val values = requiredSchema.map(_.name).map { + case "i" => current + case "j" => -current + } + Row.fromSeq(values) + } +} + + + +class UnsafeRowDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(val readSchema: StructType) extends DataSourceV2Reader { + override def createUnsafeRowReadTasks(): JList[ReadTask[UnsafeRow]] = { + java.util.Arrays.asList(new UnsafeRowReadTask(0, 5), new UnsafeRowReadTask(5, 10)) + } + + override def createReadTasks(): JList[ReadTask[Row]] = throw new IllegalStateException() + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class UnsafeRowReadTask(start: Int, end: Int) extends ReadTask[UnsafeRow] { + private val row = new UnsafeRow(2) + row.pointTo(new Array[Byte](8 * 3), 8 * 3) + + private var current = start - 1 + override def next(): Boolean = { + current += 1 + current < end + } + override def get(): UnsafeRow = { + row.setInt(0, current) + row.setInt(1, -current) + row + } +} + + + +class BatchDataSourceV2 extends DataSourceV2 with DataSourceV2SchemaProvider { + class Reader(val readSchema: StructType) extends DataSourceV2Reader with ColumnarReadSupport { + override def createReadTasks(): JList[ReadTask[Row]] = { + throw new NotImplementedError() + } + + override def createColumnarReadTasks(): JList[ReadTask[ColumnarBatch]] = { + java.util.Arrays.asList(new ColumnarReadTask(readSchema)) + } + } + + override def inferSchema(options: CaseInsensitiveMap[String]): StructType = { + new StructType().add("i", "int").add("j", "int") + } + + override def createReader( + schema: StructType, + options: CaseInsensitiveMap[String]): DataSourceV2Reader = new Reader(schema) +} + +class ColumnarReadTask(schema: StructType) extends ReadTask[ColumnarBatch] { + private lazy val batch = ColumnarBatch.allocate(schema) + private var currentBatch = 0 + + 
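+  // Produces two batches of five rows each: column i is 0..4 then 5..9 and column j is -i,
+  // so a full scan returns the same ten rows as the row-based sources above.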
override def next(): Boolean = {
+    currentBatch += 1
+    currentBatch <= 2
+  }
+
+  override def get(): ColumnarBatch = {
+    batch.reset()
+    if (currentBatch == 1) {
+      batch.column(0).putInts(0, 5, 0.until(5).toArray, 0)
+      batch.column(1).putInts(0, 5, 0.until(5).map(i => -i).toArray, 0)
+    } else {
+      batch.column(0).putInts(0, 5, 5.until(10).toArray, 0)
+      batch.column(1).putInts(0, 5, 5.until(10).map(i => -i).toArray, 0)
+    }
+    batch.setNumRows(5)
+    batch
+  }
+}
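+
+// Usage sketch (assumes the sources defined above are on the classpath, as exercised in the tests):
+//
+//   val df = spark.read.format(classOf[SimpleDataSourceV2].getName).load()
+//   df.filter('i > 5).collect()   // yields rows (6, -6) through (9, -9)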