@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


@@ -164,9 +165,12 @@ case class BucketSpec(
numBuckets: Int,
bucketColumnNames: Seq[String],
sortColumnNames: Seq[String]) {
-if (numBuckets <= 0 || numBuckets >= 100000) {
+def conf: SQLConf = SQLConf.get
+
+if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
Review comment (Member): Since the check changed from `>=` to `>`, the condition and the error message are now inconsistent. When the condition triggers, the message should read something like "... but less than or equal to bucketing.maxBuckets ...".
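
A follow-up could align the message with the new inclusive upper bound. A minimal sketch of the reworded error (not part of this diff):

  throw new AnalysisException(
    s"Number of buckets should be greater than 0 but less than or equal to " +
      s"bucketing.maxBuckets (`${conf.bucketingMaxBuckets}`). Got `$numBuckets`")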

Review comment (Member): Could you submit a followup PR to address this message issue?

Review comment (Member): We can merge this PR first.

throw new AnalysisException(
s"Number of buckets should be greater than 0 but less than 100000. Got `$numBuckets`")
s"Number of buckets should be greater than 0 but less than bucketing.maxBuckets " +
s"(`${conf.bucketingMaxBuckets}`). Got `$numBuckets`")
}

override def toString: String = {
@@ -580,6 +580,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

+val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
+  .doc("The maximum number of buckets allowed. Defaults to 100000")
+  .intConf
+  .checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be larger than 0")
+  .createWithDefault(100000)
+
val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.doc("When false, we will throw an error if a query contains a cartesian product without " +
"explicit CROSS JOIN syntax.")
@@ -1490,6 +1496,8 @@ class SQLConf extends Serializable with Logging {

def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)

+def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)
+
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
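
Not part of the diff: a short usage sketch of the new setting. It assumes a running SparkSession named `spark`; the DataFrame and table names are illustrative only.

  // Raise the per-session bucket limit above the 100000 default, then write
  // a table bucketed into more than 100000 buckets.
  spark.conf.set("spark.sql.sources.bucketing.maxBuckets", 150000)

  spark.range(1000000L).toDF("id")
    .write
    .bucketBy(120000, "id")
    .sortBy("id")
    .saveAsTable("bucketed_ids")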

@@ -18,9 +18,10 @@
package org.apache.spark.sql.sources

import java.io.File
-import java.net.URI

import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.BucketingUtils
@@ -48,16 +49,40 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
}

test("numBuckets be greater than 0 but less than 100000") {
test("numBuckets be greater than 0 but less/eq than default bucketing.maxBuckets (100000)") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")

-Seq(-1, 0, 100000).foreach(numBuckets => {
+Seq(-1, 0, 100001).foreach(numBuckets => {
val e = intercept[AnalysisException](df.write.bucketBy(numBuckets, "i").saveAsTable("tt"))
assert(
-e.getMessage.contains("Number of buckets should be greater than 0 but less than 100000"))
+e.getMessage.contains("Number of buckets should be greater than 0 but less than"))
})
}

test("numBuckets be greater than 0 but less/eq than overridden bucketing.maxBuckets (200000)") {
val maxNrBuckets: Int = 200000
val catalog = spark.sessionState.catalog

withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) {
// within the new limit
Seq(100001, maxNrBuckets).foreach(numBuckets => {
withTable("t") {
df.write.bucketBy(numBuckets, "i").saveAsTable("t")
val table = catalog.getTableMetadata(TableIdentifier("t"))
assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("i"), Seq())))
}
})

// over the new limit
withTable("t") {
val e = intercept[AnalysisException](
df.write.bucketBy(maxNrBuckets + 1, "i").saveAsTable("t"))
assert(
e.getMessage.contains("Number of buckets should be greater than 0 but less than"))
}
}
}

test("specify sorting columns without bucketing columns") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
@@ -225,7 +225,7 @@ class CreateTableAsSelectSuite

test("create table using as select - with invalid number of buckets") {
withTable("t") {
-Seq(0, 100000).foreach(numBuckets => {
+Seq(0, 100001).foreach(numBuckets => {
val e = intercept[AnalysisException] {
sql(
s"""
@@ -236,11 +236,42 @@
""".stripMargin
)
}.getMessage
-assert(e.contains("Number of buckets should be greater than 0 but less than 100000"))
+assert(e.contains("Number of buckets should be greater than 0 but less than"))
})
}
}

test("create table using as select - with overriden max number of buckets") {
def createTableSql(numBuckets: Int): String =
s"""
|CREATE TABLE t USING PARQUET
|OPTIONS (PATH '${path.toURI}')
|CLUSTERED BY (a) SORTED BY (b) INTO $numBuckets BUCKETS
|AS SELECT 1 AS a, 2 AS b
""".stripMargin

val maxNrBuckets: Int = 200000
val catalog = spark.sessionState.catalog
withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> maxNrBuckets.toString) {

// Within the new limit
Seq(100001, maxNrBuckets).foreach(numBuckets => {
withTable("t") {
sql(createTableSql(numBuckets))
val table = catalog.getTableMetadata(TableIdentifier("t"))
assert(table.bucketSpec == Option(BucketSpec(numBuckets, Seq("a"), Seq("b"))))
}
})

// Over the new limit
withTable("t") {
val e = intercept[AnalysisException](sql(createTableSql(maxNrBuckets + 1)))
assert(
e.getMessage.contains("Number of buckets should be greater than 0 but less than "))
}
}
}

test("SPARK-17409: CTAS of decimal calculation") {
withTable("tab2") {
withTempView("tab1") {