From 95e30cc9803d579ff404472d4a22937b4fef2a11 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 10 May 2021 14:28:36 +0800 Subject: [PATCH 1/3] [SPARK-35360][SQL] RepairTableCommand respect `spark.sql.addPartitionInBatch.size` --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 ++ .../main/scala/org/apache/spark/sql/execution/command/ddl.scala | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fb43db5ec57d..1f0b05b53f2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3168,6 +3168,8 @@ object SQLConf { .intConf .createWithDefault(0) + val ADD_PARTITION_BATCH_SIZE + /** * Holds information about keys that have been deprecated. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index c7456cd9d205..0876b5f05876 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -771,7 +771,7 @@ case class RepairTableCommand( // Hive metastore may not have enough memory to handle millions of partitions in single RPC, // we should split them into smaller batches. Since Hive client is not thread safe, we cannot // do this in parallel. - val batchSize = 100 + val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE) partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch => val now = MILLISECONDS.toSeconds(System.currentTimeMillis()) val parts = batch.map { case (spec, location) => From c413759b97efc62e2a354059678b1636cd70cf6d Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 10 May 2021 14:28:55 +0800 Subject: [PATCH 2/3] Update SQLConf.scala --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1f0b05b53f2f..fb43db5ec57d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3168,8 +3168,6 @@ object SQLConf { .intConf .createWithDefault(0) - val ADD_PARTITION_BATCH_SIZE - /** * Holds information about keys that have been deprecated. * From 918ddcd435bb5270f4b62e021befacff7d386b82 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 10 May 2021 15:07:53 +0800 Subject: [PATCH 3/3] Update SQLConf.scala --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fb43db5ec57d..ce4a9cae2d17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2894,8 +2894,9 @@ object SQLConf { buildConf("spark.sql.addPartitionInBatch.size") .internal() .doc("The number of partitions to be handled in one turn when use " + - "`AlterTableAddPartitionCommand` to add partitions into table. The smaller " + - "batch size is, the less memory is required for the real handler, e.g. Hive Metastore.") + "`AlterTableAddPartitionCommand` or `RepairTableCommand` to add partitions into table. " + + "The smaller batch size is, the less memory is required for the real handler, e.g. " + + "Hive Metastore.") .version("3.0.0") .intConf .checkValue(_ > 0, "The value of spark.sql.addPartitionInBatch.size must be positive")