Skip to content

Conversation

@lianhuiwang
Copy link
Contributor

What changes were proposed in this pull request?

when query only use metadata (example: partition key), it can return results based on metadata without scanning files. Hive did it in HIVE-1003.

How was this patch tested?

add unit tests

@SparkQA
Copy link

SparkQA commented Jun 3, 2016

Test build #59925 has finished for PR 13494 at commit 2ca2c38.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 3, 2016

Test build #59929 has finished for PR 13494 at commit edea710.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 3, 2016

Test build #59930 has finished for PR 13494 at commit 8426522.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 3, 2016

Test build #59940 has finished for PR 13494 at commit 153293e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Copy link
Contributor

rxin commented Jun 4, 2016

Can you try to write a design doc on this? Would be great to discuss the reasons why we might want this, the kind of queries that can be answered, corner cases, and how it should be implemented. Thanks.

@lianhuiwang
Copy link
Contributor Author

lianhuiwang commented Jun 4, 2016

@rxin I have writed a design doc: https://docs.google.com/document/d/1Bmi4-PkTaBQ0HVaGjIqa3eA12toKX52QaiUyhb6WQiM/edit?usp=sharing.
Glad to get your comments. Thanks.

val partitionSchema = files.partitionSchema.toAttributes
lazy val converter = GenerateUnsafeProjection.generate(partitionSchema, partitionSchema)
val partitionValues = selectedPartitions.map(_.values)
files.sqlContext.sparkContext.parallelize(partitionValues, 1).map(converter(_))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if this partition has more than one data files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now in this PR, default of spark.sql.optimizer.metadataOnly is false, So if user needs this feature, he should set spark.sql.optimizer.metadataOnly=true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think optimizer should never affect the correctness of the query result. If this optimization is too hard to implement with current code base, we should improve the code base first, instead of rushing in a partial implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I rethink more and then i will add a metadataOnly optimizer to optimizer list.Thanks.

@cloud-fan
Copy link
Contributor

hi @lianhuiwang , thanks for working on it!

The overall idea LGTM, we should elimiante unnecessary file scan if only partition columns are read. However, the current implementation looks not corrected, we should also consider the number of rows. I also took a look at the hive path, it only optimize partition columns used as aggregation keys, where the number of duplicated rows doesn't matter.

I think we should either narrow down the scope of this PR and focus on aggregation queries, or spent some more time for a more general design.

cc @yhuai @liancheng

@lianhuiwang
Copy link
Contributor Author

lianhuiwang commented Jun 23, 2016

@cloud-fan Yes, I think what you said is right. as Hive/Prestodb, if queries that did some functions (example: MIN/MAX) or distinct aggregates on partition column and the value of config 'spark.sql.optimizer.metadataOnly' is true, then we can use the metadata-only optimization.
I will add a metadataOnly optimizer to optimizer list.Thanks.

@SparkQA
Copy link

SparkQA commented Jun 24, 2016

Test build #61161 has finished for PR 13494 at commit 7d7ece0.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Copy link
Contributor Author

@cloud-fan Now i have added a extendedHiveOptimizerRules that include MetadataOnly Optimization for Hive Optimizer.
Firstly,MetadataOnly Optimization should be in Hive Model because MetastoreRelation only can be used in Hive now.
Secondly, MetadataOnly Optimization should be between Analyzer and RewriteDistinctAggregates.
In the future, we can add ParquetConversions/OrcConversions and other optimizations into extendedHiveOptimizerRules.

@rxin
Copy link
Contributor

rxin commented Jun 24, 2016

Why is this rule Hive specific?

@lianhuiwang
Copy link
Contributor Author

lianhuiwang commented Jun 24, 2016

@rxin good point. Because now MetastoreRelation only be defined in Hive now and if we make it using MetadataOnly optimization, like this PR we can use MetadataOnly optimization in Hive Component.
if not, we needs divide MetadataOnly optimization into two part, one for common sql, other for HiveQl.
I will think more about it and try my best to resolve it. Thanks.

val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization.")
.booleanConf
.createWithDefault(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we turn it on by default?

@SparkQA
Copy link

SparkQA commented Jun 24, 2016

Test build #61162 has finished for PR 13494 at commit 2e55a9d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if files.partitionSchema.nonEmpty =>
(Some(relation), Seq.empty[Expression])

case relation: MetastoreRelation if relation.partitionKeys.nonEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetastoreRelation extends CatalogRelation, I think we can put this rule in sql core instead of hive module.

@SparkQA
Copy link

SparkQA commented Jun 24, 2016

Test build #61163 has finished for PR 13494 at commit b2b6eba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 24, 2016

Test build #61164 has finished for PR 13494 at commit c5a291e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaPackage
    • case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) extends LeafExecNode

