address comments
cloud-fan committed Aug 31, 2017
commit a29031db10b4c0a2dd87d7f033068feb0f64d14a
@@ -29,12 +29,7 @@
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {
@dongjoon-hyun dongjoon-hyun Aug 17, 2017

In V2, can we introduce PlanPushDownSupport, too? Oops, sorry. I found that it's documented as out of scope.

boolean pushDownCatalystFilter(Expression filter);

This interface is very nice. Just wondering: for a data source to implement this method, is it OK for the implementation to access subtypes of Expression in Spark, for example functions like Abs?

Owner Author
Implementations can pattern-match the given expression and access any subtype they like.

/**
* Push down one filter, returns true if this filter can be pushed down to this data source,
* false otherwise. This method might be called many times if more than one filter need to be
* pushed down.
*
* TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which
* returns unsupported filters.
* Pushes down filters and returns the unsupported filters.
*/
boolean pushDownCatalystFilter(Expression filter);
Expression[] pushDownCatalystFilters(Expression[] filters);
}
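
As a rough sketch of the reply above (the class name and the accepted expression shapes are invented for illustration; this is not part of the commit), an implementation of the new `pushDownCatalystFilters` might pattern-match Catalyst expression subtypes like this:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Abs;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.GreaterThan;

class MyCatalystFilterPushDown implements CatalystFilterPushDownSupport {
  private final List<Expression> pushed = new ArrayList<>();

  @Override
  public Expression[] pushDownCatalystFilters(Expression[] filters) {
    List<Expression> unsupported = new ArrayList<>();
    for (Expression filter : filters) {
      // Accept e.g. `abs(col) > something`; hand everything else back to Spark.
      if (filter instanceof GreaterThan && ((GreaterThan) filter).left() instanceof Abs) {
        pushed.add(filter);
      } else {
        unsupported.add(filter);
      }
    }
    return unsupported.toArray(new Expression[0]);
  }
}
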
@@ -35,6 +35,9 @@ public interface ColumnarReadSupport {
* A safety door for columnar reader. It's possible that the implementation can only support
* columnar reads for some certain columns, users can overwrite this method to fallback to
* normal read path under some conditions.
*
* Note that, if the implementation always returns true here, it can throw an exception in
* the row-based `DataSourceV2Reader.createReadTasks`, as that method will never be called.
*/
default boolean supportsColumnarReads() {
Any chance we could just have a utility method that converts a ReadTask<InternalRow> into a ReadTask<ColumnarBatch>? This would allow us to get rid of this default method.

Do we have a use case for this?

Owner Author
Yea, for this one we have a use case. The current vectorized parquet reader only supports flat schemas, i.e., struct/array/map types are not supported.

This is a bit strange - shouldn't this return value sometimes depend on the schema?

Owner Author
The schema is part of the reader's state, so when a reader mixes in this interface, it should know what the current schema is, after column pruning and so on.

OK, so this is only called after all push-downs are done? We should specify that.

return true;
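
To make the discussion above concrete, a schema-dependent override might look roughly like this. This is only a sketch: it assumes the reader keeps its pruned schema in a hypothetical `prunedSchema` field of type StructType, and the surrounding reader class is omitted.

// Inside a hypothetical reader that mixes in ColumnarReadSupport.
@Override
public boolean supportsColumnarReads() {
  for (org.apache.spark.sql.types.StructField field : prunedSchema.fields()) {
    org.apache.spark.sql.types.DataType type = field.dataType();
    // Fall back to the row-based path for nested types, which the current
    // vectorized parquet reader cannot handle.
    if (type instanceof org.apache.spark.sql.types.StructType
        || type instanceof org.apache.spark.sql.types.ArrayType
        || type instanceof org.apache.spark.sql.types.MapType) {
      return false;
    }
  }
  return true;
}
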
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import java.io.Closeable;
import java.util.Iterator;

/**
* A data reader returned by a read task, responsible for outputting data for an RDD
* partition.
*/
public interface DataReader<T> extends Iterator<T>, Closeable {}
why is this an "Iterator"? Don't do this ...

Use explicit next(), with close().

I think it makes sense to use something like Iterable and Iterator instead of requiring an open method that can be easily forgotten. What's the rationale behind not using Iterator? The remove method that's on the interface?
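
For context on this discussion, a consumer of `DataReader` would look roughly like this (a sketch; the `task` variable is a hypothetical `ReadTask<UnsafeRow>`). Since the interface extends both `Iterator` and `Closeable`, it works with try-with-resources and a plain while loop:

try (DataReader<UnsafeRow> reader = task.getReader()) {
  while (reader.hasNext()) {
    UnsafeRow row = reader.next();
    // process the row ...
  }
} catch (java.io.IOException e) {
  // close() is declared on Closeable, so IOException must be handled somewhere.
  throw new RuntimeException(e);
}
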

@@ -17,12 +17,15 @@

package org.apache.spark.sql.sources.v2.reader;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;

@@ -47,13 +50,16 @@ public abstract class DataSourceV2Reader {
* output.
*/
// TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row.
public abstract List<ReadTask<Row>> createReadTasks();
protected abstract List<ReadTask<Row>> createReadTasks();

/**
* Inside Spark, the input rows will be converted to `UnsafeRow`s before processing. To avoid
* this conversion, implementations can overwrite this method and output `UnsafeRow`s directly.
* Note that, this is an experimental and unstable interface, as `UnsafeRow` is not public and
* may get changed in future Spark versions.
*
* Note that, if the implementation overrides this method, it should also override
* `createReadTasks` to throw an exception, as that method will never be called.
*/
@Experimental
@InterfaceStability.Unstable
@@ -64,3 +70,48 @@ public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
.collect(Collectors.toList());
}
}

class RowToUnsafeRowReadTask implements ReadTask<UnsafeRow> {
private final ReadTask<Row> rowReadTask;
private final StructType schema;

RowToUnsafeRowReadTask(ReadTask<Row> rowReadTask, StructType schema) {
this.rowReadTask = rowReadTask;
this.schema = schema;
}

@Override
public String[] preferredLocations() {
return rowReadTask.preferredLocations();
}

@Override
public DataReader<UnsafeRow> getReader() {
return new RowToUnsafeDataReader(rowReadTask.getReader(), RowEncoder.apply(schema));
}
}

class RowToUnsafeDataReader implements DataReader<UnsafeRow> {
private final DataReader<Row> rowReader;
private final ExpressionEncoder<Row> encoder;

RowToUnsafeDataReader(DataReader<Row> rowReader, ExpressionEncoder<Row> encoder) {
this.rowReader = rowReader;
this.encoder = encoder;
}

@Override
public boolean hasNext() {
return rowReader.hasNext();
}

@Override
public UnsafeRow next() {
return (UnsafeRow) encoder.toRow(rowReader.next());
}

@Override
public void close() throws IOException {
rowReader.close();
}
}
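
A minimal sketch of the override pattern described in the note above (a fragment of a hypothetical `DataSourceV2Reader` subclass; other members, such as the task construction, are omitted):

@Override
protected List<ReadTask<Row>> createReadTasks() {
  // Never called once createUnsafeRowReadTasks is overridden, so fail fast.
  throw new IllegalStateException("This reader only produces UnsafeRow read tasks.");
}

@Override
public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
  // Build tasks that emit UnsafeRow directly, skipping the Row -> UnsafeRow conversion.
  return unsafeRowTasks; // assumed field holding the pre-built tasks
}
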
@@ -25,12 +25,7 @@
*/
public interface FilterPushDownSupport {
/**
* Push down one filter, returns true if this filter can be pushed down to this data source,
* false otherwise. This method might be called many times if more than one filter need to be
* pushed down.
*
* TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which
* returns unsupported filters.
* Pushes down filters and returns the unsupported filters.
*/
boolean pushDownFilter(Filter filter);
Filter[] pushDownFilters(Filter[] filters);
}
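
For illustration, a data source that can only evaluate equality and not-null predicates might implement the new method like this (the class name is invented; only the public `Filter` subtypes from `org.apache.spark.sql.sources` are used):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.IsNotNull;

class MyFilterPushDown implements FilterPushDownSupport {
  private final List<Filter> pushed = new ArrayList<>();

  @Override
  public Filter[] pushDownFilters(Filter[] filters) {
    List<Filter> unsupported = new ArrayList<>();
    for (Filter filter : filters) {
      if (filter instanceof EqualTo || filter instanceof IsNotNull) {
        pushed.add(filter);       // will be applied by the data source
      } else {
        unsupported.add(filter);  // handed back for Spark to evaluate
      }
    }
    return unsupported.toArray(new Filter[0]);
  }
}
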
@@ -20,8 +20,8 @@
import java.io.Serializable;

/**
* A read task returned by a data source reader and is responsible for outputting data for an RDD
* partition.
* A read task returned by a data source reader, responsible for creating the data reader.
* The relationship between `ReadTask` and `DataReader` is similar to `Iterable` and `Iterator`.
*/
public interface ReadTask<T> extends Serializable {
Why not use standard interfaces like Iterable and Iterator or at least the same pattern? This interface mixes the two together by having both open and next/get. That makes it easy to forget to call open and prevents the ReadTask from being used more than once (unless it has been serialized to two places). I would expect ReadTask to be like an Iterable so that it can be reused if needed, and for it to return an equivalent of Iterator that actually does the read and tracks state. Requesting the Iterator from a task opens it, so you never forget.

With these kinds of Serializable interfaces, it can be a pain to implement because you end up needing to make all of your data access objects serializable (or construct them all in the open method, which is also quite sad, for reasons @rdblue notes). In datasources V1, we've used a pattern where you include a serializable datastructure that contains enough information to construct your objects properly (so for example, the params map is serializable).

Ideally we could have something similar here; what if ReadTask<T> extends Serializable and has a method which returns a closeable Java 8 spliterator - a datastructure which has a similar interface to your Iterator-like thing, but is easier to implement safely (don't need to do a while (it.next()) { doSomething(it.get()) }, can just do split.forEachRemaining(this::doSomething) or easily convert it into an iterator with very little overhead at a higher level).

Owner Author
I agree that we should make ReadTask like an Iterable, and it can return an Iterator. But I can't find a standard closeable iterator interface in the JDK. Shall we create one here?

can we just return an object with an interface like:

interface Reader<T> extends Spliterator<T>, Closeable {}

(spliterator is a much nicer interface than iterator to implement)

or is the issue here that we can't use Java 8?

I would use Closeable, which requires that multiple calls to close are safe. The iterator should close itself when it is exhausted, hasNext should return false, and NoSuchElementException should be thrown by next as required by Iterator.

/**
@@ -33,25 +33,5 @@ default String[] preferredLocations() {
return new String[0];
}

/**
* This method will be called before running this read task, users can overwrite this method
* and put initialization logic here.
*/
default void open() {}

/**
* Proceed to next record, returns false if there is no more records.
*/
boolean next();

/**
* Return the current record. This method should return same value until `next` is called.
*/
T get();

/**
* This method will be called after finishing this read task, users can overwrite this method
* and put clean up logic here.
*/
default void close() {}
DataReader<T> getReader();
}
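
To illustrate the `Iterable`/`Iterator`-style split described in the updated javadoc, here is a toy task/reader pair (the range data source is made up purely for illustration): the task is the serializable "recipe", and the reader created from it does the actual work and tracks the state.

class RangeReadTask implements ReadTask<Integer> {
  private final int start;
  private final int end;

  RangeReadTask(int start, int end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public DataReader<Integer> getReader() {
    // Requesting the reader "opens" it, so there is no separate open() to forget.
    return new RangeDataReader(start, end);
  }
}

class RangeDataReader implements DataReader<Integer> {
  private int current;
  private final int end;

  RangeDataReader(int start, int end) {
    this.current = start;
    this.end = end;
  }

  @Override
  public boolean hasNext() {
    return current < end;
  }

  @Override
  public Integer next() {
    return current++;
  }

  @Override
  public void close() {
    // Release connections, file handles, etc. here.
  }
}
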

This file was deleted.

@@ -26,5 +26,5 @@ public interface SortPushDown {
* Returns true if the implementation can handle this sorting requirement and save a sort
* operation at Spark side.
Like the discussion on hash support, I would rather see this the other way around, where the data source can report its underlying sort order. Maybe there are some sources that can perform a sort before passing the data to Spark, but it's hard to know when it should. I think the more useful case is when the data is already sorted.

+1

Owner Author
OK, I agree. Shall we distinguish between global sort and per-partition sort?

I'd leave this out of the first iteration and add it later, but some thoughts: specify the preferred sort orders, and the data source can return the sortedness.

Per-partition sort would be great. We usually can't handle a global sort, but there are some per-partition sorts (by partition here I mean SQL-style partitioning) we can handle.

SELECT pkey, RANK() OVER (PARTITION BY pkey ORDER BY clustCol) FROM ks.tab

Like in this case, if pkey is the C* partition key, then there is a natural ordering we can take advantage of when returning values.

@RussellSpitzer RussellSpitzer Sep 5, 2017
Note: @brianmhess wrote that SQL; I don't know SQL like he does, and he is great.

*/
boolean pushDownSort(String[] sortingColumns);
boolean pushDownSort(String[] sortingColumns, boolean asc, boolean nullFirst);
}
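
As a rough sketch of how the new signature could be used (the native ordering here is invented, loosely following the Cassandra example above), a source might only accept a push-down that matches the order its data is already stored in:

import java.util.Arrays;

// Hypothetical source that is naturally sorted by ("pkey", "clustCol") ascending, nulls first,
// and therefore only accepts a sort push-down that matches that native order.
class MySortPushDown implements SortPushDown {
  private static final String[] NATIVE_ORDER = {"pkey", "clustCol"};

  @Override
  public boolean pushDownSort(String[] sortingColumns, boolean asc, boolean nullFirst) {
    return asc && nullFirst && Arrays.equals(sortingColumns, NATIVE_ORDER);
  }
}
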
@@ -17,24 +17,10 @@

package org.apache.spark.sql.sources.v2.reader;

public class SparkHashPartitioning implements Partitioning {
private int numPartitions;
import java.util.OptionalLong;

public SparkHashPartitioning(int numPartitions) {
this.numPartitions = numPartitions;
}

@Override
public boolean compatibleWith(Partitioning other) {
if (other instanceof SparkHashPartitioning) {
return this.numPartitions() == other.numPartitions();
} else {
return other.compatibleWith(this);
}
}

@Override
public int numPartitions() {
return numPartitions;
}
public interface Statistics {
OptionalLong getSize();
OptionalLong getRows();
OptionalLong getDistinctValues(String columnName);
}
@@ -22,6 +22,5 @@
* statistics like sizeInBytes, to Spark.
*/
public interface StatisticsSupport {
// todo: shall we add more statistics? what do we want?
long sizeInBytes();
Statistics getStatistics();
}
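
For illustration, a source with precomputed statistics might back these interfaces as follows (a sketch; the map of per-column distinct counts is an assumed input):

import java.util.Map;
import java.util.OptionalLong;

class MyStatistics implements Statistics {
  private final long sizeInBytes;
  private final long rowCount;
  private final Map<String, Long> distinctCounts;

  MyStatistics(long sizeInBytes, long rowCount, Map<String, Long> distinctCounts) {
    this.sizeInBytes = sizeInBytes;
    this.rowCount = rowCount;
    this.distinctCounts = distinctCounts;
  }

  @Override
  public OptionalLong getSize() {
    return OptionalLong.of(sizeInBytes);
  }

  @Override
  public OptionalLong getRows() {
    return OptionalLong.of(rowCount);
  }

  @Override
  public OptionalLong getDistinctValues(String columnName) {
    Long ndv = distinctCounts.get(columnName);
    // Unknown columns report "no estimate" instead of guessing.
    return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv);
  }
}

A reader mixing in StatisticsSupport would then simply return such an object from getStatistics().
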
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.distribution;

/**
* Represents a distribution where records that share the same values for the `clusteringColumns`
* will be co-located, which means they will be produced by the same `ReadTask`.
*/
public class ClusteredDistribution {
private String[] clusteringColumns;

public ClusteredDistribution(String[] clusteringColumns) {
this.clusteringColumns = clusteringColumns;
}

public String[] getClusteringColumns() {
return clusteringColumns;
}
}
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader.distribution;

/**
* Specifies how data should be distributed when a query is executed in parallel on many machines.
*
* Current implementations: `ClusteredDistribution`.
*/
public interface Distribution {}
@@ -15,24 +15,21 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

package org.apache.spark.sql.sources.v2.reader.distribution;

/**
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to pre-clustering
* the data and avoid shuffle at Spark side.
* A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to report the
* output partitioning, to avoid a shuffle at the Spark side if the output partitioning can satisfy
* the distribution requirement.
*/
public interface ClusteringPushDownSupport {
public interface DistributionSupport {
/**
* Returns a non-null `Partitioning` if the implementation can handle this clustering requirement
* and save a shuffle at Spark side. Clustering means, if two records have same values for the
* given clustering columns, they must be produced by the same read task.
* Returns an array of partitionings this data source can output. Spark will pick one partitioning
* that can avoid a shuffle, and call `pickPartitioning` to notify the data source which
* partitioning was picked. Note that, if none of the partitionings can help avoid a shuffle,
* `NoPartitioning` will be passed to `pickPartitioning`.
*/
Partitioning pushDownClustering(String[] clusteringColumns);
Partitioning[] getPartitionings();

/**
* Cancel this clustering push down. This will be called if Spark finds out that we can't avoid
* the shuffle after we push down the clustering.
*/
void cancel();
void pickPartitioning(Partitioning p);
}
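
A minimal sketch of a reader-side class mixing in this interface (how the `Partitioning` instances themselves are constructed is left out, since `Partitioning` is still evolving in this PR; in a real reader this would sit on the `DataSourceV2Reader` implementation):

class MyDistributionSupport implements DistributionSupport {
  private final Partitioning[] supported;  // assumed to be built elsewhere by the reader
  private Partitioning picked;             // null until Spark calls pickPartitioning

  MyDistributionSupport(Partitioning[] supported) {
    this.supported = supported;
  }

  @Override
  public Partitioning[] getPartitionings() {
    return supported;
  }

  @Override
  public void pickPartitioning(Partitioning p) {
    // May be NoPartitioning if no advertised partitioning avoids a shuffle.
    this.picked = p;
    // The reader would plan its ReadTasks according to `picked` from here on.
  }
}
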