Conversation

pull[bot] commented Sep 13, 2022

See Commits and Changes for more details.


Created by pull[bot]


Yikun and others added 3 commits September 13, 2022 22:31
… dockerfile is changed

### What changes were proposed in this pull request?
Build the base image in real time only when the infra dockerfile is changed.

### Why are the changes needed?
Recently GitHub's ghcr.io has been [unstable](https://github.com/orgs/community/discussions/32184), so we had better use the static infra image when the infra dockerfile has not changed.

After this change, even if writing images to ghcr.io has issues, only PRs that change the infra dockerfile are blocked; other PRs are not affected.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
1. CI passed, and [used](https://github.com/Yikun/spark/runs/8242449708?check_suite_focus=true#step:2:19) the `ghcr.io/apache/spark/apache-spark-github-action-image-cache:master-static` image.
2. Locally tested that infra changes trigger CI: Yikun#170
3. Tested `./dev/is-changed.py -m infra-image`:
![image](https://user-images.githubusercontent.com/1736354/189041038-1351a223-749f-4a65-beb8-32fd04116a24.png)

```
# ref is the commit before the dockerfile change
$ export APACHE_SPARK_REF=871152bda69a5d5db714eecd42fdd3e7d2e04557
$ ./dev/is-changed.py -m infra-image
true
# ref is the commit that changed the dockerfile
$ export APACHE_SPARK_REF=0830575100c1bcbc89d98a6859fe3b8d46ca2e6e
$ ./dev/is-changed.py -m infra-image
false
# ref is the commit after the dockerfile change
$ export APACHE_SPARK_REF=e6c58c1bd6f64ebfb337348fa6132c0b230dc932
$ ./dev/is-changed.py -m infra-image
false
```

Closes #37828 from Yikun/infra-image-static.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
In the PR, I propose to change the type of `messageParameters` passed to Spark exceptions (via their constructors) from an array of strings to a map of strings. The map's keys are the parameter names from `error-classes.json`, and its values are the parameter values. While formatting error messages using the templates from `error-classes.json`, the formatter fails with an internal error if it cannot find a parameter in the map passed to the exception. The map of message parameters may contain more entries than the template uses.

Before:
```scala
new AnalysisException(
  errorClass = "UNSUPPORTED_GENERATOR",
  errorSubClass = "MULTI_GENERATOR",
  messageParameters = Array(clause, generators.size.toString, generators.map(toSQLExpr).mkString(", ")))
```

After:
```scala
new AnalysisException(
  errorClass = "UNSUPPORTED_GENERATOR",
  errorSubClass = "MULTI_GENERATOR",
  messageParameters = Map(
    "clause" -> clause,
    "num" -> generators.size.toString,
    "generators" -> generators.map(toSQLExpr).mkString(", ")))
```
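
For illustration, here is a minimal sketch of the formatting side (not Spark's actual implementation): placeholders such as `<clause>` in the template are looked up by name, a missing parameter raises an internal error, and unused map entries are simply ignored.

```scala
import scala.util.matching.Regex

// Minimal sketch, assuming `<name>`-style placeholders as in error-classes.json;
// this is not Spark's actual formatter.
def formatMessage(template: String, parameters: Map[String, String]): String = {
  val placeholder: Regex = "<([a-zA-Z0-9_]+)>".r
  placeholder.replaceAllIn(template, m => {
    val name = m.group(1)
    val value = parameters.getOrElse(
      name,
      throw new IllegalStateException(s"[INTERNAL_ERROR] Undefined error message parameter: $name"))
    Regex.quoteReplacement(value)
  })
}

// Made-up template; extra entries in the map ("unused") do not cause an error:
// formatMessage("Found <num> generators in the <clause> clause: <generators>.",
//   Map("clause" -> "SELECT", "num" -> "2", "generators" -> "\"a\", \"b\"", "unused" -> "x"))
```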

### Why are the changes needed?
At the moment, the order of error message parameters cannot be changed in `error-classes.json` independently of the source code, where the parameters are passed as arrays. After the changes, tech editors/users can change the error messages, including reordering or skipping some parameters, without modifying Spark's code base (including tests).

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "test:testOnly *ErrorParserSuite"
$ build/sbt "test:testOnly *AnalysisErrorSuite"
$ build/sbt "test:testOnly *AnalysisSuite"
$ build/sbt "test:testOnly *ResolveSubquerySuite"
$ build/sbt "test:testOnly *V2OverwriteByExpressionANSIAnalysisSuite"
$ build/sbt "test:testOnly *TimeWindowSuite"
$ build/sbt "test:testOnly *DataSourceV2SQLSuiteV1Filter"
$ build/sbt "test:testOnly *DataFrameSuite"
$ build/sbt "test:testOnly *JsonV1Suite"
```

Closes #37834 from MaxGekk/error-message-parameters-map.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?
Change canonicalization to a one-pass process and move the logic from `Canonicalize.reorderCommutativeOperators` into the respective commutative operators' `canonicalize`.

### Why are the changes needed?
#34883 improved expression canonicalization performance but introduced a regression when a commutative operator is under a `BinaryComparison`. This is because reordering children by their hashcode cannot happen in the `preCanonicalized` phase, when the children are not yet "final".
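
As a toy sketch of the one-pass idea (illustrative types, not Spark's `Expression` API), a commutative node reorders its already-canonicalized children inside its own canonicalization, so a comparison wrapping it sees a stable form in a single bottom-up pass:

```scala
// Toy expression tree: canonicalization is computed bottom-up in one pass, and
// a commutative Add sorts its already-canonicalized children by hashCode.
sealed trait Expr { def canonicalized: Expr }
case class Attr(name: String) extends Expr { def canonicalized: Expr = this }
case class Literal(value: Int) extends Expr { def canonicalized: Expr = this }
case class Add(children: Seq[Expr]) extends Expr {
  def canonicalized: Expr = Add(children.map(_.canonicalized).sortBy(_.hashCode))
}
case class EqualTo(left: Expr, right: Expr) extends Expr {
  def canonicalized: Expr = EqualTo(left.canonicalized, right.canonicalized)
}

// Both orderings canonicalize to the same form, even under the comparison:
// EqualTo(Add(Seq(Attr("a"), Attr("b"))), Literal(1)).canonicalized ==
//   EqualTo(Add(Seq(Attr("b"), Attr("a"))), Literal(1)).canonicalized
```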

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #37851 from peter-toth/SPARK-40362-fix-binarycomparison-canonicalization.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

This PR aims to update the K8s documentation to declare support for YuniKorn v1.1+.

### Why are the changes needed?

YuniKorn 1.1.0 includes 87 JIRAs and is the first version to officially support multi-arch images.
- https://yunikorn.apache.org/release-announce/1.1.0

```
$ docker inspect apache/yunikorn:scheduler-1.0.0 | grep Architecture
        "Architecture": "amd64",
$ docker inspect apache/yunikorn:scheduler-1.1.0 | grep Architecture
        "Architecture": "arm64",
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested with Apache YuniKorn v1.1.0+.
```
$ build/sbt -Psparkr -Pkubernetes -Pkubernetes-integration-tests \
-Dspark.kubernetes.test.deployMode=docker-desktop "kubernetes-integration-tests/test" \
-Dtest.exclude.tags=minikube,local,decom \
-Dtest.default.exclude.tags=
...
[info] KubernetesSuite:
[info] - Run SparkPi with no resources (11 seconds, 238 milliseconds)
[info] - Run SparkPi with no resources & statefulset allocation (11 seconds, 58 milliseconds)
[info] - Run SparkPi with a very long application name. (9 seconds, 948 milliseconds)
[info] - Use SparkLauncher.NO_RESOURCE (9 seconds, 884 milliseconds)
[info] - Run SparkPi with a master URL without a scheme. (9 seconds, 834 milliseconds)
[info] - Run SparkPi with an argument. (9 seconds, 870 milliseconds)
[info] - Run SparkPi with custom labels, annotations, and environment variables. (9 seconds, 887 milliseconds)
[info] - All pods have the same service account by default (9 seconds, 891 milliseconds)
[info] - Run extraJVMOptions check on driver (5 seconds, 888 milliseconds)
[info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (10 seconds, 261 milliseconds)
[info] - Run SparkPi with env and mount secrets. (18 seconds, 702 milliseconds)
[info] - Run PySpark on simple pi.py example (10 seconds, 944 milliseconds)
[info] - Run PySpark to test a pyfiles example (13 seconds, 934 milliseconds)
[info] - Run PySpark with memory customization (10 seconds, 853 milliseconds)
[info] - Run in client mode. (11 seconds, 301 milliseconds)
[info] - Start pod creation from template (9 seconds, 853 milliseconds)
[info] - SPARK-38398: Schedule pod creation from template (9 seconds, 923 milliseconds)
[info] - Run SparkR on simple dataframe.R example (13 seconds, 929 milliseconds)
[info] YuniKornSuite:
[info] - Run SparkPi with no resources (9 seconds, 769 milliseconds)
[info] - Run SparkPi with no resources & statefulset allocation (9 seconds, 776 milliseconds)
[info] - Run SparkPi with a very long application name. (9 seconds, 856 milliseconds)
[info] - Use SparkLauncher.NO_RESOURCE (9 seconds, 803 milliseconds)
[info] - Run SparkPi with a master URL without a scheme. (10 seconds, 783 milliseconds)
[info] - Run SparkPi with an argument. (10 seconds, 771 milliseconds)
[info] - Run SparkPi with custom labels, annotations, and environment variables. (9 seconds, 868 milliseconds)
[info] - All pods have the same service account by default (10 seconds, 811 milliseconds)
[info] - Run extraJVMOptions check on driver (6 seconds, 858 milliseconds)
[info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (11 seconds, 171 milliseconds)
[info] - Run SparkPi with env and mount secrets. (18 seconds, 221 milliseconds)
[info] - Run PySpark on simple pi.py example (11 seconds, 970 milliseconds)
[info] - Run PySpark to test a pyfiles example (13 seconds, 990 milliseconds)
[info] - Run PySpark with memory customization (11 seconds, 992 milliseconds)
[info] - Run in client mode. (11 seconds, 294 milliseconds)
[info] - Start pod creation from template (11 seconds, 10 milliseconds)
[info] - SPARK-38398: Schedule pod creation from template (9 seconds, 956 milliseconds)
[info] - Run SparkR on simple dataframe.R example (12 seconds, 992 milliseconds)
[info] Run completed in 10 minutes, 15 seconds.
[info] Total number of tests run: 36
[info] Suites: completed 2, aborted 0
[info] Tests: succeeded 36, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 751 s (12:31), completed Sep 13, 2022, 11:47:24 AM
```

Closes #37872 from dongjoon-hyun/SPARK-40417.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…en infra dockerfile is changed"

This reverts commit 57da119.
ulysses-you and others added 4 commits September 14, 2022 08:40
…n AQE

### What changes were proposed in this pull request?

- Support getting the user-specified root repartition through `DeserializeToObjectExec`
- Skip the optimize-empty conversion for a root repartition that is user-specified
- Add a new rule, `AdjustShuffleExchangePosition`, to adjust the shuffle we add back, so that we can restore the shuffle safely

### Why are the changes needed?

AQE cannot completely respect a user-specified repartition. The main reasons are:

1. the AQE optimizer converts an empty relation into a local relation, which does not preserve the partitioning info
2. the AQE `requiredDistribution` machinery only restores repartitions, and it does not support looking through `DeserializeToObjectExec`

After the fix, `spark.range(0).repartition(5).rdd.getNumPartitions` should be 5, as shown below.
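
For example, in spark-shell (an illustrative check of the expected behavior, not a test from this PR):

```scala
// Even though the relation is empty, the user-specified repartition(5) should
// be respected rather than dropped by the empty-relation optimization.
val numPartitions = spark.range(0).repartition(5).rdd.getNumPartitions
assert(numPartitions == 5)
```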

### Does this PR introduce _any_ user-facing change?

Yes, the user-specified distribution is now ensured.

### How was this patch tested?

add tests

Closes #37612 from ulysses-you/output-partition.

Lead-authored-by: ulysses-you <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…egative size in error message

### What changes were proposed in this pull request?

Change `UnsafeArrayWriter#initialize` to use longs rather than ints when calculating the initial size of the array.

### Why are the changes needed?

When calculating the initial size in bytes needed for the array, `UnsafeArrayWriter#initialize` uses an int expression, which can overflow. The initialize method then passes the negative size to `BufferHolder#grow`, which complains about the negative size.

Example (the following will run just fine on a 16GB laptop, despite the large driver size setting):
```
bin/spark-sql --driver-memory 22g --master "local[1]"

create or replace temp view data1 as
select 0 as key, id as val
from range(0, 268271216);

create or replace temp view data2 as
select key as lkey, collect_list(val) as bigarray
from data1
group by key;

-- the below cache forces Spark to create unsafe rows
cache lazy table data2;

select count(*) from data2;
```
After a few minutes, `BufferHolder#grow` will throw the following exception:
```
java.lang.IllegalArgumentException: Cannot grow BufferHolder by size -2115263656 because the size is negative
	at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:67)
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.initialize(UnsafeArrayWriter.java:61)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.aggregate.Collect.serialize(collect.scala:73)
	at org.apache.spark.sql.catalyst.expressions.aggregate.Collect.serialize(collect.scala:37)
```
This query was going to fail anyway, but the message makes it look like a bug in Spark rather than a user problem. `UnsafeArrayWriter#initialize` should calculate using a long expression and fail if the size exceeds `Integer.MAX_VALUE`, showing the actual initial size in the error message.
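
Conceptually, the fix widens the arithmetic before it can overflow; a hedged sketch with illustrative names (not the actual `UnsafeArrayWriter` code):

```scala
// Illustrative sketch only: compute the required size with Long arithmetic and
// fail with an explicit message if it does not fit in an Int, instead of letting
// an Int expression silently overflow to a negative number.
def initialArraySizeInBytes(numElements: Int, elementSize: Int): Int = {
  val headerInBytes: Long = 8L + ((numElements.toLong + 63) / 64) * 8L // length word + null bits
  val dataInBytes: Long = numElements.toLong * elementSize             // widen before multiplying
  val totalInBytes: Long = headerInBytes + dataInBytes
  if (totalInBytes > Int.MaxValue) {
    throw new UnsupportedOperationException(
      s"Cannot initialize array with $numElements elements of size $elementSize " +
        s"because it needs $totalInBytes bytes, which exceeds the maximum of ${Int.MaxValue}")
  }
  totalInBytes.toInt
}
```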

Note: This issue is not related to SPARK-39608, as far as I can tell, despite having the same symptom.

### Does this PR introduce _any_ user-facing change?

Other than a better error message, no.

### How was this patch tested?

New unit test.

Closes #37852 from bersprockets/bufferholder_message_issue.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…alyzer

### What changes were proposed in this pull request?

This is a follow-up of #37841. The code change in https://github.com/apache/spark/pull/37841/files#diff-ed19f376a63eba52eea59ca71f3355d4495fad4fad4db9a3324aade0d4986a47R212 is wrong: the context of `analyzed.origin` is not accurate at all. Thus, this PR reverts it.

### Why are the changes needed?

Fix a wrong query context introduced in the previous PR #37841.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

GA tests

Closes #37861 from gengliangwang/fxiReg.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
…PythonArrowOutput

### What changes were proposed in this pull request?

This PR proposes to change PythonArrowInput and PythonArrowOutput to be more generic so they can cover complex data types on both input and output. This is baseline work for #37863.

### Why are the changes needed?

The traits PythonArrowInput and PythonArrowOutput can be further generalized to cover complex data types on both input and output. For example, not all operators pass a simple InternalRow as input data to the Python worker, and the same holds for output data.
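
The shape of the generalization might look like this (hypothetical trait names and signatures, only to illustrate parameterizing the input/output type instead of hard-coding `InternalRow`):

```scala
// Hypothetical sketch: parameterize the Arrow input/output plumbing on the data
// type it carries, so operators that do not exchange plain InternalRow with the
// Python worker can still reuse the same serialization path.
trait ArrowPythonInput[IN] {
  /** Serialize a batch of inputs into Arrow-encoded bytes for the Python worker. */
  def writeBatch(input: Iterator[IN]): Array[Byte]
}

trait ArrowPythonOutput[OUT] {
  /** Deserialize Arrow-encoded bytes from the Python worker back into outputs. */
  def readBatch(bytes: Array[Byte]): Iterator[OUT]
}
```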

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #37864 from HeartSaVioR/SPARK-40414.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
… formats

### What changes were proposed in this pull request?
In the PR, I propose to sort error message parameters in the MINIMAL and STANDARD formats (JSON formats) by parameter names.

### Why are the changes needed?
To make the output in JSON format stable.
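
One simple way to get a deterministic order is to sort the parameter map by key before serializing to JSON (an illustrative sketch, not necessarily the exact code):

```scala
import scala.collection.immutable.SortedMap

// Sorting message parameters by name yields stable JSON output regardless of
// the insertion order of the underlying Map.
val messageParameters = Map("proposal" -> "`id`", "objectName" -> "`v`")
val sortedParameters = SortedMap[String, String]() ++ messageParameters
// sortedParameters iterates in key order: objectName, then proposal
```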

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
```

Closes #37875 from MaxGekk/sorted-message-parameters.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
pull[bot] merged commit 801faba into wangyum:master on Sep 14, 2022
pull bot pushed a commit that referenced this pull request Aug 19, 2025
…onicalized expressions

### What changes were proposed in this pull request?

Make PullOutNonDeterministic use canonicalized expressions to dedup grouping and aggregate expressions. This affects PySpark UDFs in particular. Example:

```
from pyspark.sql.functions import col, avg, udf

pythonUDF = udf(lambda x: x).asNondeterministic()

spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)
```

This currently results in a plan like:

```
Aggregate [_nondeterministic#15], [_nondeterministic#15 AS dummyNondeterministicUDF(value)#12, avg(id#0L) AS avg(id)#13, dummyNondeterministicUDF(value#6L)#8 AS dummyNondeterministicUDF(value)#14]
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)#7 AS _nondeterministic#15]
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L]
      +- Range (0, 10, step=1, splits=Some(2))
```

and then it throws:

```
[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
```

- How canonicalization fixes this:
  - Nondeterministic PythonUDF expressions always have distinct resultIds per UDF.
  - The fix is to canonicalize the expressions when matching; canonicalizing sets the resultIds to -1, which lets us dedup the PythonUDF expressions (see the sketch below).
- For deterministic UDFs, this rule does not apply, and the "Post Analysis" batch extracts and deduplicates the expressions, as expected.
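
A hedged sketch of the dedup step (not the rule's actual code): key the extracted nondeterministic expressions by their canonicalized form, so that two `PythonUDF` instances differing only in `resultId` collapse to a single alias.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

// Sketch only: build the map of extracted nondeterministic expressions keyed by
// the canonicalized expression (expression IDs normalized), so equivalent
// PythonUDF calls in the grouping and aggregate lists resolve to the same alias.
def dedupNondeterministic(exprs: Seq[Expression]): Map[Expression, NamedExpression] =
  exprs
    .filterNot(_.deterministic)
    .map(e => e.canonicalized -> Alias(e, "_nondeterministic")())
    .toMap
```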

### Why are the changes needed?

- The output of the query with the fix applied still makes sense: the nondeterministic UDF is invoked only once, in the Project.

### Does this PR introduce _any_ user-facing change?

Yes, it's additive: it enables queries to run that previously threw errors.

### How was this patch tested?

- added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52061 from benrobby/adhoc-fix-pull-out-nondeterministic.

Authored-by: Ben Hurdelhey <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>