@lianhuiwang
Copy link
Contributor Author

@hvanhovell I have addressed some of your comments. Thanks. Could you look at again?

/**
* Returns the partition attributes of the table relation plan.
*/
def getPartitionAttrs(partitionColumnNames: Seq[String], relation: LogicalPlan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Style.

def getPartitionAttrs(
    partitionColumnNames: Seq[String],
    relation: LogicalPlan): Seq[Attribute] = { ...

While you are at it. Change the return type to AttributeSet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get it. Thanks.

@SparkQA
Copy link

SparkQA commented Jul 11, 2016

Test build #62110 has finished for PR 13494 at commit d888c85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case plan if plan eq relation =>
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
val partAttrs = PartitionedRelation.getPartitionAttrs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does getPartitionAttrs need to be a method in PartitionedRelation? I think it can just be a private method in parent class.

Copy link
Contributor Author

@lianhuiwang lianhuiwang Jul 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks. Because object PartitionedRelation also use getPartitionAttrs, Now i just define it in PartitionedRelation. If it define a private method in class OptimizeMetadataOnlyQuery, there are two same getPartitionAttrs() functions in PartitionedRelation and OptimizeMetadataOnlyQuery.
How about define two same getPartitionAttrs() functions? or has another way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I will define two functions for getPartitionAttrs(). In the future, I think we can put getPartitionAttrs() into relation plan. If i has some problem, please tell me. thanks.

@lianhuiwang
Copy link
Contributor Author

@cloud-fan @hvanhovell about getPartitionAttrs() It has a improve place that we can define it in relation node. but now relation node has not this function. how about added in follow-up PRs? Thanks.

@SparkQA
Copy link

SparkQA commented Jul 12, 2016

Test build #62137 has finished for PR 13494 at commit ff16509.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* Returns the partition attributes of the table relation plan.
*/
private def getPartitionAttrs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, inner class can access private member of outer class, we don't need to duplicate the method in inner class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks.

@lianhuiwang
Copy link
Contributor Author

@cloud-fan I have addressed your latest comments. thanks.

@SparkQA
Copy link

SparkQA commented Jul 12, 2016

Test build #62156 has finished for PR 13494 at commit 030776a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Copy link
Contributor

LGTM - Merging to master. Thanks!

@asfgit asfgit closed this in 5ad68ba Jul 12, 2016
@lianhuiwang
Copy link
Contributor Author

Thank you for review and merging. @rxin @hvanhovell @cloud-fan .

asfgit pushed a commit that referenced this pull request Jan 25, 2019
…cords correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in #13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?

Unit test

Closes #23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Xiao Li <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit f5b9370)
Signed-off-by: gatorsmile <[email protected]>
asfgit pushed a commit that referenced this pull request Jan 25, 2019
…cords correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in #13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?

Unit test

Closes #23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Xiao Li <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
gengliangwang added a commit to gengliangwang/spark that referenced this pull request Jan 25, 2019
…cords correctly

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Xiao Li <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
asfgit pushed a commit that referenced this pull request Jan 26, 2019
…dle empty records correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in #13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?
Unit test

Closes #23648 from gengliangwang/SPARK-26709.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…cords correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Xiao Li <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…cords correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Xiao Li <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit f5b9370)
Signed-off-by: gatorsmile <[email protected]>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
…cords correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Xiao Li <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit f5b9370)
Signed-off-by: gatorsmile <[email protected]>
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
* [SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly

## What changes were proposed in this pull request?

When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
```
sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
sql("SELECT MAX(p1) FROM t")
```
The result is supposed to be `null`. However, with the optimization the result is `5`.

The rule is originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in apache#13494. In Hive, the rule is disabled by default in a later release(https://issues.apache.org/jira/browse/HIVE-15397), due to the same problem.

It is hard to completely avoid the correctness issue. Because data sources like Parquet can be metadata-only. Spark can't tell whether it is empty or not without actually reading it. This PR disable the optimization by default.

## How was this patch tested?

Unit test

Closes apache#23635 from gengliangwang/optimizeMetadata.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Xiao Li <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit f5b9370)
Signed-off-by: gatorsmile <[email protected]>

* [SPARK-26080][PYTHON] Skips Python resource limit on Windows in Python worker

## What changes were proposed in this pull request?

`resource` package is a Unix specific package. See https://docs.python.org/2/library/resource.html and https://docs.python.org/3/library/resource.html.

Note that we document Windows support:

> Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS).

This should be backported into branch-2.4 to restore Windows support in Spark 2.4.1.

## How was this patch tested?

Manually mocking the changed logics.

Closes apache#23055 from HyukjinKwon/SPARK-26080.

Lead-authored-by: hyukjinkwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 9cda9a8)
Signed-off-by: Hyukjin Kwon <[email protected]>

