Prototype: Data Source V2 #10
Changes from 2 commits
@@ -0,0 +1,28 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

/**
 * A mix-in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this
 * interface to specify the bucket/sort columns of the reader/writer, to improve performance.
 */
public interface BucketingSupport {
If this is used to set sort columns, why is it called `BucketingSupport`?
  void setBucketColumns(String[] bucketColumns, int numBuckets);

  void setSortColumns(String[] sortColumns);
}
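To make the mix-in contract concrete, here is a minimal sketch of a reader that implements `BucketingSupport` as defined in this diff. `MyBucketedReader` and its fields are hypothetical; a real implementation would use the recorded columns when planning read tasks.

```java
import org.apache.spark.sql.sources.v2.BucketingSupport;

// Hypothetical reader-side implementation of the BucketingSupport mix-in above.
public class MyBucketedReader implements BucketingSupport {

  private String[] bucketColumns = new String[0];
  private int numBuckets = 0;
  private String[] sortColumns = new String[0];

  @Override
  public void setBucketColumns(String[] bucketColumns, int numBuckets) {
    // Record how the data is bucketed so the scan can expose matching output partitioning.
    this.bucketColumns = bucketColumns;
    this.numBuckets = numBuckets;
  }

  @Override
  public void setSortColumns(String[] sortColumns) {
    // Record the per-bucket sort order, if any.
    this.sortColumns = sortColumns;
  }
}
```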
@@ -0,0 +1,43 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

/**
 * The main interface and minimal requirement for data source v2 implementations. Users can mix in
 * more interfaces to implement more functions other than just scan.
 */
public interface DataSourceV2 {

  /**
   * The main entrance for read interface.
   *
   * @param schema the full schema of this data source reader. Full schema usually maps to the
I think the schema should be handled a little differently. We should assume that a data source has a schema that applies to all of its data, instead of passing one in. That's the case with most datasets, like Hive tables, relational tables, and Cassandra tables. I'd also argue that it is a best practice, so we don't have expensive operations to infer the schema, like we do for non-Metastore tables. So instead of passing the full schema here and having an interface to infer a schema, the data source should be expected to report its schema while analyzing the query (which may require inference). Then the schema passed to create a reader should be the expected schema, or projection schema after the query is optimized.

Also, I'd like to understand how column IDs should be handled. I'm building a data source that uses column IDs to implement schema evolution that supports add, delete, and rename. The column IDs for a table are only unique within that table (another table can reuse them), so it doesn't necessarily make sense to expose them to Spark. Spark will still need its own unique attribute IDs. But I'd prefer to have Spark request columns by ID rather than by name, so that Spark handles name resolution between a query and the data source. I think what makes sense is for the data source to create a Spark schema with attributes that Spark will use in the projection schema. So if my data source has columns …

Author: I'm +1 on it if we can rewrite the data source part from scratch... For the column id stuff, I don't totally understand it. Attribute id is internal to Spark; Spark only uses column names when interacting with data sources, that's why we only use …

"We should assume that a data source has a schema that applies to all of its data, instead of passing one in. That's the case with most datasets, like Hive tables, relational tables, and Cassandra tables." This is not true though. A Hive table, for example: the schema is specified in the catalog, which means it has to be passed into the underlying file-based data source. The file-based data source itself is actually not dictating the schema.

I think what I didn't understand is that the data source API is between the metastore and the files in a table. So the metastore is queried to get a schema, then the data files are treated as schema-on-read. But why should that be the case? I would rather be able to have Hive tables entirely behind the DataSource API, so they can be handled the same way as a JDBC table or another source with a pre-determined schema for its data. With that missing context, I can see how the interface proposed here looks designed for schema-on-read, rather than for tables that already have a fixed schema: it assumes that the data source will commonly use an externally provided schema, can't necessarily infer one (e.g. JSON), and the projection is separate from the data source schema. I think it's reasonable to consider whether this API should be based on schema-on-read or should assume that the data source has a schema, along the lines of what I was suggesting before. The proposal doc lists 3 cases: an external schema is required, the schema is fixed and a user schema is not allowed, and a user schema is optional because it can be inferred. For sources with fixed schemas, … What about a … For the other two cases, the proposed API works okay. It's a little strange that those that require a user schema won't implement …
   *               physical schema of the underlying storage of this data source reader, e.g.
   *               parquet files, JDBC tables, etc, while this reader may not read data with full
   *               schema, as column pruning or other optimizations may happen.
   * @param options the options for this data source reader, which is case insensitive.
   * @return a reader that implements the actual read logic.
   */
  DataSourceV2Reader createReader(
      StructType schema,
      CaseInsensitiveMap<String> options);
Maybe not a big deal at this stage, but I wonder if this should be just …

Please don't use a Map. It's a huge disaster in v1.

There might also be data sources that expect case-sensitive key names. I think it would be a good idea to pass options as the user specified them and let the data-source implementation handle the options as appropriate for the data source.

This has bitten me in design: not realizing all the options come through lowercased.

Please don't do that (let the source handle case sensitivity). It will be the most confusing thing to the end user.

@rxin, what about using an option map was a disaster in v1? For case sensitivity, what happens elsewhere? I didn't think that options were case insensitive in Spark.

Some sources interpret the options as case sensitive, and some as case insensitive. It's really confusing to the end users. On top of that, Scala Map has really bad binary compatibility across Scala versions.

Thanks, that makes sense. What are you proposing instead of a map? A configuration object that handles resolution internally?
}
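A minimal sketch of a `DataSourceV2` implementation against the interfaces in this diff. `MyDataSource` and the `path` option default are hypothetical; `ReadTask` is referenced from the same `reader` package but is not defined in this diff, so the sketch returns an empty scan. Since `CaseInsensitiveMap` is a Scala `Map`, the Java code uses `contains`/`apply` for lookups.

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.StructType;

public class MyDataSource implements DataSourceV2 {

  @Override
  public DataSourceV2Reader createReader(StructType schema, CaseInsensitiveMap<String> options) {
    // Option keys arrive lower-cased through CaseInsensitiveMap, which is the behaviour
    // debated in the review thread above.
    final String path = options.contains("path") ? options.apply("path") : "/tmp/my-source";

    return new DataSourceV2Reader() {
      @Override
      public StructType readSchema() {
        // No pruning mix-ins here, so the reader reports the full schema it was given.
        return schema;
      }

      @Override
      public List<ReadTask<Row>> createReadTasks() {
        // A real source would list the splits under 'path' and emit one task per split.
        return Collections.emptyList();
      }
    };
  }
}
```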
@@ -0,0 +1,42 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide schema
 * inference ability when scanning data.
 */
public interface DataSourceV2SchemaProvider {

  /**
   * Return the inferred schema of this data source given these options.
   */
  StructType inferSchema(CaseInsensitiveMap<String> options);
I think this should have a better name. Data sources with a metadata store won't infer anything; this should come from the table's metadata.

Author: how about …

+1 for …

I'd use a different name other than "get", since get is usually very cheap, whereas here it can potentially be very expensive. Maybe computeSchema?

Good point about get. If we add an interface to signal schema-on-read behavior, we could also move …
  /**
   * Whether or not this data source can accept user specified schema. When Spark scans a data
   * source, users can specify the schema to avoid expensive schema inference. However some data
   * sources may have to infer the schema and reject any user specified schemas; they can override
   * this method to achieve this.
   */
  default boolean acceptsUserDefinedSchema() {
    return true;
Why a boolean method instead of a UserDefinedSchema interface like the other traits? Also, doesn't the …

Author: I explained in the doc, we have 3 requirements for the schema inference feature: …

I left a longer comment above. I didn't get what you were doing with these interfaces at first, since I think of most data sources as having a schema instead of being schema-on-read.
  }
}
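For illustration, a sketch of the schema-provider mix-in for a source that always derives its own schema and rejects user-specified ones. The field names and types are made up, and a real source would also implement `DataSourceV2`.

```java
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.sources.v2.DataSourceV2SchemaProvider;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class MyInferringSource implements DataSourceV2SchemaProvider {

  @Override
  public StructType inferSchema(CaseInsensitiveMap<String> options) {
    // A real source would sample the data referenced by the options; a fixed result keeps
    // the sketch self-contained.
    return new StructType()
        .add("id", DataTypes.LongType)
        .add("payload", DataTypes.StringType);
  }

  @Override
  public boolean acceptsUserDefinedSchema() {
    // This source always computes the schema itself, so user-specified schemas are rejected,
    // matching the escape hatch described in the javadoc above.
    return false;
  }
}
```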
@@ -0,0 +1,26 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2;

/**
 * A mix-in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this
 * interface to specify the partition columns of the reader/writer, to improve performance.
 */
public interface PartitioningSupport {

  void setPartitionColumns(String[] partitionColumns);
This is good, but would this be separate from bucketing? I want to be able to dictate that a column, or group of columns, which, when equal between two rows, will be found in the same task. I can't make guarantees about ordering, I can only guarantee clustering (re: Cassandra).
}
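A small sketch of the partitioning mix-in; `MyPartitionedWriter` is hypothetical and simply records the columns, which a writer could later use to lay out one output directory per distinct value.

```java
import org.apache.spark.sql.sources.v2.PartitioningSupport;

public class MyPartitionedWriter implements PartitioningSupport {

  private String[] partitionColumns = new String[0];

  @Override
  public void setPartitionColumns(String[] partitionColumns) {
    // A writer could use these to build .../col=value/ style directories; a reader could use
    // them to prune directories that cannot match a pushed filter.
    this.partitionColumns = partitionColumns;
  }
}
```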
@@ -0,0 +1,38 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;

/**
 * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data
 * writing ability with job-level transaction.
 */
public interface WritableDataSourceV2 extends DataSourceV2 {

  /**
   * The main entrance for write interface.
   *
   * @param mode the save mode, can be append, overwrite, etc.
   * @param options the options for this data source writer.
   * @return a writer that implements the actual write logic.
   */
  DataSourceV2Writer createWriter(SaveMode mode, CaseInsensitiveMap<String> options);
}
@@ -0,0 +1,40 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.expressions.Expression;

/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
 * arbitrary expressions as predicates to the data source.
 */
@Experimental
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {
boolean pushDownCatalystFilter(Expression filter);

This interface is very nice. Just wondering, for a data source to implement this method, is it ok for the implementation to access subtypes of Expression in Spark, for example functions like Abs?

Author: implementations can pattern match the given expression and access any subtype you like.
  /**
   * Push down one filter, returns true if this filter can be pushed down to this data source,
   * false otherwise. This method might be called many times if more than one filter needs to be
   * pushed down.
   *
   * TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which
   * returns unsupported filters.
   */
  boolean pushDownCatalystFilter(Expression filter);
}
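Following the author's note that implementations can pattern match the expression, here is a sketch that accepts only simple `column = literal` predicates. `MyFilteringReader` and its helper are hypothetical; the matched classes (`EqualTo`, `Attribute`, `Literal`) are existing Catalyst expression types.

```java
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.sources.v2.reader.CatalystFilterPushDownSupport;

public class MyFilteringReader implements CatalystFilterPushDownSupport {

  @Override
  public boolean pushDownCatalystFilter(Expression filter) {
    // Accept only `attribute = literal`; anything else stays in Spark as a post-scan filter.
    if (filter instanceof EqualTo) {
      EqualTo eq = (EqualTo) filter;
      if (eq.left() instanceof Attribute && eq.right() instanceof Literal) {
        String column = ((Attribute) eq.left()).name();
        Object value = ((Literal) eq.right()).value();
        rememberPushedFilter(column, value);
        return true;
      }
    }
    return false;
  }

  private void rememberPushedFilter(String column, Object value) {
    // A real reader would keep (column, value) and apply it while producing read tasks.
  }
}
```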
@@ -0,0 +1,32 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2.reader;

/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to pre-cluster
 * the data and avoid a shuffle on the Spark side.
 */
public interface ClusteringPushDownSupport {

  /**
   * Returns true if the implementation can handle this clustering requirement and save a shuffle
   * on the Spark side. Clustering means that if two records have the same values for the given
   * clustering columns, they must be produced by the same read task.
   */
  boolean pushDownClustering(String[] clusteringColumns);
}
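A sketch of how a source with a fixed storage layout might answer the clustering question; the storage partition-key columns are made up. The check relies on the fact that if the requested clustering columns include every storage partition-key column, then rows that agree on the requested columns also agree on the partition key, and therefore land in the same read task (assuming each task reads whole storage partitions).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import org.apache.spark.sql.sources.v2.reader.ClusteringPushDownSupport;

public class MyClusteredReader implements ClusteringPushDownSupport {

  // Hypothetical: the underlying storage hashes rows into partitions by these columns,
  // and each read task reads one or more whole storage partitions.
  private final List<String> storagePartitionKeys = Arrays.asList("region", "customer_id");

  @Override
  public boolean pushDownClustering(String[] clusteringColumns) {
    // Equal values on the requested columns imply equal values on the partition key,
    // hence the same storage partition and the same read task.
    return new HashSet<>(Arrays.asList(clusteringColumns)).containsAll(storagePartitionKeys);
  }
}
```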
@@ -0,0 +1,32 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to only read
 * required columns/nested fields during scan.
 */
public interface ColumnPruningSupport {

  /**
   * Returns true if the implementation can apple this column pruning optimization, so that we can
Nit: should be "apply" not "apple".
   * reduce the data size to be read at the very beginning.
   */
  boolean pruneColumns(StructType requiredSchema);
Why allow the implementation to reject the required schema? If the data source supports column pruning, then it should throw an exception if the columns passed by this method are invalid (with more context). Otherwise, it should do the pruning. The only case where I think this might be valid is if there are some columns that can't be pruned, in which case it should return the actual schema that will be produced instead of rejecting by returning false. But then, the actual read schema is accessible from …

agreed here!

Author: Actually I don't have a use case for rejecting the required schema, I just wanted to follow the other push down interfaces and return a boolean. I'm fine to return …
}
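Tying into the discussion above about columns that cannot be pruned, here is a sketch where the reader accepts the pruned schema but keeps a column its (hypothetical) format always materializes, and reports the result through the reader's `readSchema()`. The class, field, and `row_id` column are all made up for illustration.

```java
import java.util.Arrays;

import org.apache.spark.sql.sources.v2.reader.ColumnPruningSupport;
import org.apache.spark.sql.types.StructType;

public class MyPruningReader implements ColumnPruningSupport {

  // Hypothetical: the underlying format always materializes this column.
  private static final String MANDATORY_COLUMN = "row_id";

  private final StructType fullSchema;
  private StructType readSchema;  // what the reader's readSchema() would report

  public MyPruningReader(StructType fullSchema) {
    this.fullSchema = fullSchema;
    this.readSchema = fullSchema;
  }

  @Override
  public boolean pruneColumns(StructType requiredSchema) {
    StructType pruned = requiredSchema;
    // The format cannot drop the mandatory column, so it stays in the reported read schema.
    if (!Arrays.asList(requiredSchema.fieldNames()).contains(MANDATORY_COLUMN)) {
      pruned = pruned.add(fullSchema.apply(MANDATORY_COLUMN));
    }
    this.readSchema = pruned;
    return true;
  }
}
```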
@@ -0,0 +1,42 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2.reader;

import java.util.List;

import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to provide
 * columnar read ability for better performance.
 */
public interface ColumnarReadSupport {

  /**
   * Similar to `DataSourceV2Reader.createReadTasks`, but returns data in columnar format.
   */
  List<ReadTask<ColumnarBatch>> createColumnarReadTasks();
This means that I'd have to implement the … I suppose this is a mixin because of the below method. It's unclear to me that you actually want to implement both of them as a user (even if you can support columnar in some cases) - I'd prefer to implement one and wrap it.

Author: a data source may not be able to support columnar read for all columns, so I think they should always implement the row-based interface as a fallback.

But if I can support columnar read for all columns, then why make me implement the row based interface?

Author: then you can throw an exception in the row based interface; I'll put it in the javadoc.
  /**
   * A safety door for columnar reader. It's possible that the implementation can only support
   * columnar reads for certain columns; users can override this method to fall back to the
   * normal read path under some conditions.
   */
  default boolean supportsColumnarReads() {
Any chance we could just have a utility method that converts a …? Do we have a use case for this?

Author: yea, this one we have a use case. The current vectorized parquet reader only supports flat schemas, i.e., struct/array/map types are not supported.

This is a bit strange - shouldn't this return value sometimes depend on the schema?

Author: the schema is a state of the reader, so when a reader mixes in this interface, it should know what the current schema is, after column pruning or something.

OK, so this is only called after all push downs are done? We should specify that.
    return true;
  }
}
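A sketch of the "safety door" discussed above, mirroring the vectorized Parquet limitation the author mentions: the columnar path is taken only when the current read schema is flat. The class and its constructor are hypothetical, and `ReadTask` is not defined in this diff, so the columnar scan is left empty.

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.sources.v2.reader.ColumnarReadSupport;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class MyColumnarReader implements ColumnarReadSupport {

  private final StructType readSchema;  // the schema after any pruning has been applied

  public MyColumnarReader(StructType readSchema) {
    this.readSchema = readSchema;
  }

  @Override
  public List<ReadTask<ColumnarBatch>> createColumnarReadTasks() {
    // A real source would build vectorized tasks here.
    return Collections.emptyList();
  }

  @Override
  public boolean supportsColumnarReads() {
    // Only flat schemas (no struct/array/map columns) go down the columnar path.
    for (StructField field : readSchema.fields()) {
      if (field.dataType() instanceof StructType
          || field.dataType() instanceof ArrayType
          || field.dataType() instanceof MapType) {
        return false;
      }
    }
    return true;
  }
}
```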
@@ -0,0 +1,66 @@
/* ... Apache License 2.0 header ... */

package org.apache.spark.sql.sources.v2.reader;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;

/**
 * The main interface and minimal requirement for a data source reader. The implementations should
 * at least implement the full scan logic; users can mix in more interfaces to implement scan
 * optimizations like column pruning, filter push down, etc.
 */
public abstract class DataSourceV2Reader {

  /**
   * The actual schema of this data source reader, which may be different from the physical schema
   * of the underlying storage, as column pruning or other optimizations may happen.
   */
  public abstract StructType readSchema();

  /**
   * The actual read logic should be implemented here. This may not be a full scan as optimizations
   * may have already been applied on this reader. Implementations should return a list of
   * read tasks, each task responsible for outputting data for one RDD partition, which means
   * the number of tasks returned here will be the same as the number of RDD partitions this scan
   * outputs.
   */
  // TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row.
  public abstract List<ReadTask<Row>> createReadTasks();

  /**
   * Inside Spark, the input rows will be converted to `UnsafeRow`s before processing. To avoid
   * this conversion, implementations can override this method and output `UnsafeRow`s directly.
   * Note that this is an experimental and unstable interface, as `UnsafeRow` is not public and
   * may get changed in future Spark versions.
   */
  @Experimental
  @InterfaceStability.Unstable
  public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
Ideally we'd continue to be able to return InternalRows here. It's sometimes possible (with knowledge of filters, etc.) to provide a more efficient implementation than using UnsafeRow; I should be able to provide my own (more efficient) implementation.

Author: Inside Spark the input rows must be …

What's the use case for … Should we make …

My understanding is that you indicate whether you need the conversion into unsafe rows by having your BaseRelation (in data sources v1) say that needsConversion is true (and then Spark will copy the rows if a shuffle is required). Am I reading this incorrectly? I can see in GenerateUnsafeProjection that we are returning UnsafeRows, but this seems like a different approach which always involves a copy. In what case (with the current APIs) will my returning InternalRows result in an additional copy over my returning UnsafeRows? An additional thing is that it's far easier for me to create an InternalRow than it is for me to create an UnsafeRow in my codebase. Imagine I have a collection of list-like things representing columns - it is trivial for me to create an InternalRow implementation that efficiently accesses them and avoids copies. It is much less trivial to create an unsafe row with them all in; the code ends up being harder to implement (I have to start worrying about memory allocation, buffer sizes, batching).

Author: Spark makes a contract that the data exchanged between operators must be … To avoid copy, you have to return … For data source v1, …
    StructType schema = readSchema();
    return createReadTasks().stream()
        .map(rowGenerator -> new RowToUnsafeRowReadTask(rowGenerator, schema))
        .collect(Collectors.toList());
  }
}
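Finally, a sketch showing how the pieces above are meant to compose on a single reader: the abstract scan contract plus two of the mix-ins, with `readSchema()` reflecting whatever pruning has been applied. Everything named `MyCompositeReader` is hypothetical, and the filter handling is deliberately naive (it claims every filter).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.sources.v2.reader.CatalystFilterPushDownSupport;
import org.apache.spark.sql.sources.v2.reader.ColumnPruningSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.types.StructType;

public class MyCompositeReader extends DataSourceV2Reader
    implements ColumnPruningSupport, CatalystFilterPushDownSupport {

  private StructType schema;  // starts as the full schema, narrowed by pruneColumns
  private final List<Expression> pushedFilters = new ArrayList<>();

  public MyCompositeReader(StructType fullSchema) {
    this.schema = fullSchema;
  }

  @Override
  public StructType readSchema() {
    // Reports the schema the scan will actually produce, after any pruning.
    return schema;
  }

  @Override
  public boolean pruneColumns(StructType requiredSchema) {
    this.schema = requiredSchema;
    return true;
  }

  @Override
  public boolean pushDownCatalystFilter(Expression filter) {
    // Claim every filter; a real source would inspect the expression and be selective.
    pushedFilters.add(filter);
    return true;
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    // One task per input split in a real source; ReadTask is defined outside this diff.
    return Collections.emptyList();
  }
}
```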
Is `v2` going to be there at the final commit? I'm just wondering because the names already have `V2` (e.g. `DataSourceV2Reader`, `DataSourceV2Writer`, and `DataSourceV2`) and the package name `org.apache.spark.sql.sources.v2` is different from the old one.