Fix the bug that BatchedWriteAheadLog.deaggregate doesn't restore the position
zsxwing committed Dec 2, 2015
commit 81d18120bff0a772a566ddfe19e439f309b5d5df
@@ -211,7 +211,7 @@ private[streaming] class ReceivedBlockTracker(
     writeAheadLogOption.foreach { writeAheadLog =>
       logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
       writeAheadLog.readAll().asScala.foreach { byteBuffer =>
-        logTrace("Recovering record " + byteBuffer)
+        logInfo("Recovering record " + byteBuffer)
         Utils.deserialize[ReceivedBlockTrackerLogEvent](
           JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match {
Inline review comment (Contributor):
Might be worth it to have a version of Utils.deserialize that takes a ByteBuffer, but not required for this change.
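For illustration only, a minimal sketch of what such an overload could look like; the wrapper object name is hypothetical and nothing below is part of this change. It simply delegates to the existing Array[Byte]-based Utils.deserialize:

import java.nio.ByteBuffer

import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils

// Hypothetical convenience wrapper (not in this PR): accepts a ByteBuffer directly
// so call sites don't need to invoke JavaUtils.bufferToArray themselves.
object ByteBufferDeserializeSketch {
  def deserialize[T](buffer: ByteBuffer, loader: ClassLoader): T = {
    // bufferToArray returns the bytes between position and limit; like the call
    // sites in this diff, it may advance the buffer's position as it reads.
    Utils.deserialize[T](JavaUtils.bufferToArray(buffer), loader)
  }
}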

           case BlockAdditionEvent(receivedBlockInfo) =>
@@ -28,6 +28,7 @@ import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
 
 /**
@@ -197,17 +198,10 @@ private[util] object BatchedWriteAheadLog {
    */
   case class Record(data: ByteBuffer, time: Long, promise: Promise[WriteAheadLogRecordHandle])
 
-  /** Copies the byte array of a ByteBuffer. */
-  private def getByteArray(buffer: ByteBuffer): Array[Byte] = {
-    val byteArray = new Array[Byte](buffer.remaining())
-    buffer.get(byteArray)
-    byteArray
-  }
-
   /** Aggregate multiple serialized ReceivedBlockTrackerLogEvents in a single ByteBuffer. */
   def aggregate(records: Seq[Record]): ByteBuffer = {
     ByteBuffer.wrap(Utils.serialize[Array[Array[Byte]]](
-      records.map(record => getByteArray(record.data)).toArray))
+      records.map(record => JavaUtils.bufferToArray(record.data)).toArray))
   }
 
   /**
@@ -216,10 +210,13 @@ private[util] object BatchedWriteAheadLog {
    * method therefore needs to be backwards compatible.
    */
   def deaggregate(buffer: ByteBuffer): Array[ByteBuffer] = {
+    val prevPosition = buffer.position()
     try {
-      Utils.deserialize[Array[Array[Byte]]](getByteArray(buffer)).map(ByteBuffer.wrap)
+      Utils.deserialize[Array[Array[Byte]]](JavaUtils.bufferToArray(buffer)).map(ByteBuffer.wrap)
     } catch {
       case _: ClassCastException => // users may restart a stream with batching enabled
+        // Restore `position` so that the user can read `buffer` later
+        buffer.position(prevPosition)
         Array(buffer)
     }
   }
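As a side note, here is a minimal standalone sketch (a hypothetical example, not Spark code) of the behavior the fix guards against: draining a ByteBuffer advances its position, so without restoring the saved position a caller could no longer re-read the record after a failed deaggregation attempt.

import java.nio.ByteBuffer

// Standalone illustration: reading a ByteBuffer moves its position forward, which
// is why deaggregate saves the position and restores it before falling back to
// returning the raw buffer.
object PositionRestoreSketch {
  def main(args: Array[String]): Unit = {
    val buffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
    val prevPosition = buffer.position()

    // Simulate a failed deserialization attempt that consumed the buffer.
    val drained = new Array[Byte](buffer.remaining())
    buffer.get(drained)
    assert(buffer.remaining() == 0)   // nothing left to read

    // Restoring the saved position makes the original record readable again.
    buffer.position(prevPosition)
    assert(buffer.remaining() == 4)
  }
}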