
Conversation

@heary-cao
Contributor

@heary-cao heary-cao commented Aug 11, 2017

What changes were proposed in this pull request?

Currently the optimizer does a lot of special handling for non-deterministic projects and filters, but it is not good enough. This patch adds a new special case for non-deterministic filters.
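For background, a quick spark-shell check of the property this is about (only illustrative, not part of the patch): Catalyst expressions expose a deterministic flag, and rand(10) reports false, which is why such filters get special treatment.

import org.apache.spark.sql.functions.rand
// rand(10) is backed by a Rand expression; its deterministic flag is false,
// so Catalyst treats any filter built from it as non-deterministic.
println(rand(10).expr.deterministic)   // prints: false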
In my spark-shell, I executed the following statements:

val df3 = (0 until 50).map(i => (i % 2, i % 3, i % 4, i % 5, i % 6, i % 7, i % 8, i % 9, i % 10, i % 11,(i % 2).toString, (i % 3).toString, (i % 4).toString, (i % 5).toString, (i % 6).toString, (i % 7).toString, (i % 2).toDouble, (i % 3).toDouble, (i % 4).toDouble, (i % 5).toDouble, (i % 6).toDouble, (i % 7).toDouble)).toDF("i2","i3","i4","i5","i6","i7","i8","i9","i10","i11","s2","s3","s4","s5","s6","s7","d2","d3","d4","d5","d6","d7")
df3.write.format("orc").partitionBy("i2").bucketBy(8, "i3","i4","i5","i6","i7","i8","i9","i10","i11","s2","s3","s4","s5","s6","s7","d2","d3","d4","d5","d6","d7").saveAsTable("tableorc")

val df = spark.sql("SELECT t1.i3 from tableorc t1 where rand(10) <= 0.5")
println("executed Plan:" + df.queryExecution.executedPlan)
df.show(500)
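As a hedged aside (not part of the original report), one way to confirm in the same session which columns and partition filters the scan actually uses, assuming a Spark 2.x plan whose data source scan is a FileSourceScanExec:

import org.apache.spark.sql.execution.FileSourceScanExec
// Walk the physical plan and print what the ORC scan will read.
df.queryExecution.executedPlan.foreach {
  case scan: FileSourceScanExec =>
    println("read schema: " + scan.requiredSchema.simpleString)
    println("partition filters: " + scan.partitionFilters.mkString(", "))
  case _ =>
}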

Before the change, the executed plan is:

*Project [i3#0]
+- *Filter (rand(10) <= 0.5)
   +- *FileScan orc default.tableorc[i3#0,i4#1,i5#2,i6#3,i7#4,i8#5,i9#6,i10#7,i11#8,s2#9,s3#10,s4#11,s5#12,s6#13,s7#14,d2#15,d3#16,d4#17,d5#18,d6#19,d7#20,i2#21] Batched: false, Format: ORC, Location: CatalogFileIndex[file:/home/cxw/spark/bin/spark-warehouse/tableorc], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i3:int,i4:int,i5:int,i6:int,i7:int,i8:int,i9:int,i10:int,i11:int,s2:string,s3:string,s4:st...

FileScanRDD read userdata: [0,0,0,4,0,3,0,6,4,2,b800000001,c000000001,c800000001,d000000001,d800000001,e000000001,0,0,0,4010000000000000,0,4008000000000000,0,30,30,30,34,30,33]

After the change, the executed plan is:

*Project [i3#0]
+- *Filter (rand(10) <= 0.5)
   +- *FileScan orc default.tableorc[i3#0,i2#21] Batched: false, Format: ORC, Location: PrunedInMemoryFileIndex[file:/home/cxw/spark/bin/spark-warehouse/tableorc/i2=0], PartitionCount: 1, PartitionFilters: [(rand(10) <= 0.5)], PushedFilters: [], ReadSchema: struct<i3:int>

FileScanRDD read userdata: [0,2,0]

So this PR makes sure we only read the fields that are actually needed.
In addition, in our real cluster environment, HiveTableScan also scans more columns than necessary, as the execution plans below show.
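As a hedged aside, one way to list the columns a scan node will produce is to print the output attributes of the leaf nodes of the executed plan; hiveDf below is only a placeholder name for the DataFrame of the query over the Hive serde table:

// hiveDf stands for the DataFrame built over the Hive serde table.
val scans = hiveDf.queryExecution.executedPlan.collect {
  case leaf if leaf.children.isEmpty => leaf
}
scans.foreach(scan =>
  println(scan.nodeName + " outputs: " + scan.output.map(_.name).mkString(", ")))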

HiveTableScans plan:Aggregate [k#2L], [k#2L, k#2L, sum(cast(id#1 as bigint)) AS sum(id)#395L]
+- Project [d004#205 AS id#1, CEIL(c010#214) AS k#2L]
   +- Filter ((isnotnull(d004#205) && (rand(-4530215890880734772) <= 0.5)) && NOT (cast(cast(d004#205 as decimal(10,0)) as decimal(11,1)) = 0.0))
      +- MetastoreRelation XXX_database, XXX_table

HiveTableScans plan:Project [d004#205 AS id#1, CEIL(c010#214) AS k#2L]
+- Filter ((isnotnull(d004#205) && (rand(-4530215890880734772) <= 0.5)) && NOT (cast(cast(d004#205 as decimal(10,0)) as decimal(11,1)) = 0.0))
   +- MetastoreRelation XXX_database, XXX_table

HiveTableScans plan:Filter ((isnotnull(d004#205) && (rand(-4530215890880734772) <= 0.5)) && NOT (cast(cast(d004#205 as decimal(10,0)) as decimal(11,1)) = 0.0))
+- MetastoreRelation XXX_database, XXX_table

HiveTableScans plan:MetastoreRelation XXX_database, XXX_table

HiveTableScans result plan:HiveTableScan [c030#204L, d004#205, d005#206, d025#207, c002#208, d023#209, d024#210, c005#211L, c008#212, c009#213, c010#214, d021#215, d022#216, c017#217, c018#218, c019#219, c020#220, c021#221, c022#222, c023#223, c024#224, c025#225, c026#226, c027#227, ... 169 more fields], MetastoreRelation  XXX_database, XXX_table

HadoopRDD also read more userdata: {62340760016026144, 254850, 0, 64F00053E382D3AB, 3, , , null, 550667202, -78, -7.0, 6373, 152963, 114.13232277, 32.16357801, 2, 26, -116.657997, 21, 27, 15, 0.021978, -3, 3, -270543.0, 77187.0, 5041, 560, 7, 187, 003E3820BB8F8CA3, 9, 255, 2, 4, null, , , 101, 37.51, 202.74, , , , , , 39309, 610824, 52, 152, -117, 37900, 0, , , , , , , null, null, , null, null, null, null, null, null, null, null, 0, null, null, null, null, null, null, null, null, 0, 4, null, 26, 26, 20, 15, 15, null, 36, 350182624, 1039, 1, 430, 48, 0, -78, null, "5041,-27055,7719", "5041,-13528,3860", "5041,-5411,1544", "5041,-2706,772", "5041,-1353,386", null, 178, 4, 0.0, 0.0, 37.51, 202.74, 0, 30, 0, null, null, 687, 3696, 14768, 26300, 850.0, 125.0, 263, , 6.97, 3.77, null, null, null, null, 256, __, null, null, null, null, null, null, null, 254850_0, null, null, null, null, null, 0, 15, 0, 0, null, null, null, , -5411, 1544, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null}

This hurts task performance.

How was this patch tested?

Covered by existing test cases, and new test cases are added.

@heary-cao heary-cao changed the title [SQL]Improvement a special case for non-deterministic filters in optimizer [SPARK-21707][SQL]Improvement a special case for non-deterministic filters in optimizer Aug 11, 2017
@heary-cao heary-cao force-pushed the filters_non_deterministic branch from 4216b6c to b79b9af Compare August 11, 2017 10:24
@gatorsmile
Member

ok to test

@gatorsmile
Copy link
Member

How about data source tables?

@SparkQA

SparkQA commented Aug 11, 2017

Test build #80545 has finished for PR 18918 at commit b79b9af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao
Contributor Author

@gatorsmile
data source tables:
HiveTableScans result plan:HiveTableScan [c030#204L, d004#205, d005#206, d025#207, c002#208, d023#209, d024#210, c005#211L, c008#212, c009#213, c010#214, d021#215, d022#216, c017#217, c018#218, c019#219, c020#220, c021#221, c022#222, c023#223, c024#224, c025#225, c026#226, c027#227, ... 169 more fields], MetastoreRelation XXX_database, XXX_table.
thanks.

@gatorsmile
Member

This is the plan of a Hive serde table.

The fix should not be done in the optimizer. We should fix it in the place that causes the issue.

@heary-cao
Contributor Author

heary-cao commented Aug 14, 2017

@gatorsmile
I tried to modify PhysicalOperation's collectProjectsAndFilters and added a match case:

      case Project(fields, child: LeafNode) if !fields.forall(_.deterministic) =>
        val (_, filters, other, aliases) = collectProjectsAndFilters(child)
        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
        (Some(substitutedFields), filters, other, collectAliases(substitutedFields))

The problem can be solved this way, but I'm not sure whether it will cause other problems. Do you have any suggestions? This approach is also applicable to #18892. Thanks.

Member


Why can't we remove the Project if the condition is not deterministic?

Contributor Author

@heary-cao heary-cao Aug 14, 2017


When we split a Project, we prune it here again.

Member


I don't get it from your explanation. If I understand correctly, when there is a Project that selects a subset of the output from the LeafNode and we remove it with the pattern below, we will retrieve all fields. Is that your purpose?

Member


Actually I don't get what the test title tries to say. Can you try to rephrase it?

@heary-cao heary-cao force-pushed the filters_non_deterministic branch from b79b9af to bf81c45 Compare August 14, 2017 10:45
@SparkQA

SparkQA commented Aug 14, 2017

Test build #80628 has finished for PR 18918 at commit bf81c45.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Yes. We should fix it in object PhysicalOperation.

@heary-cao heary-cao force-pushed the filters_non_deterministic branch 2 times, most recently from 9f73949 to 4daec54 Compare August 15, 2017 08:49
@heary-cao
Contributor Author

heary-cao commented Aug 15, 2017

@gatorsmile
I have fixed it in PhysicalOperation and extracted a new object FilterOperation to handle it.
Can you help me review it again if you have time?
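For illustration only, a speculative sketch of what a FilterOperation-style extractor over Catalyst plans could look like; it collects consecutive Filter conditions regardless of determinism so the plan underneath can still be matched for pruning, and it is not necessarily the code in this PR:

import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// Speculative sketch: gather the conditions of consecutive Filter nodes
// (deterministic or not) and return the plan underneath them.
object FilterOperation extends PredicateHelper {
  def unapply(plan: LogicalPlan): Option[(Seq[Expression], LogicalPlan)] = plan match {
    case Filter(condition, child) =>
      // unapply always returns Some, so .get is safe here.
      val (childConditions, leaf) = unapply(child).get
      Some((splitConjunctivePredicates(condition) ++ childConditions, leaf))
    case other =>
      Some((Nil, other))
  }
}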

@SparkQA

SparkQA commented Aug 15, 2017

Test build #80670 has finished for PR 18918 at commit 4daec54.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao heary-cao force-pushed the filters_non_deterministic branch from 4daec54 to 82b82af Compare August 15, 2017 11:43
@SparkQA

SparkQA commented Aug 15, 2017

Test build #80681 has finished for PR 18918 at commit 82b82af.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao heary-cao force-pushed the filters_non_deterministic branch from 82b82af to df7ecaa Compare August 15, 2017 14:02
@SparkQA

SparkQA commented Aug 15, 2017

Test build #80686 has finished for PR 18918 at commit df7ecaa.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao heary-cao force-pushed the filters_non_deterministic branch from df7ecaa to 97a3270 Compare August 15, 2017 16:42
@SparkQA

SparkQA commented Aug 15, 2017

Test build #80693 has finished for PR 18918 at commit 97a3270.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao heary-cao force-pushed the filters_non_deterministic branch from 471d81c to 20fc87a Compare August 30, 2017 09:54
@heary-cao
Contributor Author

heary-cao commented Aug 30, 2017

cc @cloud-fan @gatorsmile @viirya Could you take a look?

val p = path.getAbsolutePath
Seq(1 -> "a").toDF("a", "b").write.partitionBy("a").parquet(p)
val df = spark.read.parquet(p)
checkAnswer(df.filter(rand(10) <= 1.0).select($"a"), Row(1))
Contributor


This test can pass on current master.
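For context, the quoted hunk above would normally sit inside a QueryTest-style suite, roughly like the sketch below; the test name is made up, and withTempPath, checkAnswer, spark, and testImplicits are assumed from Spark's existing SQL test harness, so this is not the exact test in the PR.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.rand

// Rough guess at the surrounding test scaffolding.
test("column pruning with a non-deterministic filter") {
  withTempPath { path =>
    val p = path.getAbsolutePath
    Seq(1 -> "a").toDF("a", "b").write.partitionBy("a").parquet(p)
    val df = spark.read.parquet(p)
    checkAnswer(df.filter(rand(10) <= 1.0).select($"a"), Row(1))
  }
}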

@cloud-fan
Contributor

What exactly are you proposing?

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

gentle ping @heary-cao

@asfgit asfgit closed this in 1a4fda8 Jul 19, 2018
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
Closes apache#17422
Closes apache#17619
Closes apache#18034
Closes apache#18229
Closes apache#18268
Closes apache#17973
Closes apache#18125
Closes apache#18918
Closes apache#19274
Closes apache#19456
Closes apache#19510
Closes apache#19420
Closes apache#20090
Closes apache#20177
Closes apache#20304
Closes apache#20319
Closes apache#20543
Closes apache#20437
Closes apache#21261
Closes apache#21726
Closes apache#14653
Closes apache#13143
Closes apache#17894
Closes apache#19758
Closes apache#12951
Closes apache#17092
Closes apache#21240
Closes apache#16910
Closes apache#12904
Closes apache#21731
Closes apache#21095

Added:
Closes apache#19233
Closes apache#20100
Closes apache#21453
Closes apache#21455
Closes apache#18477

Added:
Closes apache#21812
Closes apache#21787

Author: hyukjinkwon <[email protected]>

Closes apache#21781 from HyukjinKwon/closing-prs.