Merged

Changes from 1 commit

Use ByteBuffer in the WAL
harishreedharan committed Oct 3, 2014
commit 073d1f88005621837f34b7fa786c5311fc5872e8
@@ -17,21 +17,22 @@
package org.apache.spark.streaming.storage

import java.io.Closeable
+import java.nio.ByteBuffer

private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeable {
Owner commented:
Another parameter to add is the Hadoop configuration. I will have access to the configuration and can pass it in here. That keeps the code generic across multiple HDFS-compatible systems.
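A rough sketch of what threading a Hadoop Configuration through the reader could look like; the extra constructor parameter and the two-argument HdfsUtils.getInputStream overload are assumptions for illustration, not code from this commit.

    import java.io.Closeable
    import java.nio.ByteBuffer

    import org.apache.hadoop.conf.Configuration

    // Hypothetical variant of the random reader that takes a Hadoop Configuration,
    // so the same code works against any HDFS-compatible file system.
    private[streaming] class ConfiguredWALRandomReader(path: String, conf: Configuration)
      extends Closeable {

      // Assumes an HdfsUtils overload that accepts the Configuration.
      private val instream = HdfsUtils.getInputStream(path, conf)

      def read(segment: FileSegment): ByteBuffer = synchronized {
        instream.seek(segment.offset)
        val length = instream.readInt()
        val buffer = new Array[Byte](length)
        instream.readFully(buffer)
        ByteBuffer.wrap(buffer)
      }

      override def close(): Unit = synchronized {
        instream.close()
      }
    }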


private val instream = HdfsUtils.getInputStream(path)
private var closed = false

-def read(segment: FileSegment): Array[Byte] = synchronized {
+def read(segment: FileSegment): ByteBuffer = synchronized {
assertOpen()
instream.seek(segment.offset)
val nextLength = instream.readInt()
HdfsUtils.checkState(nextLength == segment.length,
"Expected message length to be " + segment.length + ", " + "but was " + nextLength)
val buffer = new Array[Byte](nextLength)
instream.readFully(buffer)
-buffer
+ByteBuffer.wrap(buffer)
}

override def close(): Unit = synchronized {
@@ -17,13 +17,14 @@
package org.apache.spark.streaming.storage

import java.io.{EOFException, Closeable}
+import java.nio.ByteBuffer

private[streaming] class WriteAheadLogReader(path: String)
-extends Iterator[Array[Byte]] with Closeable {
+extends Iterator[ByteBuffer] with Closeable {

private val instream = HdfsUtils.getInputStream(path)
private var closed = false
-private var nextItem: Option[Array[Byte]] = None
+private var nextItem: Option[ByteBuffer] = None

override def hasNext: Boolean = synchronized {
assertOpen()
@@ -34,7 +35,7 @@ private[streaming] class WriteAheadLogReader(path: String)
val length = instream.readInt()
val buffer = new Array[Byte](length)
instream.readFully(buffer)
-nextItem = Some(buffer)
+nextItem = Some(ByteBuffer.wrap(buffer))
true
} catch {
case e: EOFException => false
@@ -43,7 +44,7 @@ private[streaming] class WriteAheadLogReader(path: String)
}
}

-override def next(): Array[Byte] = synchronized {
+override def next(): ByteBuffer = synchronized {
// TODO: Possible error case where there are not enough bytes in the stream
// TODO: How to handle that?
val data = nextItem.getOrElse {
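To illustrate the read-ahead pattern used here (hasNext parses the next length-prefixed record into nextItem, next() hands it out and clears it), a small self-contained sketch over a plain DataInputStream; the class name and the truncation handling are illustrative assumptions, and the TODO about a partially written final record remains open in the real reader.

    import java.io.{DataInputStream, EOFException}
    import java.nio.ByteBuffer

    // Illustrative read-ahead iterator over length-prefixed records.
    // A truncated final record is treated like a clean EOF here, because
    // readFully throws EOFException if fewer than `length` bytes remain.
    class RecordIterator(in: DataInputStream) extends Iterator[ByteBuffer] {
      private var nextItem: Option[ByteBuffer] = None

      override def hasNext: Boolean = {
        if (nextItem.isDefined) return true
        try {
          val length = in.readInt()
          val buffer = new Array[Byte](length)
          in.readFully(buffer)
          nextItem = Some(ByteBuffer.wrap(buffer))
          true
        } catch {
          case _: EOFException => false
        }
      }

      override def next(): ByteBuffer = {
        val data = nextItem.getOrElse(
          throw new IllegalStateException("next() called without a successful hasNext"))
        nextItem = None
        data
      }
    }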
@@ -17,21 +17,40 @@
package org.apache.spark.streaming.storage

import java.io.Closeable
+import java.nio.ByteBuffer

+import org.apache.hadoop.fs.FSDataOutputStream

private[streaming] class WriteAheadLogWriter(path: String) extends Closeable {
private val stream = HdfsUtils.getOutputStream(path)
private var nextOffset = stream.getPos
private var closed = false
+private val hflushMethod = {
Owner commented:
For cleanliness, I would wrap the hflush handling in a function, so that we don't have to do hflushMethod.foreach and can just call flush().

Author commented:
I wrapped the call in a separate method. I don't want to do a lookup every time we call hflush; doing it once at initialization saves a bunch of reflection lookups. To keep hflushMethod a val, the lookup itself lives in a separate method that is called at initialization time.
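A sketch of the approach described above: do the reflection lookup once behind a small helper, so call sites only ever call flush(stream). The object and method names are illustrative, and the sync() fallback is an assumption about pre-hflush Hadoop versions rather than what this commit does.

    import java.lang.reflect.Method

    import org.apache.hadoop.fs.FSDataOutputStream

    object FlushSupport {
      // Looked up once at class-load time; hflush is missing on older Hadoop releases.
      private val hflushMethod: Option[Method] =
        try {
          Some(classOf[FSDataOutputStream].getMethod("hflush"))
        } catch {
          case _: NoSuchMethodException => None
        }

      // Call sites stay clean: flush(stream) either hflushes or falls back to sync().
      def flush(stream: FSDataOutputStream): Unit = hflushMethod match {
        case Some(m) => m.invoke(stream)
        case None => stream.sync()
      }
    }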

+try {
+Some(classOf[FSDataOutputStream].getMethod("hflush", new Array[Class[Object]](0): _*))
+} catch {
+case e: Exception => None
+}
+}

// Data is always written as:
// - Length - Int (4 bytes)
// - Data - of length = Length
-def write(data: Array[Byte]): FileSegment = synchronized {
+def write(data: ByteBuffer): FileSegment = synchronized {
assertOpen()
-val segment = new FileSegment(path, nextOffset, data.length)
-stream.writeInt(data.length)
-stream.write(data)
-stream.hflush()
+val lengthToWrite = data.remaining()
+val segment = new FileSegment(path, nextOffset, lengthToWrite)
+stream.writeInt(lengthToWrite)
+if (data.hasArray) {
+stream.write(data.array())
+} else {
+// If the buffer is not backed by an array we need to copy the data to an array
+data.rewind() // Rewind to ensure all data in the buffer is retrieved
+val dataArray = new Array[Byte](lengthToWrite)
Owner commented:
We don't want to copy the data multiple times! Why not just write the data one byte at a time? That should be faster than doing two passes over the data and allocating memory.

Owner commented:
      while(byteBuffer.hasRemaining) {
        stream.writeByte(byteBuffer.get())
      }

Owner commented:
Actually, never mind. I spoke to my colleagues, and writing a byte at a time is slower than doing the extra memcopy. So it's fine as is.

+data.get(dataArray)
+stream.write(dataArray)
+}
+hflushMethod.foreach(_.invoke(stream))
nextOffset = stream.getPos
segment
}
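For reference, a standalone sketch of the record framing described by the comment above: a 4-byte length prefix followed by the payload, written from and read back into a ByteBuffer. It uses plain java.io streams instead of the HDFS output stream, and the arrayOffset/position handling is this sketch's own choice, not necessarily what the WAL writer does.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import java.nio.ByteBuffer

    object FramingSketch {
      // Write one record: 4-byte length, then exactly `length` payload bytes.
      def writeRecord(out: DataOutputStream, data: ByteBuffer): Unit = {
        val length = data.remaining()
        out.writeInt(length)
        if (data.hasArray) {
          // Heap buffer: write the remaining slice of the backing array directly.
          out.write(data.array(), data.arrayOffset() + data.position(), length)
        } else {
          // Direct buffer: drain the remaining bytes into a temporary array first.
          val tmp = new Array[Byte](length)
          data.get(tmp)
          out.write(tmp)
        }
      }

      // Read a record back the same way the WAL readers do.
      def readRecord(in: DataInputStream): ByteBuffer = {
        val length = in.readInt()
        val buffer = new Array[Byte](length)
        in.readFully(buffer)
        ByteBuffer.wrap(buffer)
      }

      def main(args: Array[String]): Unit = {
        val bytes = new ByteArrayOutputStream()
        writeRecord(new DataOutputStream(bytes), ByteBuffer.wrap("hello".getBytes("UTF-8")))
        val record = readRecord(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
        assert(record.remaining() == 5)
      }
    }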
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.storage

import java.io.{RandomAccessFile, File}
+import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -33,39 +34,40 @@ object TestUtils {
// We don't want to be using the WAL writer to test the reader - it will be painful to figure
// out where the bug is. Instead generate the file by hand and see if the WAL reader can
// handle it.
-def writeData(count: Int, file: File): ArrayBuffer[(Array[Byte], Long)] = {
-val writtenData = new ArrayBuffer[(Array[Byte], Long)]()
+def writeData(count: Int, file: File): ArrayBuffer[(ByteBuffer, Long)] = {
+val writtenData = new ArrayBuffer[(ByteBuffer, Long)]()
val writer = new RandomAccessFile(file, "rw")
var i = 0
while (i < count) {
val data = generateRandomData()
writtenData += ((data, writer.getFilePointer))
-writer.writeInt(data.length)
-writer.write(data)
+data.rewind()
+writer.writeInt(data.remaining())
+writer.write(data.array())
i += 1
}
writer.close()
writtenData
}

-def readData(segments: Seq[FileSegment], file: File): Seq[Array[Byte]] = {
+def readData(segments: Seq[FileSegment], file: File): Seq[ByteBuffer] = {
val reader = new RandomAccessFile(file, "r")
segments.map { x =>
reader.seek(x.offset)
val data = new Array[Byte](x.length)
reader.readInt()
reader.readFully(data)
-data
+ByteBuffer.wrap(data)
}
}

-def generateRandomData(): Array[Byte] = {
+def generateRandomData(): ByteBuffer = {
val data = new Array[Byte](random.nextInt(50))
random.nextBytes(data)
-data
+ByteBuffer.wrap(data)
}

-def writeUsingWriter(file: File, input: Seq[Array[Byte]]): Seq[FileSegment] = {
+def writeUsingWriter(file: File, input: Seq[ByteBuffer]): Seq[FileSegment] = {
val writer = new WriteAheadLogWriter(file.toString)
val segments = input.map(writer.write)
writer.close()
@@ -29,11 +29,10 @@ class WriteAheadLogRandomReaderSuite extends TestSuiteBase {
file.deleteOnExit()
val writtenData = TestUtils.writeData(50, file)
val reader = new WriteAheadLogRandomReader("file:///" + file.toString)
-var nextOffset = 0l
-writtenData.foreach{
+writtenData.foreach {
x =>
-assert(x._1 === reader.read(new FileSegment(file.toString, x._2, x._1.length)))
-nextOffset += (x._2 + 4)
+val length = x._1.remaining()
+assert(x._1 === reader.read(new FileSegment(file.toString, x._2, length)))
}
reader.close()
}
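The offsets the test captures via writer.getFilePointer come straight from the record layout: each record occupies a 4-byte length prefix plus its payload. A small illustrative sketch of that arithmetic (the names here are this sketch's own):

    object OffsetSketch {
      // offset(i + 1) = offset(i) + 4 + payloadSize(i)
      def expectedOffsets(payloadSizes: Seq[Int], start: Long = 0L): Seq[Long] =
        payloadSizes.scanLeft(start)((offset, size) => offset + 4 + size).init

      def main(args: Array[String]): Unit = {
        // Payloads of 10, 0 and 25 bytes start at offsets 0, 14 and 18.
        assert(expectedOffsets(Seq(10, 0, 25)) == Seq(0L, 14L, 18L))
      }
    }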