51 commits
19b9a68
Stub implementation and a test
MaxGekk Sep 15, 2018
90832f9
Saving all plans to file
MaxGekk Sep 15, 2018
673ae56
Output attributes
MaxGekk Sep 15, 2018
fbde812
Output whole stage codegen
MaxGekk Sep 15, 2018
dca19d3
Reusing codegenToOutputStream
MaxGekk Sep 15, 2018
66351a0
Code de-duplication
MaxGekk Sep 15, 2018
2ee75bc
Do not truncate fields
MaxGekk Sep 15, 2018
9b2a3e6
Moving the test up because previous one leaved a garbage
MaxGekk Sep 15, 2018
51c196e
Removing string interpolation in the test
MaxGekk Sep 16, 2018
c66a616
Getting Hadoop's conf from session state
MaxGekk Sep 16, 2018
ed57c8e
Using java.io.Writer
MaxGekk Sep 16, 2018
ce2c086
Using java.io.Writer
MaxGekk Sep 16, 2018
37326e2
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Sep 17, 2018
7abf14c
Using StringWriter
MaxGekk Sep 17, 2018
d1188e3
Removing unneeded buffering and flushing
MaxGekk Sep 17, 2018
71ff7d1
Code de-duplication among toString and toFile
MaxGekk Sep 17, 2018
ac94a86
Using StringBuilderWriter and fix tests
MaxGekk Sep 18, 2018
f2906d9
Do not change maxFields so far
MaxGekk Sep 18, 2018
d3fede1
Added tests
MaxGekk Sep 18, 2018
c153838
Using StringBuilderWriter in treeString
MaxGekk Sep 18, 2018
6fe08bf
Propagating numFields to truncatedString
MaxGekk Sep 18, 2018
3324927
Bug fix + test
MaxGekk Sep 18, 2018
d63f862
Bug fix: passing maxFields to simpleString
MaxGekk Sep 18, 2018
24dbbba
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Sep 18, 2018
deb5315
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Sep 24, 2018
7fd88d3
Passing parameters by names
MaxGekk Sep 24, 2018
732707a
Getting file system from file path
MaxGekk Sep 24, 2018
3a133ae
Using the buffered writer
MaxGekk Sep 24, 2018
7452b82
Removing default value for maxFields in simpleString
MaxGekk Sep 24, 2018
4ec5732
Removing unnecessary signature of truncatedString
MaxGekk Sep 24, 2018
be16175
Minor improvement - passing maxFields by name
MaxGekk Sep 25, 2018
90ff7b5
Moving truncatedString out of core
MaxGekk Sep 25, 2018
2ba6624
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Sep 26, 2018
1fcfc23
Adding SQL config to control maximum number of fields
MaxGekk Sep 27, 2018
2bf11fc
Adding Spark Core config to control maximum number of fields
MaxGekk Sep 27, 2018
5e2d3a6
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Sep 27, 2018
bd331c5
Revert indentations
MaxGekk Sep 27, 2018
3cf564b
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Oct 11, 2018
2375064
Making writeOrError multi-line
MaxGekk Oct 11, 2018
8befa13
Removing core config: spark.debug.maxToStringFields
MaxGekk Oct 11, 2018
28795c7
Improving description of spark.sql.debug.maxToStringFields
MaxGekk Oct 11, 2018
a246db4
Limit number of fields in structs too
MaxGekk Oct 11, 2018
d4da29b
Description of simpleString of TreeNode.
MaxGekk Oct 11, 2018
41b57bc
Added description of maxFields param of truncatedString
MaxGekk Oct 11, 2018
28cce2e
Fix typo
MaxGekk Oct 11, 2018
e4567cb
Passing maxField
MaxGekk Oct 11, 2018
9b72104
Fix for the warning
MaxGekk Oct 11, 2018
9f1d11d
Merge branch 'master' into plan-to-file
MaxGekk Oct 12, 2018
76f4248
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Oct 31, 2018
f7de26d
Merge remote-tracking branch 'origin/master' into plan-to-file
MaxGekk Nov 5, 2018
bda6ac2
Merge branch 'master' into plan-to-file
MaxGekk Nov 5, 2018
Adding SQL config to control maximum number of fields
MaxGekk committed Sep 27, 2018
commit 1fcfc23e308d2488e7d300486fca9d97758e7337
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{NumericType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -178,11 +179,14 @@ package object util extends Logging {
val DEFAULT_MAX_TO_STRING_FIELDS = 25

private[spark] def maxNumToStringFields = {
if (SparkEnv.get != null) {
val legacyLimit = if (SparkEnv.get != null) {
Contributor:
Just for context, why do you want to retain the legacy behavior? It is probably fine to break it.

Member Author:
The old config wasn't well documented and was mostly used for debugging, so I think we can remove it in Spark 3.0. The PR initially targeted Spark 2.4, and removing a public config in a minor version can potentially break user apps. If you are ok with removing it, I will do that.

Member Author:
I removed the old core config and left only the SQL config.

SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
Member:
Since it is moved to SQL, shall we name it spark.sql.debug.maxToStringFields?

Member Author:
Renaming it can potentially break users' apps. Can we do that in the current minor version?

Member:
Shall we keep both the deprecated old config and the new SQL config, then remove the old config in the next major version?

Member:
That makes sense to me.

Member Author:
I have added the new SQL config and I am testing it now. My concern is how to combine the old config, the new config, and the passed parameter. I am going to take the maximum of them, like:

 private[spark] def maxNumToStringFields = {
    val legacyLimit = if (SparkEnv.get != null) {
      SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
    } else {
      DEFAULT_MAX_TO_STRING_FIELDS
    }
    val sqlConfLimit = SQLConf.get.maxToStringFields

    Math.max(sqlConfLimit, legacyLimit)
  }

} else {
DEFAULT_MAX_TO_STRING_FIELDS
}
val sqlConfLimit = SQLConf.get.maxToStringFields

Math.max(sqlConfLimit, legacyLimit)
}

/** Whether we have warned about plan string truncation yet. */
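For context, a hedged sketch (not part of this diff) of how the two limits above interact from the user side. The two config keys are the ones discussed in this thread; the local session setup is only an assumed example, e.g. for spark-shell or a small test app.

import org.apache.spark.sql.SparkSession

// Assumed example: supply both the legacy core config and the new SQL config.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("max-to-string-fields-demo")
  .config("spark.debug.maxToStringFields", 30L)      // legacy key, read via SparkEnv
  .config("spark.sql.debug.maxToStringFields", 40L)  // new key added in this commit
  .getOrCreate()

// With the hunk above, maxNumToStringFields resolves to
// Math.max(sqlConfLimit, legacyLimit) = Math.max(40, 30) = 40,
// so plan strings may show up to 40 fields before the "... N more fields" marker.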
@@ -331,16 +331,16 @@ object SQLConf {

val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")
.booleanConf
.createWithDefault(false)

val PARQUET_SCHEMA_RESPECT_SUMMARIES = buildConf("spark.sql.parquet.respectSummaryFiles")
.doc("When true, we make assumption that all part-files of Parquet are consistent with " +
"summary files and we will ignore them when merging schema. Otherwise, if this is " +
"false, which is the default, we will merge all part-files. This should be considered " +
"as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
"summary files and we will ignore them when merging schema. Otherwise, if this is " +
"false, which is the default, we will merge all part-files. This should be considered " +
"as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
.booleanConf
.createWithDefault(false)

@@ -418,9 +418,9 @@
.doc("If true, enables Parquet filter push-down optimization for Timestamp. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is " +
"enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.")
.internal()
.booleanConf
.createWithDefault(true)
.internal()
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.decimal")
@@ -432,11 +432,11 @@

val PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED =
buildConf("spark.sql.parquet.filterPushdown.string.startsWith")
.doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(true)
.doc("If true, enables Parquet filter push-down optimization for string startsWith function. " +
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
.internal()
.booleanConf
.createWithDefault(true)

val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
@@ -527,34 +527,34 @@ object SQLConf {

val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
"when reading data stored in HDFS. This configuration will be deprecated in the future " +
"releases and replaced by spark.files.ignoreMissingFiles.")
"when reading data stored in HDFS. This configuration will be deprecated in the future " +
"releases and replaced by spark.files.ignoreMissingFiles.")
.booleanConf
.createWithDefault(false)

val HIVE_METASTORE_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastorePartitionPruning")
.doc("When true, some predicates will be pushed down into the Hive metastore so that " +
"unmatching partitions can be eliminated earlier. This only affects Hive tables " +
"not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
"HiveUtils.CONVERT_METASTORE_ORC for more information).")
"unmatching partitions can be eliminated earlier. This only affects Hive tables " +
"not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
"HiveUtils.CONVERT_METASTORE_ORC for more information).")
.booleanConf
.createWithDefault(true)

val HIVE_MANAGE_FILESOURCE_PARTITIONS =
buildConf("spark.sql.hive.manageFilesourcePartitions")
.doc("When true, enable metastore partition management for file source tables as well. " +
"This includes both datasource and converted Hive tables. When partition management " +
"is enabled, datasource tables store partition in the Hive metastore, and use the " +
"metastore to prune partitions during query planning.")
"This includes both datasource and converted Hive tables. When partition management " +
"is enabled, datasource tables store partition in the Hive metastore, and use the " +
"metastore to prune partitions during query planning.")
.booleanConf
.createWithDefault(true)

val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
buildConf("spark.sql.hive.filesourcePartitionFileCacheSize")
.doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
"a cache that can use up to specified num bytes for file metadata. This conf only " +
"has an effect when hive filesource partition management is enabled.")
"a cache that can use up to specified num bytes for file metadata. This conf only " +
"has an effect when hive filesource partition management is enabled.")
.longConf
.createWithDefault(250 * 1024 * 1024)

@@ -643,12 +643,12 @@ object SQLConf {
.createWithDefault(false)

val GATHER_FASTSTAT = buildConf("spark.sql.hive.gatherFastStats")
.internal()
.doc("When true, fast stats (number of files and total size of all files) will be gathered" +
" in parallel while repairing table partitions to avoid the sequential listing in Hive" +
" metastore.")
.booleanConf
.createWithDefault(true)
.internal()
.doc("When true, fast stats (number of files and total size of all files) will be gathered" +
" in parallel while repairing table partitions to avoid the sequential listing in Hive" +
" metastore.")
.booleanConf
.createWithDefault(true)

val PARTITION_COLUMN_TYPE_INFERENCE =
buildConf("spark.sql.sources.partitionColumnTypeInference.enabled")
@@ -669,13 +669,13 @@ object SQLConf {

val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.doc("When false, we will throw an error if a query contains a cartesian product without " +
"explicit CROSS JOIN syntax.")
"explicit CROSS JOIN syntax.")
.booleanConf
.createWithDefault(false)

val ORDER_BY_ORDINAL = buildConf("spark.sql.orderByOrdinal")
.doc("When true, the ordinal numbers are treated as the position in the select list. " +
"When false, the ordinal numbers in order/sort by clause are ignored.")
"When false, the ordinal numbers in order/sort by clause are ignored.")
.booleanConf
.createWithDefault(true)

@@ -727,10 +727,10 @@ object SQLConf {
// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
buildConf("spark.sql.selfJoinAutoResolveAmbiguity")
.internal()
.booleanConf
.createWithDefault(true)
buildConf("spark.sql.selfJoinAutoResolveAmbiguity")
.internal()
.booleanConf
.createWithDefault(true)

// Whether to retain group by columns or not in GroupedData.agg.
val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns")
@@ -759,11 +759,11 @@ object SQLConf {

val WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME =
buildConf("spark.sql.codegen.useIdInClassName")
.internal()
.doc("When true, embed the (whole-stage) codegen stage ID into " +
"the class name of the generated class as a suffix")
.booleanConf
.createWithDefault(true)
.internal()
.doc("When true, embed the (whole-stage) codegen stage ID into " +
"the class name of the generated class as a suffix")
.booleanConf
.createWithDefault(true)

val WHOLESTAGE_MAX_NUM_FIELDS = buildConf("spark.sql.codegen.maxFields")
.internal()
@@ -1258,7 +1258,7 @@ object SQLConf {
buildConf("spark.sql.execution.rangeExchange.sampleSizePerPartition")
.internal()
.doc("Number of points to sample per partition in order to determine the range boundaries" +
" for range partitioning, typically used in global sorting (without limit).")
" for range partitioning, typically used in global sorting (without limit).")
.intConf
.createWithDefault(100)

@@ -1334,8 +1334,8 @@ object SQLConf {
"information. The values of options whose names that match this regex will be redacted " +
"in the explain output. This redaction is applied on top of the global redaction " +
s"configuration defined by ${SECRET_REDACTION_PATTERN.key}.")
.regexConf
.createWithDefault("(?i)url".r)
.regexConf
.createWithDefault("(?i)url".r)

val SQL_STRING_REDACTION_PATTERN =
buildConf("spark.sql.redaction.string.regex")
@@ -1359,19 +1359,19 @@ object SQLConf {

val ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION =
buildConf("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation")
.internal()
.doc("When this option is set to true, creating managed tables with nonempty location " +
"is allowed. Otherwise, an analysis exception is thrown. ")
.booleanConf
.createWithDefault(false)
.internal()
.doc("When this option is set to true, creating managed tables with nonempty location " +
"is allowed. Otherwise, an analysis exception is thrown. ")
.booleanConf
.createWithDefault(false)

val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.executorQueueSize")
.internal()
.doc("The size (measured in number of rows) of the queue used in continuous execution to" +
" buffer the results of a ContinuousDataReader.")
.intConf
.createWithDefault(1024)
.internal()
.doc("The size (measured in number of rows) of the queue used in continuous execution to" +
" buffer the results of a ContinuousDataReader.")
.intConf
.createWithDefault(1024)

val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
@@ -1429,8 +1429,8 @@ object SQLConf {
"issues. Turn on this config to insert a local sort before actually doing repartition " +
"to generate consistent repartition results. The performance of repartition() may go " +
"down since we insert extra local sort before it.")
.booleanConf
.createWithDefault(true)
.booleanConf
.createWithDefault(true)

val NESTED_SCHEMA_PRUNING_ENABLED =
buildConf("spark.sql.optimizer.nestedSchemaPruning.enabled")
@@ -1446,8 +1446,8 @@ object SQLConf {
buildConf("spark.sql.execution.topKSortFallbackThreshold")
.internal()
.doc("In SQL queries with a SORT followed by a LIMIT like " +
"'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort" +
" in memory, otherwise do a global sort which spills to disk if necessary.")
"'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort" +
" in memory, otherwise do a global sort which spills to disk if necessary.")
.intConf
.createWithDefault(Int.MaxValue)

@@ -1554,6 +1554,14 @@ object SQLConf {
.internal()
.booleanConf
.createWithDefault(false)

val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields")
.internal()
.doc("Maximum number of fields of sequence-like entries that can be converted to strings " +
"in debug output. Any elements beyond the limit will be dropped and replaced by a" +
""" "... N more fields" placeholder.""")
.intConf
.createWithDefault(25)
}

/**
@@ -1965,6 +1973,8 @@ class SQLConf extends Serializable with Logging {

def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)

def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
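As a rough illustration of the truncation behaviour that the new spark.sql.debug.maxToStringFields config controls, here is a minimal, self-contained Scala sketch. It is not Spark's internal truncatedString helper, only an assumed approximation of the documented "... N more fields" placeholder, using the same thresholds exercised by the test below.

object TruncationSketch {
  // Assumed approximation: keep at most maxFields - 1 entries and summarise the rest.
  def truncate(fields: Seq[String], maxFields: Int): String = {
    if (fields.length > maxFields) {
      val kept = math.max(0, maxFields - 1)
      val dropped = fields.length - kept
      (fields.take(kept) :+ s"... $dropped more fields").mkString("[", ", ", "]")
    } else {
      fields.mkString("[", ", ", "]")
    }
  }

  def main(args: Array[String]): Unit = {
    val cols = (0 until 27).map(i => s"col$i")
    println(truncate(cols, 26)) // truncated: ends with "... 2 more fields"
    println(truncate(cols, 27)) // printed in full, no placeholder
  }
}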
@@ -20,6 +20,7 @@ import scala.io.Source

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

case class QueryExecutionTestRecord(
@@ -102,6 +103,21 @@ class QueryExecutionSuite extends SharedSQLContext {
}
}

test("limit number of fields by sql config") {
def relationPlans: String = {
val ds = spark.createDataset(Seq(QueryExecutionTestRecord(
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26)))
ds.queryExecution.toString
}
withSQLConf(SQLConf.MAX_TO_STRING_FIELDS.key -> "26") {
assert(relationPlans.contains("more fields"))
}
withSQLConf(SQLConf.MAX_TO_STRING_FIELDS.key -> "27") {
assert(!relationPlans.contains("more fields"))
}
}

test("toString() exception/error handling") {
spark.experimental.extraStrategies = Seq(
new SparkStrategy {
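A hedged usage sketch mirroring the test above, e.g. from spark-shell where spark is the active session. The config key and the "more fields" marker come from this change; the wide DataFrame and the exact plan output are assumed for illustration.

// Assumed example DataFrame with 30 columns.
val wide = spark.range(1).selectExpr((0 until 30).map(i => s"id AS c$i"): _*)

spark.conf.set("spark.sql.debug.maxToStringFields", 10L)
// With the limit at 10, the textual plan is expected to elide the remaining
// columns behind a "... N more fields" placeholder.
assert(wide.queryExecution.toString.contains("more fields"))

spark.conf.set("spark.sql.debug.maxToStringFields", 100L)
// With a limit above the number of columns, no placeholder is expected.
assert(!wide.queryExecution.toString.contains("more fields"))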