diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3adec2f79073..7bfe42931f4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -221,7 +221,15 @@ abstract class PartitioningAwareFileIndex(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
-        Set(fs.makeQualified(userDefinedBasePath))
+        val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
+        val qualifiedBasePathStr = qualifiedBasePath.toString
+        rootPaths
+          .find(!fs.makeQualified(_).toString.startsWith(qualifiedBasePathStr))
+          .foreach { rp =>
+            throw new IllegalArgumentException(
+              s"Wrong basePath $userDefinedBasePath for the root path: $rp")
+          }
+        Set(qualifiedBasePath)
 
       case None =>
         rootPaths.map { path =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index a7a2349a1dfb..553773e2555c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -352,6 +352,26 @@ class FileIndexSuite extends SharedSparkSession {
       "driver side must not be negative"))
   }
 
+  test("SPARK-29537: throw exception when user defined a wrong base path") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val wrongBasePath = new File(dir, "unknown")
+      // basePath must be a directory
+      wrongBasePath.mkdir()
+      val parameters = Map("basePath" -> wrongBasePath.getCanonicalPath)
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), parameters, None)
+      val msg = intercept[IllegalArgumentException] {
+        // trigger inferPartitioning()
+        fileIndex.partitionSpec()
+      }.getMessage
+      assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
+    }
+  }
+
   test("refresh for InMemoryFileIndex with FileStatusCache") {
     withTempDir { dir =>
       val fileStatusCache = FileStatusCache.getOrCreate(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index cef0e5ab4756..55a60940a775 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -234,6 +234,21 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2"))
   }
 
+  test("SPARK-29537: throw exception when user defined a wrong base path") {
+    withTempPath { p =>
+      val path = new Path(p.toURI).toString
+      Seq((1, 1), (2, 2)).toDF("c1", "c2")
+        .write.partitionBy("c1").mode(SaveMode.Overwrite).parquet(path)
+      val wrongBasePath = new File(p, "unknown")
+      // basePath must be a directory
+      wrongBasePath.mkdir()
+      val msg = intercept[IllegalArgumentException] {
+        spark.read.option("basePath", wrongBasePath.getCanonicalPath).parquet(path)
+      }.getMessage
+      assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
+    }
+  }
+
   test("save mode") {
     spark.range(10).write
       .format("org.apache.spark.sql.test")
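For context beyond the patch itself, a minimal hypothetical sketch of the user-facing behavior this change enforces. The paths and column layout below are illustrative, not taken from the patch, and an active SparkSession `spark` is assumed:

// Sketch only: assumes a dataset partitioned under the made-up path
// /data/table (e.g. /data/table/c1=1/part-....parquet).
// Previously a basePath that is not an ancestor of the root path was not
// validated up front; with this change the read fails fast:
spark.read
  .option("basePath", "/data/other")  // not a prefix of the root path below
  .parquet("/data/table")
// => IllegalArgumentException:
//    Wrong basePath /data/other for the root path: <qualified /data/table>

The check works on qualified path strings: every root path passed to the file index must start with the qualified basePath, otherwise the first offending root path is reported in the exception.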