From 61a476fe1f90b2e4c8ddbf82024f8116d737d2ef Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Tue, 17 Apr 2018 14:53:59 +0200 Subject: [PATCH 1/8] Adding configurable max buckets --- .../sql/catalyst/catalog/interface.scala | 8 +++- .../apache/spark/sql/internal/SQLConf.scala | 7 ++++ .../sources/CreateTableAsSelectSuite.scala | 40 ++++++++++++++----- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index f3e67dc4e975c..b3a80e15139c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import scala.util.control.NonFatal import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation @@ -164,9 +165,12 @@ case class BucketSpec( numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String]) { - if (numBuckets <= 0 || numBuckets >= 100000) { + def conf: SQLConf = SQLConf.get + + if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) { throw new AnalysisException( - s"Number of buckets should be greater than 0 but less than 100000. Got `$numBuckets`") + s"Number of buckets should be greater than 0 but less than spark.sql.bucketing.maxBuckets " + + s"(`${conf.bucketingMaxBuckets}`). Got `$numBuckets`") } override def toString: String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0dc47bfe075d0..4fb463fb158ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -580,6 +580,11 @@ object SQLConf { .booleanConf .createWithDefault(true) + val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.bucketing.maxBuckets") + .doc("The maximum number of buckets allowed. 
Defaults to 100000") + .longConf + .createWithDefault(100000) + val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") .doc("When false, we will throw an error if a query contains a cartesian product without " + "explicit CROSS JOIN syntax.") @@ -1490,6 +1495,8 @@ class SQLConf extends Serializable with Logging { def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED) + def bucketingMaxBuckets: Long = getConf(SQLConf.BUCKETING_MAX_BUCKETS) + def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 916a01ee0ca8e..8a44a06557b21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -224,20 +224,40 @@ class CreateTableAsSelectSuite } test("create table using as select - with invalid number of buckets") { + def createTableSql(numBuckets: Int): String = + s""" + |CREATE TABLE t USING PARQUET + |OPTIONS (PATH '${path.toURI}') + |CLUSTERED BY (a) SORTED BY (b) INTO $numBuckets BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin + withTable("t") { - Seq(0, 100000).foreach(numBuckets => { + Seq(0, 100001).foreach(numBuckets => { val e = intercept[AnalysisException] { - sql( - s""" - |CREATE TABLE t USING PARQUET - |OPTIONS (PATH '${path.toURI}') - |CLUSTERED BY (a) SORTED BY (b) INTO $numBuckets BUCKETS - |AS SELECT 1 AS a, 2 AS b - """.stripMargin - ) + sql(createTableSql(numBuckets)) }.getMessage - assert(e.contains("Number of buckets should be greater than 0 but less than 100000")) + assert(e.contains("Number of buckets should be greater than 0 but less than " + + "spark.sql.bucketing.maxBuckets")) }) + + // Reconfigure max + val maxNrBuckets: Int = 200000 + val catalog = spark.sessionState.catalog + withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { + Seq(100001, maxNrBuckets).foreach(numBuckets => { + sql(createTableSql(numBuckets)) + val table = catalog.getTableMetadata(TableIdentifier("t")) + assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b")))) + }) + + // Test new limit not respected + val e = intercept[AnalysisException] { + sql(createTableSql(maxNrBuckets + 1)) + }.getMessage + assert(e.contains("Number of buckets should be greater than 0 but less than " + + "spark.sql.bucketing.maxBuckets")) + } } } From a8846568db9eb63095c9dc55e8b71906ff95e6b0 Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Tue, 17 Apr 2018 17:22:18 +0200 Subject: [PATCH 2/8] fixing tests in spark.sql --- .../sql/sources/BucketedWriteSuite.scala | 39 +++++++++++++++---- .../sources/CreateTableAsSelectSuite.scala | 20 ++++++---- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 93f3efe2ccc4a..733dbc6751bc3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources import java.io.File -import java.net.URI import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.expressions.UnsafeProjection @@ -27,7 
+26,9 @@ import org.apache.spark.sql.execution.datasources.BucketingUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} +import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.BucketSpec class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext { protected override def beforeAll(): Unit = { @@ -48,16 +49,40 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt")) } - test("numBuckets be greater than 0 but less than 100000") { + test("numBuckets be greater than 0 but less than default bucketing.maxBuckets (100000)") { val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") - Seq(-1, 0, 100000).foreach(numBuckets => { - val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt")) - assert( - e.getMessage.contains("Number of buckets should be greater than 0 but less than 100000")) + Seq(-1, 0, 100001).foreach(numBuckets => { + val e = intercept[AnalysisException]( + df.write.bucketBy(numBuckets, "i").saveAsTable("tt") + ).getMessage + assert(e.contains("Number of buckets should be greater than 0 but less than")) }) } + test("numBuckets be greater than 0 but less than overridden bucketing.maxBuckets (200000)") { + val maxNrBuckets: Int = 200000 + val catalog = spark.sessionState.catalog + withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { + // Shall be allowed + Seq(100001, maxNrBuckets).foreach(numBuckets => { + withTable("t") { + df.write.bucketBy(numBuckets, "i").saveAsTable("t") + val table = catalog.getTableMetadata(TableIdentifier("t")) + assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("i"), Seq()))) + } + }) + + // Test new limit not respected + withTable("t") { + val e = intercept[AnalysisException]( + df.write.bucketBy(maxNrBuckets + 1, "i").saveAsTable("t") + ).getMessage + assert(e.contains("Number of buckets should be greater than 0 but less than")) + } + } + } + test("specify sorting columns without bucketing columns") { val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 8a44a06557b21..70da4b8d42e3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -240,18 +240,24 @@ class CreateTableAsSelectSuite assert(e.contains("Number of buckets should be greater than 0 but less than " + "spark.sql.bucketing.maxBuckets")) }) + } + + // Reconfigure max + val maxNrBuckets: Int = 200000 + val catalog = spark.sessionState.catalog + withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { - // Reconfigure max - val maxNrBuckets: Int = 200000 - val catalog = spark.sessionState.catalog - withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { - Seq(100001, maxNrBuckets).foreach(numBuckets => { + // Shall be allowed + Seq(100001, maxNrBuckets).foreach(numBuckets => { + withTable("t") { 
sql(createTableSql(numBuckets)) val table = catalog.getTableMetadata(TableIdentifier("t")) assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b")))) - }) + } + }) - // Test new limit not respected + // Test new limit not respected + withTable("t") { val e = intercept[AnalysisException] { sql(createTableSql(maxNrBuckets + 1)) }.getMessage From aad106870e8af2ed2a9b637da338a79ffa63bb97 Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Thu, 2 Aug 2018 11:09:50 +0200 Subject: [PATCH 3/8] fix import order --- .../org/apache/spark/sql/catalyst/catalog/interface.scala | 2 +- .../org/apache/spark/sql/sources/BucketedWriteSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index b3a80e15139c5..f38ced6bc7424 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -24,7 +24,6 @@ import scala.collection.mutable import scala.util.control.NonFatal import org.apache.spark.internal.Logging -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation @@ -33,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 733dbc6751bc3..8e06c6c4195a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -20,15 +20,15 @@ package org.apache.spark.sql.sources import java.io.File import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.datasources.BucketingUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext { protected override def beforeAll(): Unit = { From 8ddc4ebed9623d571eb778b56a542f91db43f743 Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Mon, 6 Aug 2018 10:49:50 +0200 Subject: [PATCH 4/8] long numBuckets --- .../scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index f38ced6bc7424..cf9ca72720e47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -162,7 +162,7 @@ case class CatalogTablePartition( * @param sortColumnNames the names of the columns that used to sort data in each bucket. */ case class BucketSpec( - numBuckets: Int, + numBuckets: Long, bucketColumnNames: Seq[String], sortColumnNames: Seq[String]) { def conf: SQLConf = SQLConf.get From e517f66481f428a6c26888293ba802fccd22091b Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Mon, 6 Aug 2018 11:06:42 +0200 Subject: [PATCH 5/8] Use int instead for both numBuckets and config --- .../scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 2 +- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index cf9ca72720e47..f38ced6bc7424 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -162,7 +162,7 @@ case class CatalogTablePartition( * @param sortColumnNames the names of the columns that used to sort data in each bucket. */ case class BucketSpec( - numBuckets: Long, + numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String]) { def conf: SQLConf = SQLConf.get diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4fb463fb158ee..4c4e5c2e2e3f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -582,7 +582,7 @@ object SQLConf { val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.bucketing.maxBuckets") .doc("The maximum number of buckets allowed. 
Defaults to 100000") - .longConf + .intConf .createWithDefault(100000) val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") From 628b4e316f0988eecd8909f667bc7c6e804cea9f Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Thu, 9 Aug 2018 14:44:29 +0200 Subject: [PATCH 6/8] type fix, improved tests --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../sql/sources/BucketedWriteSuite.scala | 22 +++++----- .../sources/CreateTableAsSelectSuite.scala | 41 +++++++++++-------- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4c4e5c2e2e3f5..831289a6fa9f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1495,7 +1495,7 @@ class SQLConf extends Serializable with Logging { def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED) - def bucketingMaxBuckets: Long = getConf(SQLConf.BUCKETING_MAX_BUCKETS) + def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS) def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 8e06c6c4195a0..eaea64f9dbed6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -49,22 +49,22 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt")) } - test("numBuckets be greater than 0 but less than default bucketing.maxBuckets (100000)") { + test("numBuckets be greater than 0 but less/eq than default bucketing.maxBuckets (100000)") { val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") Seq(-1, 0, 100001).foreach(numBuckets => { - val e = intercept[AnalysisException]( - df.write.bucketBy(numBuckets, "i").saveAsTable("tt") - ).getMessage - assert(e.contains("Number of buckets should be greater than 0 but less than")) + val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt")) + assert( + e.getMessage.contains("Number of buckets should be greater than 0 but less than")) }) } - test("numBuckets be greater than 0 but less than overridden bucketing.maxBuckets (200000)") { + test("numBuckets be greater than 0 but less/eq than overridden bucketing.maxBuckets (200000)") { val maxNrBuckets: Int = 200000 val catalog = spark.sessionState.catalog + withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { - // Shall be allowed + // within the new limit Seq(100001, maxNrBuckets).foreach(numBuckets => { withTable("t") { df.write.bucketBy(numBuckets, "i").saveAsTable("t") @@ -73,12 +73,12 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { } }) - // Test new limit not respected + // over the new limit withTable("t") { val e = intercept[AnalysisException]( - df.write.bucketBy(maxNrBuckets + 1, "i").saveAsTable("t") - ).getMessage - assert(e.contains("Number of buckets should be greater than 0 but less than")) + df.write.bucketBy(maxNrBuckets + 1, "i").saveAsTable("t")) + assert( + e.getMessage.contains("Number of buckets should be greater than 0 but less than")) } } } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 70da4b8d42e3c..e35c37db61221 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -224,6 +224,24 @@ class CreateTableAsSelectSuite } test("create table using as select - with invalid number of buckets") { + withTable("t") { + Seq(0, 100001).foreach(numBuckets => { + val e = intercept[AnalysisException] { + sql( + s""" + |CREATE TABLE t USING PARQUET + |OPTIONS (PATH '${path.toURI}') + |CLUSTERED BY (a) SORTED BY (b) INTO $numBuckets BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin + ) + }.getMessage + assert(e.contains("Number of buckets should be greater than 0 but less than 100000")) + }) + } + } + + test("create table using as select - with overriden max number of buckets") { def createTableSql(numBuckets: Int): String = s""" |CREATE TABLE t USING PARQUET @@ -232,22 +250,11 @@ class CreateTableAsSelectSuite |AS SELECT 1 AS a, 2 AS b """.stripMargin - withTable("t") { - Seq(0, 100001).foreach(numBuckets => { - val e = intercept[AnalysisException] { - sql(createTableSql(numBuckets)) - }.getMessage - assert(e.contains("Number of buckets should be greater than 0 but less than " + - "spark.sql.bucketing.maxBuckets")) - }) - } - - // Reconfigure max val maxNrBuckets: Int = 200000 val catalog = spark.sessionState.catalog withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { - // Shall be allowed + // Within the new limit Seq(100001, maxNrBuckets).foreach(numBuckets => { withTable("t") { sql(createTableSql(numBuckets)) @@ -256,13 +263,11 @@ class CreateTableAsSelectSuite } }) - // Test new limit not respected + // Over the new limit withTable("t") { - val e = intercept[AnalysisException] { - sql(createTableSql(maxNrBuckets + 1)) - }.getMessage - assert(e.contains("Number of buckets should be greater than 0 but less than " + - "spark.sql.bucketing.maxBuckets")) + val e = intercept[AnalysisException](sql(createTableSql(maxNrBuckets + 1))) + assert( + e.getMessage.contains("Number of buckets should be greater than 0 but less than ")) } } } From 6049059cd1ea2969fbed271ef2002a7df209aa1d Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Thu, 9 Aug 2018 18:01:34 +0200 Subject: [PATCH 7/8] fix --- .../org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index e35c37db61221..32af64ea4472c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -236,7 +236,7 @@ class CreateTableAsSelectSuite """.stripMargin ) }.getMessage - assert(e.contains("Number of buckets should be greater than 0 but less than 100000")) + assert(e.contains("Number of buckets should be greater than 0 but less than")) }) } } From ebd926530c1d8b2f515a4a233f5963eafc17e460 Mon Sep 17 00:00:00 2001 From: Fernando Pereira Date: Fri, 10 Aug 2018 01:12:14 +0200 Subject: [PATCH 8/8] maxBuckets config check and rename --- .../org/apache/spark/sql/catalyst/catalog/interface.scala | 2 +- 
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- .../org/apache/spark/sql/sources/BucketedWriteSuite.scala | 2 +- .../apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index f38ced6bc7424..7bef90c25a356 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -169,7 +169,7 @@ case class BucketSpec( if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) { throw new AnalysisException( - s"Number of buckets should be greater than 0 but less than spark.sql.bucketing.maxBuckets " + + s"Number of buckets should be greater than 0 but less than bucketing.maxBuckets " + s"(`${conf.bucketingMaxBuckets}`). Got `$numBuckets`") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 831289a6fa9f8..74dbc0dfabd12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -580,9 +580,10 @@ object SQLConf { .booleanConf .createWithDefault(true) - val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.bucketing.maxBuckets") + val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets") .doc("The maximum number of buckets allowed. Defaults to 100000") .intConf + .checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be larger than 0") .createWithDefault(100000) val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index eaea64f9dbed6..ccc350d9e2eca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -63,7 +63,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { val maxNrBuckets: Int = 200000 val catalog = spark.sessionState.catalog - withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { + withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) { // within the new limit Seq(100001, maxNrBuckets).foreach(numBuckets => { withTable("t") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 32af64ea4472c..d46029e84433c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -252,7 +252,7 @@ class CreateTableAsSelectSuite val maxNrBuckets: Int = 200000 val catalog = spark.sessionState.catalog - withSQLConf("spark.sql.bucketing.maxBuckets" -> maxNrBuckets.toString) { + withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) { // Within the new limit Seq(100001, maxNrBuckets).foreach(numBuckets => {
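
For readers following the series, a minimal usage sketch of the behavior these eight patches introduce is shown below. It is illustrative only and not part of the patch set: it assumes a Spark build with all eight patches applied, an active `SparkSession` named `spark` (as in `spark-shell`), and hypothetical table names. The config key `spark.sql.sources.bucketing.maxBuckets`, its default of 100000, and the error message are taken from the hunks above.

```scala
// Illustrative sketch only -- not part of the patch series. Assumes a Spark
// build containing these patches and an active SparkSession named `spark`.
import org.apache.spark.sql.AnalysisException
import spark.implicits._

val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

// The limit is now a runtime conf (default 100000), so it can be raised per
// session instead of being hard-coded in BucketSpec as before.
spark.conf.set("spark.sql.sources.bucketing.maxBuckets", "200000")

// DataFrame writer path: 150000 buckets is accepted under the raised limit.
df.write.bucketBy(150000, "i").sortBy("j").saveAsTable("t_bucketed")

// CTAS path, mirroring the CreateTableAsSelectSuite tests.
spark.sql(
  """CREATE TABLE t_bucketed_ctas USING PARQUET
    |CLUSTERED BY (a) SORTED BY (b) INTO 150000 BUCKETS
    |AS SELECT 1 AS a, 2 AS b
  """.stripMargin)

// Values above the configured maximum still fail analysis with the patched
// message, e.g.:
// "Number of buckets should be greater than 0 but less than
//  bucketing.maxBuckets (`200000`). Got `200001`"
try {
  df.write.bucketBy(200001, "i").saveAsTable("t_too_wide")
} catch {
  case e: AnalysisException => println(e.getMessage)
}
```

The test suites touched above (`BucketedWriteSuite`, `CreateTableAsSelectSuite`) exercise the same two paths via `withSQLConf`, which scopes the override to the enclosing block rather than the whole session.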