Skip to content

Conversation

@HyukjinKwon
Copy link
Owner

No description provided.

@HyukjinKwon HyukjinKwon deleted the SPARK-32357-test-download branch December 7, 2020 02:06
HyukjinKwon pushed a commit that referenced this pull request Mar 9, 2021
### What changes were proposed in this pull request?

`BroadcastNestedLoopJoinExec` does not have code-gen, and we can potentially boost the CPU performance for this operator if we add code-gen for it. https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html also showed the evidence in one fork.

The codegen for `BroadcastNestedLoopJoinExec` shared some code with `HashJoin`, and the interface `JoinCodegenSupport` is created to hold those common logic. This PR is only supporting inner and cross join. Other join types will be added later in followup PRs.

Example query and generated code:

```
val df1 = spark.range(4).select($"id".as("k1"))
val df2 = spark.range(3).select($"id".as("k2"))
df1.join(df2, $"k1" + 1 =!= $"k2").explain("codegen")
```

```
== Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:203(0.31% used); numInnerClasses:0) ==
*(2) BroadcastNestedLoopJoin BuildRight, Inner, NOT ((k1#2L + 1) = k2#6L)
:- *(2) Project [id#0L AS k1#2L]
:  +- *(2) Range (0, 4, step=1, splits=2)
+- BroadcastExchange IdentityBroadcastMode, [id=#22]
   +- *(1) Project [id#4L AS k2#6L]
      +- *(1) Range (0, 3, step=1, splits=2)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private InternalRow[] bnlj_buildRowArray_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */     bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value();
/* 032 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException {
/* 037 */     for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) {
/* 038 */       UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0];
/* 039 */
/* 040 */       long bnlj_value_1 = bnlj_buildRow_0.getLong(0);
/* 041 */
/* 042 */       long bnlj_value_4 = -1L;
/* 043 */
/* 044 */       bnlj_value_4 = bnlj_expr_0_0 + 1L;
/* 045 */
/* 046 */       boolean bnlj_value_3 = false;
/* 047 */       bnlj_value_3 = bnlj_value_4 == bnlj_value_1;
/* 048 */       boolean bnlj_value_2 = false;
/* 049 */       bnlj_value_2 = !(bnlj_value_3);
/* 050 */       if (!(false || !bnlj_value_2))
/* 051 */       {
/* 052 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 053 */
/* 054 */         range_mutableStateArray_0[3].reset();
/* 055 */
/* 056 */         range_mutableStateArray_0[3].write(0, bnlj_expr_0_0);
/* 057 */
/* 058 */         range_mutableStateArray_0[3].write(1, bnlj_value_1);
/* 059 */         append((range_mutableStateArray_0[3].getRow()).copy());
/* 060 */
/* 061 */       }
/* 062 */     }
/* 063 */
/* 064 */   }
/* 065 */
/* 066 */   private void initRange(int idx) {
/* 067 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 068 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
/* 069 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L);
/* 070 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 071 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 072 */     long partitionEnd;
/* 073 */
/* 074 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 075 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 076 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 077 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 078 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 079 */     } else {
/* 080 */       range_nextIndex_0 = st.longValue();
/* 081 */     }
/* 082 */     range_batchEnd_0 = range_nextIndex_0;
/* 083 */
/* 084 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 085 */     .multiply(step).add(start);
/* 086 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 087 */       partitionEnd = Long.MAX_VALUE;
/* 088 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 089 */       partitionEnd = Long.MIN_VALUE;
/* 090 */     } else {
/* 091 */       partitionEnd = end.longValue();
/* 092 */     }
/* 093 */
/* 094 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 095 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 096 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 097 */     if (range_numElementsTodo_0 < 0) {
/* 098 */       range_numElementsTodo_0 = 0;
/* 099 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 100 */       range_numElementsTodo_0++;
/* 101 */     }
/* 102 */   }
/* 103 */
/* 104 */   protected void processNext() throws java.io.IOException {
/* 105 */     // initialize Range
/* 106 */     if (!range_initRange_0) {
/* 107 */       range_initRange_0 = true;
/* 108 */       initRange(partitionIndex);
/* 109 */     }
/* 110 */
/* 111 */     while (true) {
/* 112 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 113 */         long range_nextBatchTodo_0;
/* 114 */         if (range_numElementsTodo_0 > 1000L) {
/* 115 */           range_nextBatchTodo_0 = 1000L;
/* 116 */           range_numElementsTodo_0 -= 1000L;
/* 117 */         } else {
/* 118 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 119 */           range_numElementsTodo_0 = 0;
/* 120 */           if (range_nextBatchTodo_0 == 0) break;
/* 121 */         }
/* 122 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 123 */       }
/* 124 */
/* 125 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 126 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 127 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 128 */
/* 129 */         // common sub-expressions
/* 130 */
/* 131 */         bnlj_doConsume_0(range_value_0);
/* 132 */
/* 133 */         if (shouldStop()) {
/* 134 */           range_nextIndex_0 = range_value_0 + 1L;
/* 135 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 136 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 137 */           return;
/* 138 */         }
/* 139 */
/* 140 */       }
/* 141 */       range_nextIndex_0 = range_batchEnd_0;
/* 142 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 143 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 144 */       range_taskContext_0.killTaskIfInterrupted();
/* 145 */     }
/* 146 */   }
/* 147 */
/* 148 */ }
```

