Commit 5e28e95

sadikovi authored and cloud-fan committed
[SPARK-48649][SQL] Add "ignoreInvalidPartitionPaths" and "spark.sql.files.ignoreInvalidPartitionPaths" configs to allow ignoring invalid partition paths
### What changes were proposed in this pull request?

This PR adds a new data source config `ignoreInvalidPartitionPaths` and a SQL session configuration flag `spark.sql.files.ignoreInvalidPartitionPaths` to control the behaviour of skipping invalid partition paths (base paths).

When the config is enabled, it allows skipping invalid paths such as:

```
table/
  invalid/...
  part=1/...
  part=2/...
  part=3/...
```

In this case, the `table/invalid` path will be ignored.

The data source option takes precedence over the SQL config, so with the code:

```scala
spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "false")
spark.read.format("parquet").option("ignoreInvalidPartitionPaths", "true").load(...)
```

the query would ignore invalid partitions, i.e. the flag will be enabled.

The config is disabled by default.

### Why are the changes needed?

Allows ignoring invalid partition paths that cannot be parsed.

### Does this PR introduce _any_ user-facing change?

No. The added configs are disabled by default to have the exact same behaviour as before.

### How was this patch tested?

I added a unit test for this.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47006 from sadikovi/SPARK-48649.

Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 6ee7c25 commit 5e28e95

File tree

6 files changed: +104 -15 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions

```diff
@@ -1977,6 +1977,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val IGNORE_INVALID_PARTITION_PATHS = buildConf("spark.sql.files.ignoreInvalidPartitionPaths")
+    .doc("Whether to ignore invalid partition paths that do not match <column>=<value>. When " +
+      "the option is enabled, table with two partition directories 'table/invalid' and " +
+      "'table/col=1' will only load the latter directory and ignore the invalid partition")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
     .doc("Maximum number of records to write out to a single file. " +
       "If this value is zero or negative, there is no limit.")
@@ -5275,6 +5283,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
 
+  def ignoreInvalidPartitionPaths: Boolean = getConf(IGNORE_INVALID_PARTITION_PATHS)
+
   def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
```
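For orientation, a minimal usage sketch of the new session config, assuming a running `spark` session; the table path and the stray `invalid/` directory are placeholders, not part of this commit:

```scala
// Sketch only: enable the session-wide flag added to SQLConf above.
spark.conf.set("spark.sql.files.ignoreInvalidPartitionPaths", "true")

// Given a layout like
//   /data/table/invalid/...   <- does not match <column>=<value>
//   /data/table/part=1/...
//   /data/table/part=2/...
// the read below skips /data/table/invalid instead of failing with
// "Conflicting directory structures detected".
val df = spark.read.format("parquet").load("/data/table")  // placeholder path
```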

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndexOptions.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 object FileIndexOptions extends DataSourceOptions {
   val IGNORE_MISSING_FILES = newOption(FileSourceOptions.IGNORE_MISSING_FILES)
+  val IGNORE_INVALID_PARTITION_PATHS = newOption("ignoreInvalidPartitionPaths")
   val TIME_ZONE = newOption(DateTimeUtils.TIMEZONE_OPTION)
   val RECURSIVE_FILE_LOOKUP = newOption("recursiveFileLookup")
   val BASE_PATH_PARAM = newOption("basePath")
```
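For context, `newOption` registers the key so that it can later be validated (the test suite below checks this via `getAllOptions.size` and `isValidOption`). A simplified model of that registry pattern, illustrative rather than Spark's actual `DataSourceOptions` implementation:

```scala
// Sketch of an option registry: newOption records the key (case-insensitively)
// so isValidOption can check membership later.
import scala.collection.mutable

object OptionRegistry {
  private val registered = mutable.Set.empty[String]
  def newOption(name: String): String = { registered += name.toLowerCase; name }
  def isValidOption(name: String): Boolean = registered.contains(name.toLowerCase)
}

val IGNORE_INVALID_PARTITION_PATHS = OptionRegistry.newOption("ignoreInvalidPartitionPaths")
assert(OptionRegistry.isValidOption("ignoreinvalidpartitionpaths"))
```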

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 9 additions & 1 deletion

```diff
@@ -70,6 +70,13 @@ abstract class PartitioningAwareFileIndex(
     caseInsensitiveMap.getOrElse(FileIndexOptions.RECURSIVE_FILE_LOOKUP, "false").toBoolean
   }
 
+  protected lazy val ignoreInvalidPartitionPaths: Boolean = {
+    caseInsensitiveMap
+      .get(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS)
+      .map(_.toBoolean)
+      .getOrElse(sparkSession.sessionState.conf.ignoreInvalidPartitionPaths)
+  }
+
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -162,7 +169,8 @@ abstract class PartitioningAwareFileIndex(
       userSpecifiedSchema = userSpecifiedSchema,
       caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis,
       validatePartitionColumns = sparkSession.sessionState.conf.validatePartitionColumns,
-      timeZoneId = timeZoneId)
+      timeZoneId = timeZoneId,
+      ignoreInvalidPartitionPaths = ignoreInvalidPartitionPaths)
   }
 }
```

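The lazy val above implements the option-over-config precedence described in the commit message. A standalone sketch of that rule, with illustrative names rather than Spark's internal API:

```scala
// Sketch: an explicit data source option wins; otherwise the session
// config supplies the default (mirrors the lazy val above in spirit only).
def resolveFlag(options: Map[String, String], sessionDefault: Boolean): Boolean =
  options.get("ignoreinvalidpartitionpaths") // assumes keys already lower-cased
    .map(_.toBoolean)
    .getOrElse(sessionDefault)

// Option set explicitly: it overrides the session config.
assert(resolveFlag(Map("ignoreinvalidpartitionpaths" -> "true"), sessionDefault = false))
// Option absent: fall back to the session config.
assert(!resolveFlag(Map.empty, sessionDefault = false))
```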
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala

Lines changed: 6 additions & 4 deletions

```diff
@@ -106,9 +106,10 @@ object PartitioningUtils extends SQLConfHelper {
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
       validatePartitionColumns: Boolean,
-      timeZoneId: String): PartitionSpec = {
+      timeZoneId: String,
+      ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
     parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, caseSensitive,
-      validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId))
+      validatePartitionColumns, DateTimeUtils.getZoneId(timeZoneId), ignoreInvalidPartitionPaths)
   }
 
   private[datasources] def parsePartitions(
@@ -118,7 +119,8 @@ object PartitioningUtils extends SQLConfHelper {
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
       validatePartitionColumns: Boolean,
-      zoneId: ZoneId): PartitionSpec = {
+      zoneId: ZoneId,
+      ignoreInvalidPartitionPaths: Boolean): PartitionSpec = {
     val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
       val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> f.dataType).toMap
       if (!caseSensitive) {
@@ -171,7 +173,7 @@ object PartitioningUtils extends SQLConfHelper {
     // TODO: Selective case sensitivity.
     val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
     assert(
-      discoveredBasePaths.distinct.size == 1,
+      ignoreInvalidPartitionPaths || discoveredBasePaths.distinct.size == 1,
       "Conflicting directory structures detected. Suspicious paths:\b" +
         discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
         "If provided paths are partition directories, please set " +
```

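The relaxed `assert` in `PartitioningUtils` above is the core behavioural change. A simplified model of the invariant, not the actual Spark code, with illustrative paths:

```scala
// Sketch: with the flag off, all discovered base paths must agree;
// with it on, conflicting (invalid) paths are tolerated and later ignored.
def checkBasePaths(discoveredBasePaths: Seq[String], ignoreInvalid: Boolean): Unit =
  assert(
    ignoreInvalid || discoveredBasePaths.distinct.size == 1,
    s"Conflicting directory structures detected: ${discoveredBasePaths.distinct.mkString(", ")}")

checkBasePaths(Seq("/table", "/table"), ignoreInvalid = false)         // passes: one base path
checkBasePaths(Seq("/table", "/table/invalid"), ignoreInvalid = true)  // passes: conflict tolerated
// checkBasePaths(Seq("/table", "/table/invalid"), ignoreInvalid = false) // would throw
```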
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 63 additions & 2 deletions

```diff
@@ -31,7 +31,7 @@ import org.mockito.Mockito.{mock, when}
 
 import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -547,6 +547,66 @@ class FileIndexSuite extends SharedSparkSession {
     assert(fileIndex.leafFileStatuses.toSeq == statuses)
   }
 
+  test("SPARK-48649: Ignore invalid partitions") {
+    // Table:
+    //   id  part_col
+    //   1   1
+    //   2   2
+    val df = spark.range(1, 3, 1, 2).toDF("id")
+      .withColumn("part_col", col("id"))
+
+    withTempPath { directoryPath =>
+      df.write
+        .mode("overwrite")
+        .format("parquet")
+        .partitionBy("part_col")
+        .save(directoryPath.getCanonicalPath)
+
+      // Rename one of the folders.
+      new File(directoryPath, "part_col=1").renameTo(new File(directoryPath, "undefined"))
+
+      // By default, we expect the invalid path assertion to trigger.
+      val ex = intercept[AssertionError] {
+        spark.read
+          .format("parquet")
+          .load(directoryPath.getCanonicalPath)
+          .collect()
+      }
+      assert(ex.getMessage.contains("Conflicting directory structures detected"))
+
+      // With the config enabled, we should only read the valid partition.
+      withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
+        assert(
+          spark.read
+            .format("parquet")
+            .load(directoryPath.getCanonicalPath)
+            .collect() === Seq(Row(2, 2)))
+      }
+
+      // Data source option override takes precedence.
+      withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "true") {
+        val ex = intercept[AssertionError] {
+          spark.read
+            .format("parquet")
+            .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "false")
+            .load(directoryPath.getCanonicalPath)
+            .collect()
+        }
+        assert(ex.getMessage.contains("Conflicting directory structures detected"))
+      }
+
+      // Data source option override takes precedence.
+      withSQLConf(SQLConf.IGNORE_INVALID_PARTITION_PATHS.key -> "false") {
+        assert(
+          spark.read
+            .format("parquet")
+            .option(FileIndexOptions.IGNORE_INVALID_PARTITION_PATHS, "true")
+            .load(directoryPath.getCanonicalPath)
+            .collect() === Seq(Row(2, 2)))
+      }
+    }
+  }
+
   test("expire FileStatusCache if TTL is configured") {
     val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
     try {
@@ -585,9 +645,10 @@ class FileIndexSuite extends SharedSparkSession {
   }
 
   test("SPARK-40667: validate FileIndex Options") {
-    assert(FileIndexOptions.getAllOptions.size == 7)
+    assert(FileIndexOptions.getAllOptions.size == 8)
     // Please add validation on any new FileIndex options here
     assert(FileIndexOptions.isValidOption("ignoreMissingFiles"))
+    assert(FileIndexOptions.isValidOption("ignoreInvalidPartitionPaths"))
     assert(FileIndexOptions.isValidOption("timeZone"))
     assert(FileIndexOptions.isValidOption("recursiveFileLookup"))
     assert(FileIndexOptions.isValidOption("basePath"))
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 15 additions & 8 deletions

```diff
@@ -112,7 +112,8 @@ abstract class ParquetPartitionDiscoverySuite
       "hdfs://host:9000/path/a=10.5/b=hello")
 
     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId)
+      parsePartitions(
+        paths.map(new Path(_)), true, Set.empty[Path], None, true, true, timeZoneId, false)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
 
@@ -129,7 +130,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
 
     // Valid
     paths = Seq(
@@ -145,7 +147,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
 
     // Valid
     paths = Seq(
@@ -161,7 +164,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
 
     // Invalid
     paths = Seq(
@@ -177,7 +181,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
 
@@ -200,7 +205,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
     }
     assert(exception.getMessage().contains("Conflicting directory structures detected"))
   }
@@ -296,7 +302,8 @@ abstract class ParquetPartitionDiscoverySuite
       None,
       true,
       true,
-      timeZoneId)
+      timeZoneId,
+      false)
     assert(actualSpec.partitionColumns === spec.partitionColumns)
     assert(actualSpec.partitions.length === spec.partitions.length)
     actualSpec.partitions.zip(spec.partitions).foreach { case (actual, expected) =>
@@ -427,7 +434,7 @@ abstract class ParquetPartitionDiscoverySuite
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       val actualSpec =
         parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None,
-          true, true, timeZoneId)
+          true, true, timeZoneId, false)
       assert(actualSpec === spec)
     }
 
```
