@@ -121,94 +121,34 @@ private[spark] class CompressedMapStatus(
121121 }
122122}
123123
124-
125-
126124/**
127125 * A [[MapStatus ]] implementation that only stores the average size of non-empty blocks,
128- * plus a hashset for tracking which blocks are not empty.
129- * In this case, no-empty blocks are very sparse,
126+ * plus a hashset for tracking which blocks are empty(dense) / non-empty(sparse).
130127 * using a HashSet[Int] can save more memory usage than BitSet
131128 *
132129 * @param loc location where the task is being executed
133130 * @param numNonEmptyBlocks the number of non-empty blocks
134- * @param nonEmptyBlocks a hashset tracking which blocks are not empty
135- * @param avgSize average size of the non-empty blocks
136- */
137- private [spark] class MapStatusTrackingNoEmptyBlocks private (
138- private [this ] var loc : BlockManagerId ,
139- private [this ] var numNonEmptyBlocks : Int ,
140- private [this ] var nonEmptyBlocks : mutable.HashSet [Int ],
141- private [this ] var avgSize : Long )
142- extends MapStatus with Externalizable {
143-
144- // loc could be null when the default constructor is called during deserialization
145- require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0 ,
146- " Average size can only be zero for map stages that produced no output" )
147-
148- protected def this () = this (null , - 1 , null , - 1 ) // For deserialization only
149-
150- override def location : BlockManagerId = loc
151-
152- override def getSizeForBlock (reduceId : Int ): Long = {
153- if (nonEmptyBlocks.contains(reduceId)) {
154- avgSize
155- } else {
156- 0
157- }
158- }
159-
160- override def writeExternal (out : ObjectOutput ): Unit = Utils .tryOrIOException {
161- loc.writeExternal(out)
162- out.writeObject(nonEmptyBlocks)
163- out.writeLong(avgSize)
164- }
165-
166- override def readExternal (in : ObjectInput ): Unit = Utils .tryOrIOException {
167- loc = BlockManagerId (in)
168- nonEmptyBlocks = new mutable.HashSet [Int ]
169- nonEmptyBlocks = in.readObject().asInstanceOf [mutable.HashSet [Int ]]
170- avgSize = in.readLong()
171- }
172- }
173-
174- private [spark] object MapStatusTrackingNoEmptyBlocks {
175- def apply (
176- loc : BlockManagerId ,
177- numNonEmptyBlocks : Int ,
178- nonEmptyBlocks : mutable.HashSet [Int ],
179- avgSize : Long ): MapStatusTrackingNoEmptyBlocks = {
180- new MapStatusTrackingNoEmptyBlocks (loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize )
181- }
182- }
183-
184- /**
185- * A [[MapStatus ]] implementation that only stores the average size of non-empty blocks,
186- * plus a hashset for tracking which blocks are empty.
187- * In this case, no-empty blocks are very dense,
188- * using a HashSet[Int] can save more memory usage than BitSet
189- *
190- * @param loc location where the task is being executed
191- * @param numNonEmptyBlocks the number of non-empty blocks
192- * @param emptyBlocksHashSet a bitmap tracking which blocks are empty
131+ * @param markedBlocks a HashSet tracking which blocks are empty(dense) / non-empty(sparse)
193132 * @param avgSize average size of the non-empty blocks
194133 */
195134private [spark] class MapStatusTrackingEmptyBlocks private (
196135 private [this ] var loc : BlockManagerId ,
197136 private [this ] var numNonEmptyBlocks : Int ,
198- private [this ] var emptyBlocksHashSet : mutable.HashSet [Int ],
199- private [this ] var avgSize : Long )
137+ private [this ] var markedBlocks : mutable.HashSet [Int ],
138+ private [this ] var avgSize : Long ,
139+ private [this ] var isSparse : Boolean )
200140 extends MapStatus with Externalizable {
201141
202142 // loc could be null when the default constructor is called during deserialization
203143 require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0 ,
204144 " Average size can only be zero for map stages that produced no output" )
205145
206- protected def this () = this (null , - 1 , null , - 1 ) // For deserialization only
146+ protected def this () = this (null , - 1 , null , - 1 , false ) // For deserialization only
207147
208148 override def location : BlockManagerId = loc
209149
210150 override def getSizeForBlock (reduceId : Int ): Long = {
211- if (emptyBlocksHashSet .contains(reduceId)) {
151+ if (isSparse ^ markedBlocks .contains(reduceId)) {
212152 0
213153 } else {
214154 avgSize
@@ -217,14 +157,14 @@ private[spark] class MapStatusTrackingEmptyBlocks private (
217157
218158 override def writeExternal (out : ObjectOutput ): Unit = Utils .tryOrIOException {
219159 loc.writeExternal(out)
220- out.writeObject(emptyBlocksHashSet )
160+ out.writeObject(markedBlocks )
221161 out.writeLong(avgSize)
222162 }
223163
224164 override def readExternal (in : ObjectInput ): Unit = Utils .tryOrIOException {
225165 loc = BlockManagerId (in)
226- emptyBlocksHashSet = new mutable.HashSet [Int ]
227- emptyBlocksHashSet = in.readObject().asInstanceOf [mutable.HashSet [Int ]]
166+ markedBlocks = new mutable.HashSet [Int ]
167+ markedBlocks = in.readObject().asInstanceOf [mutable.HashSet [Int ]]
228168 avgSize = in.readLong()
229169 }
230170}
@@ -233,9 +173,10 @@ private[spark] object MapStatusTrackingEmptyBlocks {
233173 def apply (
234174 loc : BlockManagerId ,
235175 numNonEmptyBlocks : Int ,
236- emptyBlocksHashSet : mutable.HashSet [Int ],
237- avgSize : Long ): MapStatusTrackingEmptyBlocks = {
238- new MapStatusTrackingEmptyBlocks (loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize )
176+ markedBlocks : mutable.HashSet [Int ],
177+ avgSize : Long ,
178+ flag : Boolean ): MapStatusTrackingEmptyBlocks = {
179+ new MapStatusTrackingEmptyBlocks (loc, numNonEmptyBlocks, markedBlocks, avgSize, flag)
239180 }
240181}
241182
@@ -261,7 +202,7 @@ private[spark] class HighlyCompressedMapStatus private (
261202 " Average size can only be zero for map stages that produced no output" )
262203
263204 protected def this () = this (null , - 1 , null , - 1 ) // For deserialization only
264-
205+
265206 override def location : BlockManagerId = loc
266207
267208 override def getSizeForBlock (reduceId : Int ): Long = {
@@ -317,11 +258,13 @@ private[spark] object HighlyCompressedMapStatus {
317258 } else {
318259 0
319260 }
261+ var isSparse = true
320262 if (numNonEmptyBlocks * 32 < totalNumBlocks){
321- MapStatusTrackingNoEmptyBlocks (loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize )
263+ MapStatusTrackingEmptyBlocks (loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse )
322264 }
323265 else if ((totalNumBlocks - numNonEmptyBlocks) * 32 < totalNumBlocks){
324- MapStatusTrackingEmptyBlocks (loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize)
266+ isSparse = false
267+ MapStatusTrackingEmptyBlocks (loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize, isSparse)
325268 }
326269 else {
327270 new HighlyCompressedMapStatus (loc, numNonEmptyBlocks, emptyBlocks, avgSize)
0 commit comments