diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 389e410234154..03ec106a16354 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -158,6 +158,13 @@ class SessionCatalog(
     tableRelationCache.invalidate(key)
   }
 
+  /** This method discards any cached table relation plans for the given table identifier. */
+  def invalidateCachedTable(name: TableIdentifier): Unit = {
+    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+    val tableName = formatTableName(name.table)
+    invalidateCachedTable(QualifiedTableName(dbName, tableName))
+  }
+
   /** This method provides a way to invalidate all the cached plans. */
   def invalidateAllCachedTables(): Unit = {
     tableRelationCache.invalidateAll()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 4496b0cdee2f7..3f44049e3bf3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -278,6 +278,7 @@ case class AlterTableSetPropertiesCommand(
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
@@ -316,6 +317,7 @@ case class AlterTableUnsetPropertiesCommand(
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 470c6a342b4dd..fd9e63ec3ad37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -106,4 +106,28 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
       }
     }
   }
+
+  test("SPARK-37098: Alter table properties should invalidate cache") {
+    // specify the compression explicitly in case the default changes in the future
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      withTempPath { dir =>
+        withTable("t") {
+          sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
+          // cache table metadata
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='gzip')")
+          sql("INSERT INTO TABLE t values(1)")
+          val files1 = dir.listFiles().filter(_.getName.endsWith("gz.parquet"))
+          assert(files1.length == 1)
+
+          // cache table metadata again
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t UNSET TBLPROPERTIES('parquet.compression')")
+          sql("INSERT INTO TABLE t values(1)")
+          val files2 = dir.listFiles().filter(_.getName.endsWith("snappy.parquet"))
+          assert(files2.length == 1)
+        }
+      }
+    }
+  }
 }
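
For context on why the `invalidateCachedTable` calls are needed, here is a minimal repro sketch of the stale-metadata behavior the new test exercises. This is a hand-written illustration, not part of the patch; it assumes a `SparkSession` built with Hive support, and the table name and property values simply mirror the test above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone repro of SPARK-37098.
val spark = SparkSession.builder()
  .appName("alter-table-props-cache-repro")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE t (c INT) STORED AS PARQUET")

// The first read populates SessionCatalog's tableRelationCache with a plan
// that captured the table's properties at resolution time.
spark.sql("SELECT * FROM t").collect()

spark.sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='gzip')")

// Before this patch, the INSERT resolved against the stale cached relation,
// so the written files still used the previous codec despite the ALTER.
// With the patch, AlterTableSetPropertiesCommand (and its UNSET counterpart)
// invalidates the cache entry, so the INSERT picks up the gzip setting.
spark.sql("INSERT INTO TABLE t VALUES (1)")
```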