* [SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.

## What changes were proposed in this pull request?

Updates FileFormatWriter to create a consistent Hadoop Job ID for a write.

## How was this patch tested?

Existing tests for regressions.

Closes apache#23777 from rdblue/SPARK-26873-fix-file-format-writer-job-ids.

Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit 33334e2)
Signed-off-by: Marcelo Vanzin <[email protected]>

* [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of apache#21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>

* [SPARK-26677][BUILD] Update Parquet to 1.10.1 with notEq pushdown fix.

## What changes were proposed in this pull request?

Update to Parquet Java 1.10.1.

## How was this patch tested?

Added a test from HyukjinKwon that validates the notEq case from SPARK-26677.

Closes apache#23704 from rdblue/SPARK-26677-fix-noteq-parquet-bug.

Lead-authored-by: Ryan Blue <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Ryan Blue <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit f72d217)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-26677][FOLLOWUP][BRANCH-2.4] Update Parquet manifest with Hadoop-2.6

## What changes were proposed in this pull request?

During merging Parquet upgrade PR, `hadoop-2.6` profile dependency manifest is missed.

## How was this patch tested?

Manual.

```
./dev/test-dependencies.sh
```

Also, this will recover `branch-2.4` with `hadoop-2.6` build.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.4-test-sbt-hadoop-2.6/281/

Closes apache#23738 from dongjoon-hyun/SPARK-26677-2.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

## What changes were proposed in this pull request?

When performing non-cascading cache invalidation, `recache` is called on the other cache entries which are dependent on the cache being invalidated. It leads to the the physical plans of those cache entries being re-compiled. For those cache entries, if the cache RDD has already been persisted, chances are there will be inconsistency between the data and the new plan. It can cause a correctness issue if the new plan's `outputPartitioning`  or `outputOrdering` is different from the that of the actual data, and meanwhile the cache is used by another query that asks for specific `outputPartitioning` or `outputOrdering` which happens to match the new plan but not the actual data.

The fix is to keep the cache entry as it is if the data has been loaded, otherwise re-build the cache entry, with a new plan and an empty cache buffer.

## How was this patch tested?

Added UT.

Closes apache#23678 from maryannxue/spark-26708-2.4.

Authored-by: maryannxue <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>

* [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)

## What changes were proposed in this pull request?

Backport apache#23324 to branch-2.4.

## How was this patch tested?

Jenkins

Closes apache#23365 from zsxwing/SPARK-26267-2.4.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>

* [SPARK-26706][SQL] Fix `illegalNumericPrecedence` for ByteType

This PR contains a minor change in `Cast$mayTruncate` that fixes its logic for bytes.

Right now, `mayTruncate(ByteType, LongType)` returns `false` while `mayTruncate(ShortType, LongType)` returns `true`. Consequently, `spark.range(1, 3).as[Byte]` and `spark.range(1, 3).as[Short]` behave differently.

Potentially, this bug can silently corrupt someone's data.
```scala
// executes silently even though Long is converted into Byte
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
  .map(b => b - 1)
  .show()
+-----+
|value|
+-----+
|  -12|
|  -11|
|  -10|
|   -9|
|   -8|
|   -7|
|   -6|
|   -5|
|   -4|
|   -3|
+-----+
// throws an AnalysisException: Cannot up cast `id` from bigint to smallint as it may truncate
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
  .map(s => s - 1)
  .show()
```

This PR comes with a set of unit tests.

Closes apache#23632 from aokolnychyi/cast-fix.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: DB Tsai <[email protected]>

* [SPARK-26078][SQL][BACKPORT-2.4] Dedup self-join attributes on IN subqueries

## What changes were proposed in this pull request?

When there is a self-join as result of a IN subquery, the join condition may be invalid, resulting in trivially true predicates and return wrong results.

The PR deduplicates the subquery output in order to avoid the issue.

## How was this patch tested?

added UT

Closes apache#23449 from mgaido91/SPARK-26078_2.4.

Authored-by: Marco Gaido <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-26233][SQL][BACKPORT-2.4] CheckOverflow when encoding a decimal value

## What changes were proposed in this pull request?

When we encode a Decimal from external source we don't check for overflow. That method is useful not only in order to enforce that we can represent the correct value in the specified range, but it also changes the underlying data to the right precision/scale. Since in our code generation we assume that a decimal has exactly the same precision and scale of its data type, missing to enforce it can lead to corrupted output/results when there are subsequent transformations.

## How was this patch tested?

added UT

Closes apache#23232 from mgaido91/SPARK-26233_2.4.

