
Commit b420a71

Move most of the existing SMJ code into Java.
1 parent dfdb93f

5 files changed, +193 -210 lines

sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java

Lines changed: 2 additions & 2 deletions
@@ -21,12 +21,12 @@
 import java.util.Arrays;
 
 import scala.Function1;
-import scala.collection.AbstractIterator;
 import scala.collection.Iterator;
 import scala.math.Ordering;
 
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
+import org.apache.spark.sql.AbstractScalaRowIterator;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRowConverter;
@@ -97,7 +97,7 @@ public Iterator<Row> sort(Iterator<Row> inputIterator) throws IOException {
       );
     }
     final UnsafeSorterIterator sortedIterator = sorter.getSortedIterator();
-    return new AbstractIterator<Row>() {
+    return new AbstractScalaRowIterator() {
 
       private final int numFields = schema.length();
       private final UnsafeRow row = new UnsafeRow();
SortMergeJoinIterator.java (new file, package org.apache.spark.sql.execution.joins)

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins;
+
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
+import scala.Function1;
+import scala.collection.Iterator;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import org.apache.spark.sql.AbstractScalaRowIterator;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow5;
+import org.apache.spark.sql.catalyst.expressions.RowOrdering;
+import org.apache.spark.util.collection.CompactBuffer;
+
+/**
+ * Implements the merge step of sort-merge join.
+ */
+class SortMergeJoinIterator extends AbstractScalaRowIterator {
+
+  private static final ClassTag<Row> ROW_CLASS_TAG = ClassTag$.MODULE$.apply(Row.class);
+  private final Iterator<Row> leftIter;
+  private final Iterator<Row> rightIter;
+  private final Function1<Row, Row> leftKeyGenerator;
+  private final Function1<Row, Row> rightKeyGenerator;
+  private final RowOrdering keyOrdering;
+  private final JoinedRow5 joinRow = new JoinedRow5();
+
+  @Nullable private Row leftElement;
+  @Nullable private Row rightElement;
+  private Row leftKey;
+  private Row rightKey;
+  @Nullable private CompactBuffer<Row> rightMatches;
+  private int rightPosition = -1;
+  private boolean stop = false;
+  private Row matchKey;
+
+  public SortMergeJoinIterator(
+      Iterator<Row> leftIter,
+      Iterator<Row> rightIter,
+      Function1<Row, Row> leftKeyGenerator,
+      Function1<Row, Row> rightKeyGenerator,
+      RowOrdering keyOrdering) {
+    this.leftIter = leftIter;
+    this.rightIter = rightIter;
+    this.leftKeyGenerator = leftKeyGenerator;
+    this.rightKeyGenerator = rightKeyGenerator;
+    this.keyOrdering = keyOrdering;
+    fetchLeft();
+    fetchRight();
+  }
+
+  private void fetchLeft() {
+    if (leftIter.hasNext()) {
+      leftElement = leftIter.next();
+      leftKey = leftKeyGenerator.apply(leftElement);
+    } else {
+      leftElement = null;
+    }
+  }
+
+  private void fetchRight() {
+    if (rightIter.hasNext()) {
+      rightElement = rightIter.next();
+      rightKey = rightKeyGenerator.apply(rightElement);
+    } else {
+      rightElement = null;
+    }
+  }
+
+  /**
+   * Searches the right iterator for the next rows that have matches in the left side, and stores
+   * them in a buffer.
+   *
+   * @return true if the search is successful, and false if the right iterator runs out of
+   *         tuples.
+   */
+  private boolean nextMatchingPair() {
+    if (!stop && rightElement != null) {
+      // Run both sides to get the first matching pair.
+      while (!stop && leftElement != null && rightElement != null) {
+        final int comparing = keyOrdering.compare(leftKey, rightKey);
+        // For inner join, we need to filter out null keys.
+        stop = comparing == 0 && !leftKey.anyNull();
+        if (comparing > 0 || rightKey.anyNull()) {
+          fetchRight();
+        } else if (comparing < 0 || leftKey.anyNull()) {
+          fetchLeft();
+        }
+      }
+      rightMatches = new CompactBuffer<Row>(ROW_CLASS_TAG);
+      if (stop) {
+        stop = false;
+        // Iterate the right side to buffer all rows that match.
+        // As the records should be ordered, exit when we meet the first record that does not match.
+        while (!stop && rightElement != null) {
+          rightMatches.$plus$eq(rightElement);
+          fetchRight();
+          stop = keyOrdering.compare(leftKey, rightKey) != 0;
+        }
+        if (rightMatches.size() > 0) {
+          rightPosition = 0;
+          matchKey = leftKey;
+        }
+      }
+    }
+    return rightMatches != null && rightMatches.size() > 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return nextMatchingPair();
+  }
+
+  @Override
+  public Row next() {
+    if (hasNext()) {
+      // We are using the buffered right rows and run down the left iterator.
+      final Row joinedRow = joinRow.apply(leftElement, rightMatches.apply(rightPosition));
+      rightPosition += 1;
+      if (rightPosition >= rightMatches.size()) {
+        rightPosition = 0;
+        fetchLeft();
+        if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) {
+          stop = false;
+          rightMatches = null;
+        }
+      }
+      return joinedRow;
+    } else {
+      // No more results.
+      throw new NoSuchElementException();
+    }
+  }
+}
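To make the merge step above easier to follow, here is a minimal, self-contained sketch of the same control flow: advance whichever side has the smaller key, buffer every right row for a matching key, then replay that buffer for each left row with the same key. It deliberately uses plain Java types (int keys, int[] rows, ArrayList) in place of Spark's Row, RowOrdering and CompactBuffer, so it illustrates the algorithm rather than the Spark API; the MergeStepSketch class and its sample data are invented for this example.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class MergeStepSketch {
  public static void main(String[] args) {
    // Both inputs must already be sorted by key, as the real operator's
    // required child ordering guarantees. Each int[] is {key, payload}.
    List<int[]> left = List.of(
        new int[]{1, 10}, new int[]{2, 20}, new int[]{2, 21}, new int[]{4, 40});
    List<int[]> right = List.of(
        new int[]{2, 200}, new int[]{2, 201}, new int[]{3, 300}, new int[]{4, 400});

    Iterator<int[]> l = left.iterator();
    Iterator<int[]> r = right.iterator();
    int[] lRow = l.hasNext() ? l.next() : null;
    int[] rRow = r.hasNext() ? r.next() : null;

    while (lRow != null && rRow != null) {
      int cmp = Integer.compare(lRow[0], rRow[0]);
      if (cmp < 0) {
        lRow = l.hasNext() ? l.next() : null;   // left key is smaller: advance left
      } else if (cmp > 0) {
        rRow = r.hasNext() ? r.next() : null;   // right key is smaller: advance right
      } else {
        // Keys match: buffer all right rows for this key (the CompactBuffer step) ...
        int key = rRow[0];
        List<int[]> matches = new ArrayList<>();
        while (rRow != null && rRow[0] == key) {
          matches.add(rRow);
          rRow = r.hasNext() ? r.next() : null;
        }
        // ... then replay the buffer for every left row that shares the key.
        while (lRow != null && lRow[0] == key) {
          for (int[] m : matches) {
            System.out.println("key=" + key + " left=" + lRow[1] + " right=" + m[1]);
          }
          lRow = l.hasNext() ? l.next() : null;
        }
      }
    }
  }
}

Running it prints one joined pair per line for keys 2 and 4, which is the pairing the iterator above would produce on equivalent sorted inputs.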
AbstractScalaRowIterator.scala (new file, package org.apache.spark.sql)

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+/**
+ * Shim to allow us to implement [[scala.Iterator]] in Java. Scala 2.11+ has an AbstractIterator
+ * class for this, but that class is `private[scala]` in 2.10. We need to explicitly fix this to
+ * `Row` in order to work around a spurious IntelliJ compiler error.
+ */
+private[spark] abstract class AbstractScalaRowIterator extends Iterator[Row]
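The doc comment above explains why the shim exists; to make the mechanics concrete, here is a hypothetical minimal Java subclass (SingleRowIterator is not part of this commit). Because AbstractScalaRowIterator is an abstract class rather than the Scala Iterator trait itself, a Java subclass only has to supply hasNext() and next(); the remaining Iterator machinery (map, filter, toSeq, and so on) is inherited from the compiled Scala parent.

import java.util.NoSuchElementException;

import org.apache.spark.sql.AbstractScalaRowIterator;
import org.apache.spark.sql.Row;

// Hypothetical example only: yields a single Row and is then exhausted.
class SingleRowIterator extends AbstractScalaRowIterator {

  private final Row row;
  private boolean consumed = false;

  SingleRowIterator(Row row) {
    this.row = row;
  }

  @Override
  public boolean hasNext() {
    return !consumed;
  }

  @Override
  public Row next() {
    if (consumed) {
      throw new NoSuchElementException();
    }
    consumed = true;
    return row;
  }
}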

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala

Lines changed: 6 additions & 102 deletions
@@ -17,14 +17,11 @@
 
 package org.apache.spark.sql.execution.joins
 
-import java.util.NoSuchElementException
-
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-import org.apache.spark.util.collection.CompactBuffer
 
 /**
  * :: DeveloperApi ::
@@ -63,105 +60,12 @@ case class SortMergeJoin(
     val rightResults = right.execute().map(_.copy())
 
     leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
-      new Iterator[InternalRow] {
-        // Mutable per row objects.
-        private[this] val joinRow = new JoinedRow5
-        private[this] var leftElement: InternalRow = _
-        private[this] var rightElement: InternalRow = _
-        private[this] var leftKey: InternalRow = _
-        private[this] var rightKey: InternalRow = _
-        private[this] var rightMatches: CompactBuffer[InternalRow] = _
-        private[this] var rightPosition: Int = -1
-        private[this] var stop: Boolean = false
-        private[this] var matchKey: InternalRow = _
-
-        // initialize iterator
-        initialize()
-
-        override final def hasNext: Boolean = nextMatchingPair()
-
-        override final def next(): InternalRow = {
-          if (hasNext) {
-            // we are using the buffered right rows and run down left iterator
-            val joinedRow = joinRow(leftElement, rightMatches(rightPosition))
-            rightPosition += 1
-            if (rightPosition >= rightMatches.size) {
-              rightPosition = 0
-              fetchLeft()
-              if (leftElement == null || keyOrdering.compare(leftKey, matchKey) != 0) {
-                stop = false
-                rightMatches = null
-              }
-            }
-            joinedRow
-          } else {
-            // no more result
-            throw new NoSuchElementException
-          }
-        }
-
-        private def fetchLeft() = {
-          if (leftIter.hasNext) {
-            leftElement = leftIter.next()
-            leftKey = leftKeyGenerator(leftElement)
-          } else {
-            leftElement = null
-          }
-        }
-
-        private def fetchRight() = {
-          if (rightIter.hasNext) {
-            rightElement = rightIter.next()
-            rightKey = rightKeyGenerator(rightElement)
-          } else {
-            rightElement = null
-          }
-        }
-
-        private def initialize() = {
-          fetchLeft()
-          fetchRight()
-        }
-
-        /**
-         * Searches the right iterator for the next rows that have matches in left side, and store
-         * them in a buffer.
-         *
-         * @return true if the search is successful, and false if the right iterator runs out of
-         * tuples.
-         */
-        private def nextMatchingPair(): Boolean = {
-          if (!stop && rightElement != null) {
-            // run both side to get the first match pair
-            while (!stop && leftElement != null && rightElement != null) {
-              val comparing = keyOrdering.compare(leftKey, rightKey)
-              // for inner join, we need to filter those null keys
-              stop = comparing == 0 && !leftKey.anyNull
-              if (comparing > 0 || rightKey.anyNull) {
-                fetchRight()
-              } else if (comparing < 0 || leftKey.anyNull) {
-                fetchLeft()
-              }
-            }
-            rightMatches = new CompactBuffer[InternalRow]()
-            if (stop) {
-              stop = false
-              // iterate the right side to buffer all rows that matches
-              // as the records should be ordered, exit when we meet the first that not match
-              while (!stop && rightElement != null) {
-                rightMatches += rightElement
-                fetchRight()
-                stop = keyOrdering.compare(leftKey, rightKey) != 0
-              }
-              if (rightMatches.size > 0) {
-                rightPosition = 0
-                matchKey = leftKey
-              }
-            }
-          }
-          rightMatches != null && rightMatches.size > 0
-        }
-      }
+      new SortMergeJoinIterator(
+        leftIter,
+        rightIter,
+        leftKeyGenerator,
+        rightKeyGenerator,
+        keyOrdering);
     }
   }
 }
