Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,6 @@ trait HashJoin {
s"BroadcastHashJoin should not take $x as the JoinType")
}

// At the end of the task, we update the avg hash probe.
TaskContext.get().addTaskCompletionListener[Unit](_ =>
avgHashProbe.set(hashed.getAverageProbesPerLookup))

val resultProj = createResultProjection
joinedIter.map { r =>
numOutputRows += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
* Release any used resources.
*/
def close(): Unit

/**
* Returns the average number of probes per key lookup.
*/
def getAverageProbesPerLookup: Double
}

private[execution] object HashedRelation {
Expand Down Expand Up @@ -279,8 +274,6 @@ private[joins] class UnsafeHashedRelation(
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
read(() => in.readInt(), () => in.readLong(), in.readBytes)
}

override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
}

private[joins] object UnsafeHashedRelation {
Expand Down Expand Up @@ -395,10 +388,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
// The number of unique keys.
private var numKeys = 0L

// Tracking average number of probes per key lookup.
private var numKeyLookups = 0L
private var numProbes = 0L

// needed by serializer
def this() = {
this(
Expand Down Expand Up @@ -483,8 +472,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
*/
def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
if (isDense) {
numKeyLookups += 1
numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
Expand All @@ -493,14 +480,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
} else {
var pos = firstSlot(key)
numKeyLookups += 1
numProbes += 1
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return getRow(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
numProbes += 1
}
}
null
Expand Down Expand Up @@ -528,8 +512,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
*/
def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
if (isDense) {
numKeyLookups += 1
numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
Expand All @@ -538,14 +520,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
} else {
var pos = firstSlot(key)
numKeyLookups += 1
numProbes += 1
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return valueIter(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
numProbes += 1
}
}
null
Expand Down Expand Up @@ -585,11 +564,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
private def updateIndex(key: Long, address: Long): Unit = {
var pos = firstSlot(key)
assert(numKeys < array.length / 2)
numKeyLookups += 1
numProbes += 1
while (array(pos) != key && array(pos + 1) != 0) {
pos = nextSlot(pos)
numProbes += 1
}
if (array(pos + 1) == 0) {
// this is the first value for this key, put the address in array.
Expand Down Expand Up @@ -721,8 +697,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
writeLong(maxKey)
writeLong(numKeys)
writeLong(numValues)
writeLong(numKeyLookups)
writeLong(numProbes)

writeLong(array.length)
writeLongArray(writeBuffer, array, array.length)
Expand Down Expand Up @@ -764,8 +738,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
maxKey = readLong()
numKeys = readLong()
numValues = readLong()
numKeyLookups = readLong()
numProbes = readLong()

val length = readLong().toInt
mask = length - 2
Expand All @@ -783,11 +755,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
override def read(kryo: Kryo, in: Input): Unit = {
read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
}

/**
* Returns the average number of probes per key lookup.
*/
def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
}

private[joins] class LongHashedRelation(
Expand Down Expand Up @@ -839,8 +806,6 @@ private[joins] class LongHashedRelation(
resultRow = new UnsafeRow(nFields)
map = in.readObject().asInstanceOf[LongToUnsafeRowMap]
}

override def getAverageProbesPerLookup: Double = map.getAverageProbesPerLookup
}

/**
Expand Down