Authored-by: Marco Gaido <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-27097][CHERRY-PICK 2.4] Avoid embedding platform-dependent offsets literally in whole-stage generated code

## What changes were proposed in this pull request?

Spark SQL performs whole-stage code generation to speed up query execution. There are two steps to it:
- Java source code is generated from the physical query plan on the driver. A single version of the source code is generated from a query plan, and sent to all executors.
  - It's compiled to bytecode on the driver to catch compilation errors before sending to executors, but currently only the generated source code gets sent to the executors. The bytecode compilation is for fail-fast only.
- Executors receive the generated source code and compile to bytecode, then the query runs like a hand-written Java program.

In this model, there's an implicit assumption about the driver and executors being run on similar platforms. Some code paths accidentally embedded platform-dependent object layout information into the generated code, such as:
```java
Platform.putLong(buffer, /* offset */ 24, /* value */ 1);
```
This code expects a field to be at offset +24 of the `buffer` object, and sets a value to that field.
But whole-stage code generation generally uses platform-dependent information from the driver. If the object layout is significantly different on the driver and executors, the generated code can be reading/writing to wrong offsets on the executors, causing all kinds of data corruption.

One code pattern that leads to such problem is the use of `Platform.XXX` constants in generated code, e.g. `Platform.BYTE_ARRAY_OFFSET`.

Bad:
```scala
val baseOffset = Platform.BYTE_ARRAY_OFFSET
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will embed the value of `Platform.BYTE_ARRAY_OFFSET` on the driver into the generated code.

Good:
```scala
val baseOffset = "Platform.BYTE_ARRAY_OFFSET"
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will generate the offset symbolically -- `Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, value)`, which will be able to pick up the correct value on the executors.

Caveat: these offset constants are declared as runtime-initialized `static final` in Java, so they're not compile-time constants from the Java language's perspective. It does lead to a slightly increased size of the generated code, but this is necessary for correctness.

NOTE: there can be other patterns that generate platform-dependent code on the driver which is invalid on the executors. e.g. if the endianness is different between the driver and the executors, and if some generated code makes strong assumption about endianness, it would also be problematic.

## How was this patch tested?

Added a new test suite `WholeStageCodegenSparkSubmitSuite`. This test suite needs to set the driver's extraJavaOptions to force the driver and executor use different Java object layouts, so it's run as an actual SparkSubmit job.

Authored-by: Kris Mok <kris.mokdatabricks.com>

Closes apache#24032 from gatorsmile/testFailure.

Lead-authored-by: Kris Mok <[email protected]>
Co-authored-by: gatorsmile <[email protected]>
Signed-off-by: DB Tsai <[email protected]>

* [SPARK-26188][SQL] FileIndex: don't infer data types of partition columns if user specifies schema

## What changes were proposed in this pull request?

This PR is to fix a regression introduced in: https://github.com/apache/spark/pull/21004/files#r236998030

If user specifies schema, Spark don't need to infer data type for of partition columns, otherwise the data type might not match with the one user provided.
E.g. for partition directory `p=4d`, after data type inference  the column value will be `4.0`.
See https://issues.apache.org/jira/browse/SPARK-26188 for more details.

Note that user specified schema **might not cover all the data columns**:
```
val schema = new StructType()
  .add("id", StringType)
  .add("ex", ArrayType(StringType))
val df = spark.read
  .schema(schema)
  .format("parquet")
  .load(src.toString)

assert(df.schema.toList === List(
  StructField("ex", ArrayType(StringType)),
  StructField("part", IntegerType), // inferred partitionColumn dataType
  StructField("id", StringType))) // used user provided partitionColumn dataType
```
For the missing columns in user specified schema, Spark still need to infer their data types if `partitionColumnTypeInferenceEnabled` is enabled.

To implement the partially inference, refactor `PartitioningUtils.parsePartitions`  and pass the user specified schema as parameter to cast partition values.

## How was this patch tested?

Add unit test.

Closes apache#23165 from gengliangwang/fixFileIndex.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 9cfc3ee)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-25921][PYSPARK] Fix barrier task run without BarrierTaskContext while python worker reuse

## What changes were proposed in this pull request?

Running a barrier job after a normal spark job causes the barrier job to run without a BarrierTaskContext. This is because while python worker reuse, BarrierTaskContext._getOrCreate() will still return a TaskContext after firstly submit a normal spark job, we'll get a `AttributeError: 'TaskContext' object has no attribute 'barrier'`. Fix this by adding check logic in BarrierTaskContext._getOrCreate() and make sure it will return BarrierTaskContext in this scenario.

## How was this patch tested?

Add new UT in pyspark-core.

Closes apache#22962 from xuanyuanking/SPARK-25921.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c00e72f)
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants