Merged
Changes from 1 commit (169 commits total)
fb41b90
[SPARK-34361][K8S] In case of downscaling avoid killing of executors …
attilapiros Mar 3, 2021
b8b6f88
[SPARK-34948][K8S] Add ownerReference to executor configmap to fix le…
dongjoon-hyun Apr 3, 2021
e852a3c
[SPARK-35482][K8S] Use `spark.blockManager.port` not the wrong `spark…
yaooqinn May 21, 2021
5625c45
[SPARK-35493][K8S] make `spark.blockManager.port` fallback for `spark…
yaooqinn May 23, 2021
7c3e411
[SPARK-32975][K8S] Add config for driver readiness timeout before exe…
cchriswu Jun 4, 2021
38ec106
[SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
yaooqinn Jun 10, 2021
ee142c0
[MINOR][K8S] Print the driver pod name instead of Some(name) if absent
yaooqinn Jun 13, 2021
fd14c30
[SPARK-37049][K8S] executorIdleTimeout should check `creationTimestam…
yangwwei Oct 20, 2021
8e57cfe
[SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapsho…
dongjoon-hyun Dec 1, 2021
3c35c38
[SPARK-34949][CORE] Prevent BlockManager reregister when Executor is …
sumeetgajjar Apr 5, 2021
c352398
[SPARK-34674][CORE][K8S] Close SparkContext after the Main method has…
kotlovs Apr 22, 2021
a004fb6
[SPARK-36193][CORE] Recover SparkSubmit.runMain not to stop SparkCont…
dongjoon-hyun Jul 19, 2021
a92ed12
[SPARK-35879][CORE][SHUFFLE] Fix performance regression caused by col…
yaooqinn Jun 26, 2021
d6b5c39
[SPARK-34473][SQL] Avoid NPE in DataFrameReader.schema(StructType)
cloud-fan Feb 22, 2021
866e683
[SPARK-34490][SQL] Analysis should fail if the view refers a dropped …
linhongliu-db Feb 23, 2021
6003f7c
[SPARK-34515][SQL] Fix NPE if InSet contains null value during getPar…
ulysses-you Feb 24, 2021
eee08d7
[SPARK-34436][SQL] DPP support LIKE ANY/ALL expression
wangyum Feb 25, 2021
91f2a9e
[SPARK-34550][SQL] Skip InSet null value during push filter to Hive m…
ulysses-you Feb 26, 2021
d78ae65
[SPARK-34556][SQL] Checking duplicate static partition columns should…
zsxwing Mar 1, 2021
5399700
[SPARK-34417][SQL] org.apache.spark.sql.DataFrameNaFunctions.fillMap …
amandeep-sharma Mar 2, 2021
59bd127
[SPARK-34547][SQL] Only use metadata columns for resolution as last r…
karenfeng Mar 2, 2021
bcf662e
[SPARK-34596][SQL] Use Utils.getSimpleName to avoid hitting Malformed…
rednaxelafx Mar 3, 2021
0ba7767
[SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch…
Mar 3, 2021
b3500ce
[SPARK-34555][SQL] Resolve metadata output from DataFrame
karenfeng Mar 3, 2021
c41b543
[SPARK-34584][SQL] Static partition should also follow StoreAssignmen…
cloud-fan Mar 4, 2021
463e130
[SPARK-34599][SQL] Fix the issue that INSERT INTO OVERWRITE doesn't s…
zsxwing Mar 4, 2021
f3fdc8d
[SPARK-34567][SQL] CreateTableAsSelect should update metrics too
AngersZhuuuu Mar 4, 2021
c9700c9
[SPARK-34482][SS] Correct the active SparkSession for StreamExecution…
Ngone51 Mar 4, 2021
db5fe1d
[SPARK-32924][WEBUI] Make duration column in master UI sorted in the …
Mar 4, 2021
4e48ab4
[SPARK-34613][SQL] Fix view does not capture disable hint config
ulysses-you Mar 5, 2021
81d34b4
[SPARK-34607][SQL][3.1] Add `Utils.isMemberClass` to fix a malformed …
maropu Mar 5, 2021
e0ef75a
[SPARK-34681][SQL] Fix bug for full outer shuffled hash join when bui…
c21 Mar 10, 2021
b82a653
[SPARK-34682][SQL] Fix regression in canonicalization error check in …
andygrove Mar 10, 2021
32624a7
[SPARK-34682][SQL] Use PrivateMethodTester instead of reflection
andygrove Mar 10, 2021
6f94a88
[MINOR][SQL] Remove unnecessary extend from BroadcastHashJoinExec
c21 Mar 11, 2021
f5474b5
[SPARK-34713][SQL] Fix group by CreateStruct with ExtractValue
cloud-fan Mar 11, 2021
b52570a
[SPARK-34724][SQL] Fix Interpreted evaluation by using getMethod inst…
dongjoon-hyun Mar 12, 2021
52395a7
[SPARK-34723][SQL] Correct parameter type for subexpression eliminati…
viirya Mar 13, 2021
cf311da
[SPARK-34727][SQL] Fix discrepancy in casting float to timestamp
MaxGekk Mar 14, 2021
bd2e15c
[SPARK-34504][SQL] Avoid unnecessary resolving of SQL temp views for …
cloud-fan Mar 17, 2021
a91c0d8
[SPARK-34770][SQL] InMemoryCatalog.tableExists should not fail if dat…
cloud-fan Mar 17, 2021
ff792c4
[SPARK-34749][SQL][3.1] Simplify ResolveCreateNamedStruct
cloud-fan Mar 17, 2021
d2b4abf
[SPARK-34737][SQL][3.1] Cast input float to double in `TIMESTAMP_SECO…
MaxGekk Mar 18, 2021
acfe003
[SPARK-34731][CORE] Avoid ConcurrentModificationException when redact…
bersprockets Mar 18, 2021
24369bd
[SPARK-34776][SQL] Nested column pruning should not prune Window prod…
viirya Mar 19, 2021
f3c14c5
[SPARK-34796][SQL][3.1] Initialize counter variable for LIMIT code-ge…
c21 Mar 21, 2021
1d45ac7
[SPARK-34811][CORE] Redact fs.s3a.access.key like secret and token
dongjoon-hyun Mar 21, 2021
20be38f
[SPARK-34225][CORE] Don't encode further when a URI form string is pa…
sarutak Mar 22, 2021
519a6fa
[SPARK-34790][CORE] Disable fetching shuffle blocks in batch when io …
hezuojiao Mar 22, 2021
5a22ad8
[SPARK-33482][SPARK-34756][SQL] Fix FileScan equality check
peter-toth Mar 23, 2021
e139042
[SPARK-34833][SQL] Apply right-padding correctly for correlated subqu…
maropu Mar 24, 2021
544a035
[SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle …
otterc Mar 25, 2021
975baa6
[SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places
cloud-fan Mar 26, 2021
dcb2bad
[SPARK-34829][SQL] Fix higher order function results
peter-toth Mar 28, 2021
31fa680
[SPARK-34876][SQL] Fill defaultResult of non-nullable aggregates
tanelk Mar 29, 2021
ee91028
[SPARK-34814][SQL] LikeSimplification should handle NULL
AngersZhuuuu Mar 29, 2021
891e873
[SPARK-34845][CORE] ProcfsMetricsGetter shouldn't return partial proc…
Mar 29, 2021
3a53e8c
[SPARK-34909][SQL] Fix conversion of negative to unsigned in conv()
timarmstrong Mar 31, 2021
d56aabe
[SPARK-34939][CORE] Throw fetch failure exception when unable to dese…
viirya Apr 4, 2021
21a727d
[SPARK-34923][SQL] Metadata output should be empty for more plans
karenfeng Apr 6, 2021
58af553
[SPARK-34922][SQL][3.1] Use a relative cost comparison function in th…
tanelk Apr 8, 2021
9ad6a06
[SPARK-34963][SQL] Fix nested column pruning for extracting case-inse…
viirya Apr 9, 2021
8715cf9
[SPARK-34926][SQL][3.1] PartitioningUtils.getPathFragment() should re…
AngersZhuuuu Apr 12, 2021
8accb94
[SPARK-35014] Fix the PhysicalAggregation pattern to not rewrite fold…
sigmod Apr 13, 2021
9e89876
[SPARK-34834][NETWORK] Fix a potential Netty memory leak in Transport…
weixiuli Apr 14, 2021
844beff
[SPARK-34225][CORE][FOLLOWUP] Replace Hadoop's Path with Utils.resolv…
sarutak Apr 15, 2021
8afe6b2
[SPARK-35136] Remove initial null value of LiveStage.info
sander-goos Apr 19, 2021
363a5bd
[SPARK-35117][UI] Change progress bar back to highlight ratio of task…
Kimahriman Apr 20, 2021
95acf41
[SPARK-35080][SQL] Only allow a subset of correlated equality predica…
allisonwang-db Apr 20, 2021
a41cebe
[SPARK-34639][SQL][3.1] RelationalGroupedDataset.alias should not cre…
cloud-fan Apr 21, 2021
06e5e2a
[SPARK-35096][SQL] SchemaPruning should adhere spark.sql.caseSensitiv…
sandeep-katta Apr 21, 2021
a657b0a
[SPARK-35127][UI] When we switch between different stage-detail pages…
echohlne Apr 22, 2021
60867dd
[SPARK-34897][SQL][3.1] Support reconcile schemas based on index afte…
wangyum Apr 23, 2021
b1e56bd
[SPARK-35168][SQL] mapred.reduce.tasks should be shuffle.partitions n…
yaooqinn Apr 25, 2021
bc789f2
[SPARK-35087][UI] Some columns in table Aggregated Metrics by Executo…
echohlne Apr 26, 2021
7b26b0c
[SPARK-35213][SQL] Keep the correct ordering of nested structs in cha…
Kimahriman Apr 26, 2021
a6e8f53
[SPARK-35244][SQL] Invoke should throw the original exception
cloud-fan Apr 28, 2021
5d4ab0e
[SPARK-35278][SQL] Invoke should find the method with correct number …
viirya May 1, 2021
4e2198c
[SPARK-34794][SQL] Fix lambda variable name issues in nested DataFram…
May 5, 2021
80b41db
[SPARK-35288][SQL] StaticInvoke should find the method without exact …
viirya May 7, 2021
da70670
[SPARK-35359][SQL] Insert data with char/varchar datatype will fail w…
fhygh May 17, 2021
53c98d8
[SPARK-35411][SQL] Add essential information while serializing TreeNo…
ivoson May 18, 2021
18c468b
[SPARK-35106][CORE][SQL] Avoid failing rename caused by destination d…
May 19, 2021
fd47c5d
[SPARK-35093][SQL] AQE now uses newQueryStage plan as key for looking…
andygrove May 19, 2021
84c53cb
[SPARK-35287][SQL] Allow RemoveRedundantProjects to preserve ProjectE…
sarutak May 24, 2021
81ad2a6
[SPARK-35449][SQL][3.1] Only extract common expressions from CaseWhen…
Kimahriman May 24, 2021
3e01a01
[SPARK-35454][SQL][3.1] One LogicalPlan can match multiple dataset ids
Ngone51 May 28, 2021
eeb5e9c
[SPARK-35411][SQL][FOLLOWUP] Handle Currying Product while serializin…
ivoson May 31, 2021
5465d29
[SPARK-35610][CORE] Fix the memory leak introduced by the Executor's …
attilapiros Jun 2, 2021
6953e0c
[SPARK-35589][CORE][3.1] BlockManagerMasterEndpoint should not ignore…
dongjoon-hyun Jun 3, 2021
d40b540
[SPARK-35679][SQL] instantToMicros overflow
dchvn Jun 10, 2021
c387856
[SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in sub…
cfmcgrady Jun 10, 2021
d1523a2
[SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails f…
Jun 10, 2021
f8e19bc
[SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetri…
sarutak Jun 10, 2021
c42759c
[SPARK-35652][SQL] joinWith on two table generated from same one
dchvn Jun 11, 2021
f1a1e2e
[SPARK-35695][SQL] Collect observed metrics from cached and adaptive …
tanelk Jun 11, 2021
f87ad51
[SPARK-35746][UI] Fix taskid in the stage page task event timeline
shahidki31 Jun 12, 2021
e934d57
[SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
wankunde Jun 13, 2021
763c27c
[SPARK-35767][SQL] Avoid executing child plan twice in CoalesceExec
andygrove Jun 15, 2021
6ae88cf
[SPARK-35791][SQL] Release on-going map properly for NULL-aware ANTI …
c21 Jun 17, 2021
bd86d35
[SPARK-34766][SQL][3.1] Do not capture maven config for views
ulysses-you Mar 18, 2021
07802bb
[SPARK-35792][SQL] View should not capture configs used in `RelationC…
linhongliu-db Jun 17, 2021
c33950e
[SPARK-35391] Fix memory leak in ExecutorAllocationListener
VasilyKolpakov Jun 21, 2021
5fe864a
[SPARK-35695][SQL][FOLLOWUP] Use AQE helper to simplify the code in C…
cloud-fan Jun 23, 2021
d2cbf6e
[SPARK-35841][SQL] Casting string to decimal type doesn't work if the…
dchvn Jun 24, 2021
70ec5ed
[SPARK-35886][SQL][3.1] PromotePrecision should not overwrite genCode…
viirya Jun 27, 2021
e48023a
[SPARK-35898][SQL] Fix arrays and maps in RowToColumnConverter
tomvanbussel Jun 28, 2021
1c89ae7
[SPARK-35935][SQL][3.1][3.0] Prevent failure of `MSCK REPAIR TABLE` o…
MaxGekk Jun 30, 2021
7295568
[SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerW…
Ngone51 Jul 1, 2021
8ae868c
[SPARK-36020][SQL][3.1] Check logical link in remove redundant projects
ulysses-you Jul 7, 2021
ccfbc17
[SQL][MINOR] EquivalentExpressions.commonChildrenToRecurse should ski…
cloud-fan Jul 13, 2021
a27f021
[SPARK-36076][SQL][3.1] ArrayIndexOutOfBounds in Cast string to times…
dchvn Jul 13, 2021
2bbd9bf
[SPARK-36034][SQL][3.1] Rebase datetime in pushed down filters to par…
MaxGekk Jul 16, 2021
a151a0c
[SPARK-35972][SQL][3.1] When replace ExtractValue in NestedColumnAlia…
AngersZhuuuu Jul 16, 2021
8eef6e8
[SPARK-36093][SQL][3.1] RemoveRedundantAliases should not change Comm…
AngersZhuuuu Jul 19, 2021
e304be4
[SPARK-36079][SQL] Null-based filter estimate should always be in the…
karenfeng Jul 20, 2021
bcdfe8b
[SPARK-36210][SQL] Preserve column insertion order in Dataset.withCol…
koertkuipers Jul 20, 2021
c50043d
[SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
jhu-chang Jul 21, 2021
e031a6e
[SPARK-36020][SQL][FOLLOWUP] RemoveRedundantProjects should retain th…
cloud-fan Jul 21, 2021
51146da
[SPARK-28266][SQL] convertToLogicalRelation should not interpret `pat…
shardulm94 Jul 21, 2021
3f99be0
[SPARK-36213][SQL] Normalize PartitionSpec for Describe Table Command…
yaooqinn Jul 21, 2021
9c2f8b3
[SPARK-36242][CORE][3.1] Ensure spill file closed before set success …
LuciferYang Jul 27, 2021
7d1ba37
[SPARK-36354][CORE] EventLogFileReader should skip rolling event log …
dongjoon-hyun Aug 4, 2021
d2e9151
[SPARK-36086][SQL][3.1] CollapseProject project replace alias should …
AngersZhuuuu Aug 9, 2021
67d5aa9
[SPARK-36339][SQL][3.0] References to grouping that not part of aggre…
gaoyajun02 Aug 9, 2021
ca5113d
[SPARK-36464][CORE] Fix Underlying Size Variable Initialization in Ch…
kazuyukitanimura Aug 10, 2021
24c7d5e
[SPARK-36489][SQL] Aggregate functions over no grouping keys, on tabl…
IonutBoicuAms Aug 12, 2021
4688592
[SPARK-36353][SQL][3.1] RemoveNoopOperators should keep output schema
AngersZhuuuu Aug 13, 2021
6ad9939
[SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is inte…
jiangxb1987 Aug 13, 2021
83d8105
[SPARK-35876][SQL][3.1] ArraysZip should retain field names to avoid …
AngersZhuuuu Aug 24, 2021
8ffb3f0
[SPARK-36564][CORE] Fix NullPointerException in LiveRDDDistribution.t…
Ngone51 Aug 24, 2021
6d49a31
[SPARK-36352][SQL][3.1] Spark should check result plan's output schem…
AngersZhuuuu Aug 25, 2021
491e1ed
[SPARK-36509][CORE] Fix the issue that executors are never re-schedul…
sarutak Aug 28, 2021
f001d98
[SPARK-36614][CORE][UI] Correct executor loss reason caused by decomm…
Ngone51 Aug 30, 2021
cabf1bf
[SPARK-36639][SQL] Fix an issue that sequence builtin function causes…
sarutak Sep 3, 2021
051073b
[SPARK-36704][CORE] Expand exception handling to more Java 9 cases wh…
srowen Sep 11, 2021
589d0d5
[SPARK-36715][SQL] InferFiltersFromGenerate should not infer filter f…
cfmcgrady Sep 14, 2021
ebe5b88
[SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan
AngersZhuuuu Sep 14, 2021
29b38d3
[SPARK-36702][SQL][FOLLOWUP] ArrayUnion handle duplicated Double.NaN …
AngersZhuuuu Sep 15, 2021
402ca59
[SPARK-36755][SQL] ArraysOverlap should handle duplicated Double.NaN …
AngersZhuuuu Sep 15, 2021
8e3a919
[SPARK-36783][SQL] ScanOperation should not push Filter through nonde…
cloud-fan Sep 17, 2021
37a1871
[SPARK-36789][SQL] Use the correct constant type as the null value ho…
cloud-fan Sep 17, 2021
27ca66c
[SPARK-36741][SQL] ArrayDistinct handle duplicated Double.NaN and Flo…
AngersZhuuuu Sep 17, 2021
0b41e99
[SPARK-35985][SQL][3.1] push partitionFilters for empty readDataSchema
huaxingao Sep 19, 2021
43b00a5
[SPARK-36754][SQL] ArrayIntersect handle duplicated Double.NaN and Fl…
AngersZhuuuu Sep 20, 2021
6129ca4
[SPARK-36706][SQL][3.1] OverwriteByExpression conversion in DataSourc…
huaxingao Sep 20, 2021
401e641
[SPARK-36803][SQL] Fix ArrayType conversion when reading Parquet file…
sadikovi Sep 22, 2021
821dd6a
[SPARK-36753][SQL] ArrayExcept handle duplicated Double.NaN and Float…
AngersZhuuuu Sep 22, 2021
90a1cf9
[SPARK-36782][CORE] Avoid blocking dispatcher-BlockManagerMaster duri…
f-thiele Sep 23, 2021
b5eb1a4
[SPARK-36782][CORE][FOLLOW-UP] Only handle shuffle block in separate …
Ngone51 Sep 23, 2021
329d2ed
[SPARK-36792][SQL] InSet should handle NaN
AngersZhuuuu Sep 24, 2021
0498e2e
[SPARK-36874][SPARK-34634][SQL][3.1] ResolveReference.dedupRight shou…
sarutak Oct 7, 2021
74ce639
[SPARK-36717][CORE] Incorrect order of variable initialization may le…
Oct 8, 2021
650d951
[SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when…
JoshRosen Oct 14, 2021
640d88b
[SPARK-37098][SQL][3.1] Alter table properties should invalidate cache
ulysses-you Oct 26, 2021
fa3c90b
[SPARK-37203][SQL] Fix NotSerializableException when observe with Typ…
beliefer Nov 5, 2021
51ca7c0
[SPARK-37196][SQL] HiveDecimal enforcePrecisionScale failed return null
AngersZhuuuu Nov 8, 2021
86e8177
[SPARK-37388][SQL] Fix NPE in WidthBucket in WholeStageCodegenExec
tomvanbussel Nov 22, 2021
508d6f4
[SPARK-37452][SQL][3.1] Char and Varchar break backward compatibility…
yaooqinn Nov 30, 2021
04f31f4
[SPARK-37556][SQL] Deser void class fail with Java serialization
daijyc Dec 7, 2021
9157b18
[SPARK-37392][SQL] Fix the performance bug when inferring constraints…
cloud-fan Dec 8, 2021
a9e99bd
[SPARK-37451][3.1][SQL] Fix cast string type to decimal type if spark…
wangyum Dec 9, 2021
4c01c47
[SPARK-37060][CORE][3.1] Handle driver status response from backup ma…
testsgmr Dec 16, 2021
918c222
[SPARK-37654][SQL] Fix NPE in Row.getSeq when field is Null
huaxingao Dec 17, 2021
3f541d1
[SPARK-37784][SQL] Correctly handle UDTs in CodeGenerator.addBuffered…
JoshRosen Jan 4, 2022
d8dfbf7
[SPARK-35714][FOLLOW-UP][CORE] WorkerWatcher should run System.exit i…
Ngone51 Jan 5, 2022
dd437c0
[SPARK-37860][UI] Fix taskindex in the stage page task event timeline
Jan 11, 2022
4778af0
[SPARK-34555][SQL][FOLLOWUP] code format
fishcus Jan 12, 2022
5f31fe1
[SPARK-38075][SQL][3.1] Fix `hasNext` in `HiveScriptTransformationExe…
bersprockets Feb 1, 2022
[SPARK-34567][SQL] CreateTableAsSelect should update metrics too
### What changes were proposed in this pull request?
A `CreateTableAsSelect` command delegates the actual data write to `InsertIntoHiveTable` or `InsertIntoHadoopFsRelationCommand`.
The metrics of `InsertIntoHiveTable` and `InsertIntoHadoopFsRelationCommand` are updated in `FileFormatWriter.write()`, but the Web UI SQL tab only shows the `CreateTableAsSelectCommand` node, so its metrics stay empty.
This change propagates the metrics of the inner write command back to `CreateTableAsSelectCommand` so they show up in the UI, as sketched below.
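As a quick illustration (a minimal sketch, not part of the patch — it mirrors the regression test added at the bottom of this diff and assumes a running `SparkSession` named `spark` and a hypothetical table name `t_demo`):

```scala
import org.apache.spark.sql.execution.command.DataWritingCommandExec

val df = spark.sql("CREATE TABLE t_demo USING PARQUET AS SELECT 1 AS a")
val writeExec = df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
writeExec.executeCollect()       // command results are cached, so this does not rewrite the table
val ctas = writeExec.cmd         // the CreateDataSourceTableAsSelectCommand node shown in the SQL tab

// With this patch the CTAS node itself carries the write metrics;
// before it, they stayed at their initial values.
println(ctas.metrics("numFiles").value)       // 1
println(ctas.metrics("numOutputRows").value)  // 1
println(ctas.metrics("numOutputBytes").value) // > 0
```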

Before this PR:
![image](https://user-images.githubusercontent.com/46485123/109411226-81f44480-79db-11eb-99cb-b9686b15bf61.png)

After this PR:
![image](https://user-images.githubusercontent.com/46485123/109411232-8ae51600-79db-11eb-9111-3bea0bc2d475.png)

![image](https://user-images.githubusercontent.com/46485123/109905192-62aa2f80-7cd9-11eb-91f9-04b16c9238ae.png)

### Why are the changes needed?
Completes the SQL metrics for CTAS commands, so the Web UI SQL tab shows the write statistics (`numFiles`, `numOutputBytes`, `numOutputRows`) on the `CreateTableAsSelectCommand` node.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests in `SQLMetricsSuite` (sql/core and sql/hive); see the new test cases in this diff.

Closes apache#31679 from AngersZhuuuu/SPARK-34567.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 401e270)
Signed-off-by: Wenchen Fan <[email protected]>
AngersZhuuuu authored and fishcus committed Jan 12, 2022
commit f3fdc8d68f7b7ff7e2d76db374f69c49219c3a67
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.command

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration

/**
@@ -73,4 +74,26 @@ object DataWritingCommand {
      attr.withName(outputName)
    }
  }

  /**
   * When execute CTAS operators, Spark will use [[InsertIntoHadoopFsRelationCommand]]
   * or [[InsertIntoHiveTable]] command to write data, they both inherit metrics from
   * [[DataWritingCommand]], but after running [[InsertIntoHadoopFsRelationCommand]]
   * or [[InsertIntoHiveTable]], we only update metrics in these two command through
   * [[BasicWriteJobStatsTracker]], we also need to propogate metrics to the command
   * that actually calls [[InsertIntoHadoopFsRelationCommand]] or [[InsertIntoHiveTable]].
   *
   * @param sparkContext Current SparkContext.
   * @param command Command to execute writing data.
   * @param metrics Metrics of real DataWritingCommand.
   */
  def propogateMetrics(
      sparkContext: SparkContext,
      command: DataWritingCommand,
      metrics: Map[String, SQLMetric]): Unit = {
    command.metrics.foreach { case (key, metric) => metrics(key).set(metric.value) }
    SQLMetrics.postDriverMetricUpdates(sparkContext,
      sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
      metrics.values.toSeq)
  }
}
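The intended call pattern is that the outer CTAS command runs the inner write command and then copies its metrics onto itself. Below is a hedged sketch of that pattern, not code from the patch: the class and its `buildInnerWrite` parameter are hypothetical, and the real call sites are `CreateHiveTableAsSelectBase` and `DataSource.writeAndRead`, shown later in this diff.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

// Hypothetical CTAS-like wrapper, only to illustrate how propogateMetrics is meant to be used.
case class SketchCtasCommand(
    query: LogicalPlan,
    outputColumnNames: Seq[String],
    buildInnerWrite: () => DataWritingCommand) extends DataWritingCommand {

  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    // e.g. InsertIntoHiveTable or InsertIntoHadoopFsRelationCommand
    val innerWrite = buildInnerWrite()
    // The inner command's metrics are filled by BasicWriteJobStatsTracker during the write.
    innerWrite.run(sparkSession, child)
    // Copy them onto this command, whose plan node is what the SQL tab actually displays.
    DataWritingCommand.propogateMetrics(sparkSession.sparkContext, innerWrite, metrics)
    Seq.empty[Row]
  }
}
```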
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -217,7 +217,7 @@ case class CreateDataSourceTableAsSelectCommand(
      catalogTable = if (tableExists) Some(table) else None)

    try {
      dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan)
      dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan, metrics)
    } catch {
      case ex: AnalysisException =>
        logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
import org.apache.spark.sql.internal.SQLConf
@@ -518,7 +519,8 @@ case class DataSource(
      mode: SaveMode,
      data: LogicalPlan,
      outputColumnNames: Seq[String],
      physicalPlan: SparkPlan): BaseRelation = {
      physicalPlan: SparkPlan,
      metrics: Map[String, SQLMetric]): BaseRelation = {
    val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
@@ -546,6 +548,7 @@
          partitionColumns = resolvedPartCols,
          outputColumnNames = outputColumnNames)
        resolved.run(sparkSession, physicalPlan)
        DataWritingCommand.propogateMetrics(sparkSession.sparkContext, resolved, metrics)
        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
        copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
      case _ =>
sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.functions._
@@ -755,4 +756,20 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
      }
    }
  }

  test("SPARK-34567: Add metrics for CTAS operator") {
    withTable("t") {
      val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
      val dataWritingCommandExec =
        df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
      dataWritingCommandExec.executeCollect()
      val createTableAsSelect = dataWritingCommandExec.cmd
      assert(createTableAsSelect.metrics.contains("numFiles"))
      assert(createTableAsSelect.metrics("numFiles").value == 1)
      assert(createTableAsSelect.metrics.contains("numOutputBytes"))
      assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
      assert(createTableAsSelect.metrics.contains("numOutputRows"))
      assert(createTableAsSelect.metrics("numOutputRows").value == 1)
    }
  }
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -55,6 +55,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand {

      val command = getWritingCommand(catalog, tableDesc, tableExists = true)
      command.run(sparkSession, child)
      DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
    } else {
      // TODO ideally, we should get the output data ready first and then
      // add the relation into catalog, just in case of failure occurs while data
@@ -69,6 +70,7 @@
        val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
        val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
        command.run(sparkSession, child)
        DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics)
      } catch {
        case NonFatal(e) =>
          // drop the created table.
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton

// Disable AQE because metric info is different with AQE on/off
@@ -34,4 +36,29 @@ class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton
      testMetricsDynamicPartition("hive", "hive", "t1")
    }
  }

  test("SPARK-34567: Add metrics for CTAS operator") {
    Seq(false, true).foreach { canOptimized =>
      withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> canOptimized.toString) {
        withTable("t") {
          val df = sql(s"CREATE TABLE t STORED AS PARQUET AS SELECT 1 as a")
          val dataWritingCommandExec =
            df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec]
          dataWritingCommandExec.executeCollect()
          val createTableAsSelect = dataWritingCommandExec.cmd
          if (canOptimized) {
            assert(createTableAsSelect.isInstanceOf[OptimizedCreateHiveTableAsSelectCommand])
          } else {
            assert(createTableAsSelect.isInstanceOf[CreateHiveTableAsSelectCommand])
          }
          assert(createTableAsSelect.metrics.contains("numFiles"))
          assert(createTableAsSelect.metrics("numFiles").value == 1)
          assert(createTableAsSelect.metrics.contains("numOutputBytes"))
          assert(createTableAsSelect.metrics("numOutputBytes").value > 0)
          assert(createTableAsSelect.metrics.contains("numOutputRows"))
          assert(createTableAsSelect.metrics("numOutputRows").value == 1)
        }
      }
    }
  }
}