Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options w…
…hen filtering from a batch-based file data source
  • Loading branch information
cchighman authored and HeartSaVioR committed Nov 18, 2020
commit 154e83b9ca57e8833f00c724cf69ce30c5008e05
35 changes: 35 additions & 0 deletions docs/sql-data-sources-generic-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,38 @@ To load all files recursively, you can use:
{% include_example recursive_file_lookup r/RSparkSQLExample.R %}
</div>
</div>

### Modification Time Path Filters
`modifiedBefore` and `modifiedAfter` are options that can be
applied together or separately to achieve finer-grained control
over which files are loaded during a Spark batch query.

* `modifiedBefore`: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
* `modifiedAfter`: an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

When a timezone option is not provided, the timestamps will be interpreted according
to the Spark session timezone (`spark.sql.session.timeZone`).

To load files with paths matching a given modified time range, you can use:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_modified_time_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example load_with_modified_time_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example load_with_modified_time_filter python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example load_with_modified_time_filter r/RSparkSQLExample.R %}
</div>
</div>
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ private static void runGenericFileSourceOptionsExample(SparkSession spark) {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_time_filter$
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// Only load files modified before 07/01/2020 at 05:30
.option("modifiedBefore", "2020-07-01T05:30:00")
// Only load files modified after 06/01/2020 at 05:30
.option("modifiedAfter", "2020-06-01T05:30:00")
// Interpret both times above relative to CST timezone
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
// $example off:load_with_modified_time_filter$
}

private static void runBasicDataSourceExample(SparkSession spark) {
Expand Down
20 changes: 20 additions & 0 deletions examples/src/main/python/sql/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ def generic_file_source_options_example(spark):
# +-------------+
# $example off:load_with_path_glob_filter$

# $example on:load_with_modified_time_filter$
# Only load files modified before 07/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
# Only load files modified after 06/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# +-------------+
# $example off:load_with_modified_time_filter$


def basic_datasource_example(spark):
# $example on:generic_load_save_functions$
Expand Down
8 changes: 8 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*
# 1 file1.parquet
# $example off:load_with_path_glob_filter$

# $example on:load_with_modified_time_filter$
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00")
# file
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
# file
# $example off:load_with_modified_time_filter$

# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,27 @@ object SQLDataSourceExample {
// |file1.parquet|
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_time_filter$
val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: two indents to follow the other examples.

.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+
// $example off:load_with_modified_time_filter$
}

private def runBasicDataSourceExample(spark: SparkSession): Unit = {
Expand Down
Loading