address feedback
dbtsai committed Oct 11, 2019
commit a60135614364fdbe0a2953a47b908176e740a8db
32 changes: 17 additions & 15 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -17,19 +17,21 @@

 package org.apache.spark

-import java.io._
+import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock

-import com.github.luben.zstd.ZstdInputStream
-import com.github.luben.zstd.ZstdOutputStream
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

+import com.github.luben.zstd.ZstdInputStream
+import com.github.luben.zstd.ZstdOutputStream
+import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}
+
 import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -812,13 +814,14 @@ private[spark] object MapOutputTracker extends Logging {
   // generally be pretty compressible because many map outputs will be on the same hostname.
   def serializeMapStatuses(statuses: Array[MapStatus], broadcastManager: BroadcastManager,
       isLocal: Boolean, minBroadcastSize: Int): (Array[Byte], Broadcast[Array[Byte]]) = {
-    import scala.language.reflectiveCalls
-    val out = new ByteArrayOutputStream(4096) {
-      // exposing `buf` directly to avoid copy
-      def getBuf: Array[Byte] = buf
-    }
-    val objOut = new ObjectOutputStream(out)
+    // Using `org.apache.commons.io.output.ByteArrayOutputStream` instead of the standard one
+    // This implementation doesn't reallocate the whole memory block but allocates
+    // additional buffers. This way no buffers need to be garbage collected and
+    // the contents don't have to be copied to the new buffer.
+    val out = new ApacheByteArrayOutputStream()
+    val compressedOut = new ApacheByteArrayOutputStream()
+
+    val objOut = new ObjectOutputStream(out)
     Utils.tryWithSafeFinally {
       // Since statuses can be modified in parallel, sync on it
       statuses.synchronized {
@@ -829,16 +832,15 @@
     }

     val arr: Array[Byte] = {
-      val compressedOut = new ByteArrayOutputStream(4096)
       val zos = new ZstdOutputStream(compressedOut)
       Utils.tryWithSafeFinally {
         compressedOut.write(DIRECT)
-        zos.write(out.getBuf, 0, out.size())
+        // `out.writeTo(zos)` will write the uncompressed data from `out` to `zos`
+        // without copying to avoid unnecessary allocation and copy of byte[].
+        out.writeTo(zos)
       } {
         zos.close()
       }
+      // We don't want to use the internal `buf` of `compressedOut` as it can be larger than
+      // the actual used size since it's a buffer kept growing.
       compressedOut.toByteArray
     }
     if (arr.length >= minBroadcastSize) {
@@ -854,11 +856,11 @@
         oos.close()
       }
       val outArr = {
-        val compressedOut = new ByteArrayOutputStream(4096)
+        compressedOut.reset()
         val zos = new ZstdOutputStream(compressedOut)
Contributor commented:
Hi @dbtsai, I am back-porting this into our internal repo. It looks like this compression is unnecessary, since arr is already compressed by zstd. Compressing an already-compressed byte[] again is a waste of CPU time. WDYT?

Member Author (dbtsai) commented:
The actual value of the data (which is already compressed) will not be in the serialized form written by out.writeTo(zos), as it's transient. Here we are just serializing the reference to the actual data, and the actual data will be broadcast through TorrentBroadcast. See the next log, "Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length, for your real data. The broadcast one is very small.

Contributor commented:
Thanks for the clarification. It indeed does not include the compressed data.
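To make the author's point above concrete, here is a minimal sketch (plain Scala, not Spark code; Handle and TransientDemo are hypothetical names invented for illustration) showing that a @transient field is skipped by Java serialization, so serializing such a handle yields only a few bytes no matter how large its payload is:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for a Broadcast-like handle: the (possibly large)
// payload is marked @transient, so Java serialization skips it entirely.
class Handle(val id: Long, @transient val payload: Array[Byte]) extends Serializable

object TransientDemo {
  // Serialize an object and return the number of bytes written.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    try oos.writeObject(obj) finally oos.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(42L, new Array[Byte](10 * 1024 * 1024)) // 10 MB payload
    // Only the id and class metadata are written; the 10 MB payload is not,
    // which is why re-compressing the serialized handle costs almost nothing.
    println("serialized handle size = " + serializedSize(handle) + " bytes")
  }
}

This mirrors what the quoted log line reports: outArr.length (the serialized reference) stays small while arr.length (the actual compressed map statuses) can be large.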

        Utils.tryWithSafeFinally {
          compressedOut.write(BROADCAST)
-          zos.write(out.getBuf, 0, out.size())
+          out.writeTo(zos)
        } {
          zos.close()
        }
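For anyone back-porting this change, the buffer-handling pattern the commit adopts can be reduced to a standalone sketch. This is an illustration under stated assumptions, not the Spark implementation: it assumes only commons-io and zstd-jni on the classpath, and CompressDemo and its variable names are made up. Serialize once into a commons-io ByteArrayOutputStream, stream it into each compressor with writeTo (no intermediate byte[] copy), take an exact-size result with toByteArray, and reuse the buffer with reset():

import java.io.ObjectOutputStream

import com.github.luben.zstd.ZstdOutputStream
import org.apache.commons.io.output.{ByteArrayOutputStream => ApacheByteArrayOutputStream}

object CompressDemo {
  def main(args: Array[String]): Unit = {
    // commons-io's ByteArrayOutputStream grows by chaining extra buffers
    // instead of reallocating and copying one big array.
    val out = new ApacheByteArrayOutputStream()
    val compressedOut = new ApacheByteArrayOutputStream()

    // Serialize some repetitive, compressible payload once into `out`.
    val objOut = new ObjectOutputStream(out)
    try objOut.writeObject(Array.fill(100000)("host-1")) finally objOut.close()

    // First pass: stream the uncompressed bytes straight into zstd via
    // writeTo, without materializing an intermediate byte[].
    val zos = new ZstdOutputStream(compressedOut)
    try out.writeTo(zos) finally zos.close()
    // toByteArray returns exactly size() bytes; the internal buffer may be
    // larger than the data it holds, so it is never exposed directly.
    val first = compressedOut.toByteArray

    // Second pass: reset() clears the logical content but keeps the already
    // allocated buffers, so the same stream can be reused for another round.
    compressedOut.reset()
    val zos2 = new ZstdOutputStream(compressedOut)
    try out.writeTo(zos2) finally zos2.close()
    val second = compressedOut.toByteArray

    println("uncompressed = " + out.size() + " bytes, compressed = " + first.length + " bytes")
    assert(first.length == second.length) // both passes compress identical input
  }
}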