### Why are the changes needed?

Improve query CPU performance. Added a micro benchmark query in `JoinBenchmark.scala`.
Saw 1x of run time improvement:

```
OpenJDK 64-Bit Server VM 11.0.9+11-LTS on Linux 4.14.219-161.340.amzn2.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2  2.50GHz
broadcast nested loop join:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
broadcast nested loop join wholestage off          62922          63052         184          0.3        3000.3       1.0X
broadcast nested loop join wholestage on           30946          30972          26          0.7        1475.6       2.0X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

* Added unit test in `WholeStageCodegenSuite.scala`, and existing unit tests for `BroadcastNestedLoopJoinExec`.
* Updated golden files for several TCPDS query plans, as whole stage code-gen for `BroadcastNestedLoopJoinExec` is triggered.
* Updated `JoinBenchmark-jdk11-results.txt ` and `JoinBenchmark-results.txt` with new benchmark result. Followed previous benchmark PRs - apache#27078 and apache#26003 to use same type of machine:

```
Amazon AWS EC2
type: r3.xlarge
region: us-west-2 (Oregon)
OS: Linux
```

Closes apache#31736 from c21/nested-join-exec.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Mar 23, 2021
…ght outer)

### What changes were proposed in this pull request?

This PR is to add code-gen support for left outer (build right) and right outer (build left). Reference: `BroadcastNestedLoopJoinExec.codegenInner()` and `BroadcastNestedLoopJoinExec.outerJoin()`

### Why are the changes needed?

Improve query CPU performance.
Tested with a simple query:
```scala
val N = 20 << 20
val M = 1 << 4

