
Commit 506195a

jinxing authored and lycplus committed
[SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.
## What changes were proposed in this pull request?

Currently, when the number of reducers is above 2000, HighlyCompressedMapStatus is used to store the block sizes. In HighlyCompressedMapStatus, only the average size is stored for non-empty blocks, which is not good for memory control when we fetch shuffle blocks. It makes sense to store the accurate size of a block when it is above a threshold.

## How was this patch tested?

Added a test in MapStatusSuite.

Author: jinxing <[email protected]>

Closes apache#18031 from jinxing64/SPARK-20801.
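To see why storing only the average is a problem for memory control, consider an illustrative Scala sketch (the block sizes below are invented for the example; only the 100 MB default threshold comes from this patch):

    // 1999 blocks of 1 KiB plus one huge 1 GiB block in a single map output.
    val sizes = Array.fill(1999)(1024L) :+ (1L << 30)
    // Before this change, HighlyCompressedMapStatus reports the same average
    // (roughly 525 KiB here) for every non-empty block, including the 1 GiB one,
    // so the fetch side badly underestimates how much memory that block needs.
    val avgSize = sizes.filter(_ > 0).sum / sizes.count(_ > 0)
    // With this change, any block above spark.shuffle.accurateBlockThreshold
    // (default 100 * 1024 * 1024) keeps its own compressed size in hugeBlockSizes,
    // so getSizeForBlock no longer returns the misleading average for it.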
1 parent fb718ec commit 506195a

4 files changed: +90 −10 lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 9 additions & 0 deletions
@@ -278,4 +278,13 @@ package object config {
         "spark.io.compression.codec.")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
+    ConfigBuilder("spark.shuffle.accurateBlockThreshold")
+      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
+        "record the size accurately if it's above this config. This helps to prevent OOM by " +
+        "avoiding underestimating shuffle block size when fetch shuffle blocks.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(100 * 1024 * 1024)
+
 }
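The new entry behaves like any other Spark configuration key. A minimal sketch of setting it programmatically (the 50 MB value is arbitrary, chosen only to illustrate lowering the threshold below the 100 MB default):

    import org.apache.spark.SparkConf

    // Record exact sizes for shuffle blocks larger than 50 MB instead of the 100 MB default.
    val conf = new SparkConf()
      .set("spark.shuffle.accurateBlockThreshold", (50 * 1024 * 1024).toString)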

core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala

Lines changed: 45 additions & 9 deletions
@@ -19,8 +19,13 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.roaringbitmap.RoaringBitmap
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
@@ -121,48 +126,68 @@ private[spark] class CompressedMapStatus(
 }
 
 /**
- * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
+ * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks,
  * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
- * @param avgSize average size of the non-empty blocks
+ * @param avgSize average size of the non-empty and non-huge blocks
+ * @param hugeBlockSizes sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
-    private[this] var avgSize: Long)
+    private[this] var avgSize: Long,
+    @transient private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
-  require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+  require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1) // For deserialization only
+  protected def this() = this(null, -1, null, -1, null) // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
+    assert(hugeBlockSizes != null)
     if (emptyBlocks.contains(reduceId)) {
       0
     } else {
-      avgSize
+      hugeBlockSizes.get(reduceId) match {
+        case Some(size) => MapStatus.decompressSize(size)
+        case None => avgSize
+      }
     }
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
     emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
+    out.writeInt(hugeBlockSizes.size)
+    hugeBlockSizes.foreach { kv =>
+      out.writeInt(kv._1)
+      out.writeByte(kv._2)
+    }
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
     emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
+    val count = in.readInt()
+    val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
+    (0 until count).foreach { _ =>
+      val block = in.readInt()
+      val size = in.readByte()
+      hugeBlockSizesArray += Tuple2(block, size)
+    }
+    hugeBlockSizes = hugeBlockSizesArray.toMap
   }
 }
 
@@ -178,11 +203,21 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
+    val threshold = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
     while (i < totalNumBlocks) {
-      var size = uncompressedSizes(i)
+      val size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        totalSize += size
+        // Huge blocks are not included in the calculation for average size, thus size for smaller
+        // blocks is more accurate.
+        if (size < threshold) {
+          totalSize += size
+        } else {
+          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
+        }
       } else {
         emptyBlocks.add(i)
       }
@@ -195,6 +230,7 @@ private[spark] object HighlyCompressedMapStatus {
     }
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
-    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
+      hugeBlockSizesArray.toMap)
   }
 }

core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala

Lines changed: 27 additions & 1 deletion
@@ -17,11 +17,15 @@
 
 package org.apache.spark.scheduler
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import scala.util.Random
 
+import org.mockito.Mockito._
 import org.roaringbitmap.RoaringBitmap
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.BlockManagerId
 
@@ -128,4 +132,26 @@ class MapStatusSuite extends SparkFunSuite {
     assert(size1 === size2)
     assert(!success)
   }
+
+  test("Blocks which are bigger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be " +
+    "underestimated.") {
+    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "1000")
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+    // Value of element in sizes is equal to the corresponding index.
+    val sizes = (0L to 2000L).toArray
+    val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
+    val arrayStream = new ByteArrayOutputStream(102400)
+    val objectOutputStream = new ObjectOutputStream(arrayStream)
+    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    objectOutputStream.writeObject(status1)
+    objectOutputStream.flush()
+    val array = arrayStream.toByteArray
+    val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
+    val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
+    (1001 to 2000).foreach {
+      case part => assert(status2.getSizeForBlock(part) >= sizes(part))
+    }
+  }
 }
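Assuming the standard Spark sbt build, one common way to run just this suite locally (the command is a usual pattern, not part of the patch) is:

    build/sbt "core/testOnly org.apache.spark.scheduler.MapStatusSuite"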

docs/configuration.md

Lines changed: 9 additions & 0 deletions
@@ -612,6 +612,15 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.io.compression.codec</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.accurateBlockThreshold</code></td>
+  <td>100 * 1024 * 1024</td>
+  <td>
+    When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
+    size accurately if it's above this config. This helps to prevent OOM by avoiding
+    underestimating shuffle block size when fetch shuffle blocks.
+  </td>
+</tr>
 <tr>
   <td><code>spark.io.encryption.enabled</code></td>
   <td>false</td>
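As with other entries in this table, the property can also be passed at submit time. A hedged example (the 128 MB value, class name, and jar are placeholders):

    spark-submit \
      --conf spark.shuffle.accurateBlockThreshold=134217728 \
      --class com.example.MyShuffleHeavyJob \
      my-shuffle-heavy-job.jar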
