1717
1818package org .apache .spark .sql .execution
1919
20+ import java .util .{HashMap => JavaHashMap }
21+
2022import scala .collection .mutable .{ArrayBuffer , BitSet }
2123import scala .concurrent .ExecutionContext .Implicits .global
2224import scala .concurrent ._
@@ -136,14 +138,6 @@ trait HashJoin {
136138 }
137139}
138140
139- /**
140- * Constant Value for Binary Join Node
141- */
142- object HashOuterJoin {
143- val DUMMY_LIST = Seq [Row ](null )
144- val EMPTY_LIST = Seq [Row ]()
145- }
146-
147141/**
148142 * :: DeveloperApi ::
149143 * Performs a hash based outer join for two child relations by shuffling the data using
@@ -181,6 +175,9 @@ case class HashOuterJoin(
181175 }
182176 }
183177
178+ @ transient private [this ] lazy val DUMMY_LIST = Seq [Row ](null )
179+ @ transient private [this ] lazy val EMPTY_LIST = Seq .empty[Row ]
180+
184181 // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
185182 // iterator for performance purpose.
186183
@@ -199,8 +196,8 @@ case class HashOuterJoin(
199196 joinedRow.copy
200197 } else {
201198 Nil
202- }) ++ HashOuterJoin . DUMMY_LIST .filter(_ => ! matched).map( _ => {
203- // HashOuterJoin. DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
199+ }) ++ DUMMY_LIST .filter(_ => ! matched).map( _ => {
200+ // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
204201 // as we don't know whether we need to append it until finish iterating all of the
205202 // records in right side.
206203 // If we didn't get any proper row, then append a single row with empty right
@@ -224,8 +221,8 @@ case class HashOuterJoin(
224221 joinedRow.copy
225222 } else {
226223 Nil
227- }) ++ HashOuterJoin . DUMMY_LIST .filter(_ => ! matched).map( _ => {
228- // HashOuterJoin. DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
224+ }) ++ DUMMY_LIST .filter(_ => ! matched).map( _ => {
225+ // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
229226 // as we don't know whether we need to append it until finish iterating all of the
230227 // records in left side.
231228 // If we didn't get any proper row, then append a single row with empty left.
@@ -259,10 +256,10 @@ case class HashOuterJoin(
259256 rightMatchedSet.add(idx)
260257 joinedRow.copy
261258 }
262- } ++ HashOuterJoin . DUMMY_LIST .filter(_ => ! matched).map( _ => {
259+ } ++ DUMMY_LIST .filter(_ => ! matched).map( _ => {
263260 // 2. For those unmatched records in left, append additional records with empty right.
264261
265- // HashOuterJoin. DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
262+ // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
266263 // as we don't know whether we need to append it until finish iterating all
267264 // of the records in right side.
268265 // If we didn't get any proper row, then append a single row with empty right.
@@ -287,18 +284,22 @@ case class HashOuterJoin(
287284 }
288285
289286 private [this ] def buildHashTable (
290- iter : Iterator [Row ], keyGenerator : Projection ): Map [Row , ArrayBuffer [Row ]] = {
291- // TODO: Use Spark's HashMap implementation.
292- val hashTable = scala.collection.mutable.Map [Row , ArrayBuffer [Row ]]()
287+ iter : Iterator [Row ], keyGenerator : Projection ): JavaHashMap [Row , ArrayBuffer [Row ]] = {
288+ val hashTable = new JavaHashMap [Row , ArrayBuffer [Row ]]()
293289 while (iter.hasNext) {
294290 val currentRow = iter.next()
295291 val rowKey = keyGenerator(currentRow)
296292
297- val existingMatchList = hashTable.getOrElseUpdate(rowKey, {new ArrayBuffer [Row ]()})
293+ var existingMatchList = hashTable.get(rowKey)
294+ if (existingMatchList == null ) {
295+ existingMatchList = new ArrayBuffer [Row ]()
296+ hashTable.put(rowKey, existingMatchList)
297+ }
298+
298299 existingMatchList += currentRow.copy()
299300 }
300-
301- hashTable.toMap[ Row , ArrayBuffer [ Row ]]
301+
302+ hashTable
302303 }
303304
304305 def execute () = {
@@ -309,21 +310,22 @@ case class HashOuterJoin(
309310 // Build HashMap for current partition in right relation
310311 val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
311312
313+ import scala .collection .JavaConversions ._
312314 val boundCondition =
313315 condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row : Row ) => true )
314316 joinType match {
315317 case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
316- leftOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin . EMPTY_LIST ),
317- rightHashTable.getOrElse(key, HashOuterJoin . EMPTY_LIST ))
318+ leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST ),
319+ rightHashTable.getOrElse(key, EMPTY_LIST ))
318320 }
319321 case RightOuter => rightHashTable.keysIterator.flatMap { key =>
320- rightOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin . EMPTY_LIST ),
321- rightHashTable.getOrElse(key, HashOuterJoin . EMPTY_LIST ))
322+ rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST ),
323+ rightHashTable.getOrElse(key, EMPTY_LIST ))
322324 }
323325 case FullOuter => (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
324326 fullOuterIterator(key,
325- leftHashTable.getOrElse(key, HashOuterJoin . EMPTY_LIST ),
326- rightHashTable.getOrElse(key, HashOuterJoin . EMPTY_LIST ))
327+ leftHashTable.getOrElse(key, EMPTY_LIST ),
328+ rightHashTable.getOrElse(key, EMPTY_LIST ))
327329 }
328330 case x => throw new Exception (s " HashOuterJoin should not take $x as the JoinType " )
329331 }
0 commit comments