val dim = broadcast(spark.range(M).selectExpr("id as k2"))
codegenBenchmark("left outer broadcast nested loop join", N) {
   val df = spark.range(N).selectExpr(s"id as k1").join(
     dim, col("k1") + 1 <= col("k2"), "left_outer")
   assert(df.queryExecution.sparkPlan.find(
     _.isInstanceOf[BroadcastNestedLoopJoinExec]).isDefined)
   df.noop()
}
```
Seeing 2x run time improvement:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
left outer broadcast nested loop join:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------
left outer broadcast nested loop join wholestage off           3024           3698         953          6.9         144.2       1.0X
left outer broadcast nested loop join wholestage on            1512           1659         172         13.9          72.1       2.0X
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Changed existing unit tests in `OuterJoinSuite` to cover codegen use cases.
Added unit test in WholeStageCodegenSuite.scala to make sure code-gen for broadcast nested loop join is taking effect, and test for multiple join case as well.

Example query:
```scala
val df1 = spark.range(4).select($"id".as("k1"))
val df2 = spark.range(3).select($"id".as("k2"))
df1.join(df2, $"k1" + 1 <= $"k2", "left_outer").explain("codegen")
```
Example generated code (`bnlj_doConsume_0` method):
```java
== Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:210(0.32% used); numInnerClasses:0) ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftOuter, ((k1#2L + 1) <= k2#6L)
:- *(2) Project [id#0L AS k1#2L]
:  +- *(2) Range (0, 4, step=1, splits=16)
+- BroadcastExchange IdentityBroadcastMode, [id=#22]
   +- *(1) Project [id#4L AS k2#6L]
      +- *(1) Range (0, 3, step=1, splits=16)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private InternalRow[] bnlj_buildRowArray_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */     bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value();
/* 032 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException {
/* 037 */     boolean bnlj_foundMatch_0 = false;
/* 038 */     for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) {
/* 039 */       UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0];
/* 040 */       boolean bnlj_shouldOutputRow_0 = false;
/* 041 */
/* 042 */       boolean bnlj_isNull_2 = true;
/* 043 */       long bnlj_value_2 = -1L;
/* 044 */       if (bnlj_buildRow_0 != null) {
/* 045 */         long bnlj_value_1 = bnlj_buildRow_0.getLong(0);
/* 046 */         bnlj_isNull_2 = false;
/* 047 */         bnlj_value_2 = bnlj_value_1;
/* 048 */       }
/* 049 */
/* 050 */       long bnlj_value_4 = -1L;
/* 051 */
/* 052 */       bnlj_value_4 = bnlj_expr_0_0 + 1L;
/* 053 */
/* 054 */       boolean bnlj_value_3 = false;
/* 055 */       bnlj_value_3 = bnlj_value_4 <= bnlj_value_2;
/* 056 */       if (!(false || !bnlj_value_3))
/* 057 */       {
/* 058 */         bnlj_shouldOutputRow_0 = true;
/* 059 */         bnlj_foundMatch_0 = true;
/* 060 */       }
/* 061 */       if (bnlj_arrayIndex_0 == bnlj_buildRowArray_0.length - 1 && !bnlj_foundMatch_0) {
/* 062 */         bnlj_buildRow_0 = null;
/* 063 */         bnlj_shouldOutputRow_0 = true;
/* 064 */       }
/* 065 */       if (bnlj_shouldOutputRow_0) {
/* 066 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 067 */
/* 068 */         boolean bnlj_isNull_9 = true;
/* 069 */         long bnlj_value_9 = -1L;
/* 070 */         if (bnlj_buildRow_0 != null) {
/* 071 */           long bnlj_value_8 = bnlj_buildRow_0.getLong(0);
/* 072 */           bnlj_isNull_9 = false;
/* 073 */           bnlj_value_9 = bnlj_value_8;
/* 074 */         }
/* 075 */         range_mutableStateArray_0[3].reset();
/* 076 */
/* 077 */         range_mutableStateArray_0[3].zeroOutNullBytes();
/* 078 */
/* 079 */         range_mutableStateArray_0[3].write(0, bnlj_expr_0_0);
/* 080 */
/* 081 */         if (bnlj_isNull_9) {
/* 082 */           range_mutableStateArray_0[3].setNullAt(1);
/* 083 */         } else {
/* 084 */           range_mutableStateArray_0[3].write(1, bnlj_value_9);
/* 085 */         }
/* 086 */         append((range_mutableStateArray_0[3].getRow()).copy());
/* 087 */
/* 088 */       }
/* 089 */     }
/* 090 */
/* 091 */   }
/* 092 */
/* 093 */   private void initRange(int idx) {
/* 094 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 095 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(16L);
/* 096 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L);
/* 097 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 098 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 099 */     long partitionEnd;
/* 100 */
/* 101 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 102 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 103 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 104 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 105 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 106 */     } else {
/* 107 */       range_nextIndex_0 = st.longValue();
/* 108 */     }
/* 109 */     range_batchEnd_0 = range_nextIndex_0;
/* 110 */
/* 111 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 112 */     .multiply(step).add(start);
/* 113 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 114 */       partitionEnd = Long.MAX_VALUE;
/* 115 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 116 */       partitionEnd = Long.MIN_VALUE;
/* 117 */     } else {
/* 118 */       partitionEnd = end.longValue();
/* 119 */     }
/* 120 */
/* 121 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 122 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 123 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 124 */     if (range_numElementsTodo_0 < 0) {
/* 125 */       range_numElementsTodo_0 = 0;
/* 126 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 127 */       range_numElementsTodo_0++;
/* 128 */     }
/* 129 */   }
/* 130 */
/* 131 */   protected void processNext() throws java.io.IOException {
/* 132 */     // initialize Range
/* 133 */     if (!range_initRange_0) {
/* 134 */       range_initRange_0 = true;
/* 135 */       initRange(partitionIndex);
/* 136 */     }
/* 137 */
/* 138 */     while (true) {
/* 139 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 140 */         long range_nextBatchTodo_0;
/* 141 */         if (range_numElementsTodo_0 > 1000L) {
/* 142 */           range_nextBatchTodo_0 = 1000L;
/* 143 */           range_numElementsTodo_0 -= 1000L;
/* 144 */         } else {
/* 145 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 146 */           range_numElementsTodo_0 = 0;
/* 147 */           if (range_nextBatchTodo_0 == 0) break;
/* 148 */         }
/* 149 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 150 */       }
/* 151 */
/* 152 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 153 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 154 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 155 */
/* 156 */         // common sub-expressions
/* 157 */
/* 158 */         bnlj_doConsume_0(range_value_0);
/* 159 */
/* 160 */         if (shouldStop()) {
/* 161 */           range_nextIndex_0 = range_value_0 + 1L;
/* 162 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 163 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 164 */           return;
/* 165 */         }
/* 166 */
/* 167 */       }
/* 168 */       range_nextIndex_0 = range_batchEnd_0;
/* 169 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 170 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 171 */       range_taskContext_0.killTaskIfInterrupted();
/* 172 */     }
/* 173 */   }
/* 174 */
/* 175 */ }
```

Closes apache#31931 from linzebing/code-left-right-outer.

Authored-by: linzebing <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Jul 22, 2021
### What changes were proposed in this pull request?
This PR adds optimization for scalar and lateral subqueries with OneRowRelation as leaf nodes. It inlines such subqueries before decorrelation to avoid rewriting them as left outer joins. It also introduces a flag to turn on/off this optimization: `spark.sql.optimizer.optimizeOneRowRelationSubquery` (default: True).

For example:
```sql
select (select c1) from t
```
Analyzed plan:
```
Project [scalar-subquery#17 [c1#18] AS scalarsubquery(c1)#22]
:  +- Project [outer(c1#18)]
:     +- OneRowRelation
+- LocalRelation [c1#18, c2#19]
```

Optimized plan before this PR:
```
Project [c1#18#25 AS scalarsubquery(c1)#22]
+- Join LeftOuter, (c1#24 <=> c1#18)
   :- LocalRelation [c1#18]
   +- Aggregate [c1#18], [c1#18 AS c1#18#25, c1#18 AS c1#24]
      +- LocalRelation [c1#18]
```

Optimized plan after this PR:
```
LocalRelation [scalarsubquery(c1)#22]
```

### Why are the changes needed?
To optimize query plans.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new unit tests.

Closes apache#33284 from allisonwang-db/spark-36063-optimize-subquery-one-row-relation.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Apr 13, 2022
### What changes were proposed in this pull request?
This PR adds optimization for scalar and lateral subqueries with OneRowRelation as leaf nodes. It inlines such subqueries before decorrelation to avoid rewriting them as left outer joins. It also introduces a flag to turn on/off this optimization: `spark.sql.optimizer.optimizeOneRowRelationSubquery` (default: True).

For example:
```sql
select (select c1) from t
```
Analyzed plan:
```
Project [scalar-subquery#17 [c1#18] AS scalarsubquery(c1)#22]
:  +- Project [outer(c1#18)]
:     +- OneRowRelation
+- LocalRelation [c1#18, c2#19]
```

Optimized plan before this PR:
```
Project [c1#18#25 AS scalarsubquery(c1)#22]
+- Join LeftOuter, (c1#24 <=> c1#18)
   :- LocalRelation [c1#18]
   +- Aggregate [c1#18], [c1#18 AS c1#18#25, c1#18 AS c1#24]
      +- LocalRelation [c1#18]
```

Optimized plan after this PR:
```
LocalRelation [scalarsubquery(c1)#22]
```

### Why are the changes needed?
To optimize query plans.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new unit tests.

Closes apache#33284 from allisonwang-db/spark-36063-optimize-subquery-one-row-relation.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit de8e4be)
Signed-off-by: Wenchen Fan <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Jul 6, 2025
…pressions in `buildAggExprList`

### What changes were proposed in this pull request?

Trim aliases before matching Sort/Having/Filter expressions with semantically equal expression from the Aggregate below in `buildAggExprList`

### Why are the changes needed?
For a query like:
```
SELECT course, year, GROUPING(course) FROM courseSales GROUP BY CUBE(course, year) ORDER BY GROUPING(course)
```

Plan after `ResolveReferences` and before `ResolveAggregateFunctions` looks like:

```
!Sort [cast((shiftright(tempresolvedcolumn(spark_grouping_id#18L, spark_grouping_id, false), 1) & 1) as tinyint) AS grouping(course)#22 ASC NULLS FIRST], true
 +- Aggregate [course#19, year#20, spark_grouping_id#18L], [course#19, year#20, cast((shiftright(spark_grouping_id#18L, 1) & 1) as tinyint) AS grouping(course)#21 AS grouping(course)#15]
....
```
Because aggregate list has `Alias(Alias(cast((shiftright(spark_grouping_id#18L, 1) & 1) as tinyint))` expression from `SortOrder` won't get matched as semantically equal and it will result in adding an unnecessary `Project`. By stripping inner aliases from aggregate list (that are going to get removed anyways in `CleanupAliases`) we can match `SortOrder` expression and resolve it as `grouping(course)#15`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51339 from mihailotim-db/mihailotim-db/fix_inner_aliases_semi_structured.

Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants