Merged (changes from 3 commits)
@@ -17,21 +17,22 @@
package org.apache.spark.streaming.storage

import java.io.Closeable
import java.nio.ByteBuffer

private[streaming] class WriteAheadLogRandomReader(path: String) extends Closeable {
Owner:
Another thing to add as a parameter is the Hadoop configuration. I will have access to the configuration and can pass it in here; that keeps the code generic across multiple HDFS-compatible systems.
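A minimal sketch of what that could look like, assuming a conf constructor parameter and an HdfsUtils.getInputStream overload that accepts it (both are illustrations, not part of this diff):

    import java.io.Closeable
    import org.apache.hadoop.conf.Configuration

    // Hypothetical: thread the caller's Hadoop Configuration through the reader
    // so the same code runs against any HDFS-compatible file system.
    private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
      extends Closeable {

      // Assumes HdfsUtils.getInputStream takes the Configuration as a second argument.
      private val instream = HdfsUtils.getInputStream(path, conf)

      override def close(): Unit = instream.close()
    }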


private val instream = HdfsUtils.getInputStream(path)
private var closed = false

def read(segment: FileSegment): Array[Byte] = synchronized {
def read(segment: FileSegment): ByteBuffer = synchronized {
assertOpen()
instream.seek(segment.offset)
val nextLength = instream.readInt()
HdfsUtils.checkState(nextLength == segment.length,
"Expected message length to be " + segment.length + ", " + "but was " + nextLength)
val buffer = new Array[Byte](nextLength)
instream.readFully(buffer)
buffer
ByteBuffer.wrap(buffer)
}

override def close(): Unit = synchronized {
@@ -16,36 +16,35 @@
*/
package org.apache.spark.streaming.storage

import java.io.Closeable
import java.io.{EOFException, Closeable}
import java.nio.ByteBuffer

private[streaming] class WriteAheadLogReader(path: String)
extends Iterator[Array[Byte]] with Closeable {
extends Iterator[ByteBuffer] with Closeable {

private val instream = HdfsUtils.getInputStream(path)
private var closed = false
private var nextItem: Option[Array[Byte]] = None
private var nextItem: Option[ByteBuffer] = None

override def hasNext: Boolean = synchronized {
assertOpen()
if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
true
} else {
val available = instream.available()
if (available < 4) { // Length of next block (which is an Int = 4 bytes) of data is unavailable!
false
try {
val length = instream.readInt()
val buffer = new Array[Byte](length)
instream.readFully(buffer)
nextItem = Some(ByteBuffer.wrap(buffer))
true
} catch {
case e: EOFException => false
case e: Exception => throw e
}
val length = instream.readInt()
if (instream.available() < length) {
false
}
val buffer = new Array[Byte](length)
instream.readFully(buffer)
nextItem = Some(buffer)
true
}
}

override def next(): Array[Byte] = synchronized {
override def next(): ByteBuffer = synchronized {
// TODO: Possible error case where there are not enough bytes in the stream
// TODO: How to handle that?
val data = nextItem.getOrElse {
@@ -17,21 +17,40 @@
package org.apache.spark.streaming.storage

import java.io.Closeable
import java.nio.ByteBuffer

import org.apache.hadoop.fs.FSDataOutputStream

private[streaming] class WriteAheadLogWriter(path: String) extends Closeable {
private val stream = HdfsUtils.getOutputStream(path)
private var nextOffset = stream.getPos
private var closed = false
private val hflushMethod = {
Owner:
I would wrap the hflush stuff in a function for cleanliness, so that we don't have to do hflushMethod.foreach and can just call flush().

Author:
I wrapped the call in a different method. I don't want to do a lookup every time we call hflush; doing it at initialization saves a bunch of reflection lookups. For the sake of keeping it a val, I wrapped the lookup in a separate method that is called at initialization time (see the sketch after this block).
try {
Some(classOf[FSDataOutputStream].getMethod("hflush"))
} catch {
case e: Exception => None
}
}
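To make the thread above concrete, a sketch of the pattern being discussed: the reflective lookup runs once when the writer is constructed, and a small private helper hides the Option so call sites can simply call flush(). The helper name and the NoSuchMethodException case are illustrative, not taken from this diff:

    // Reflection lookup done once, at construction time.
    private val hflushMethod: Option[java.lang.reflect.Method] =
      try {
        Some(classOf[FSDataOutputStream].getMethod("hflush"))
      } catch {
        case e: NoSuchMethodException => None // older Hadoop versions have no hflush
      }

    // Hypothetical wrapper so callers need not repeat hflushMethod.foreach(...).
    private def flush(): Unit = hflushMethod.foreach(_.invoke(stream))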

// Data is always written as:
// - Length - Int
// - Data - of length = Length
def write(data: Array[Byte]): FileSegment = synchronized {
def write(data: ByteBuffer): FileSegment = synchronized {
assertOpen()
val segment = new FileSegment(path, nextOffset, data.length)
stream.writeInt(data.length)
stream.write(data)
stream.hflush()
data.rewind() // Rewind to ensure all data in the buffer is retrieved
val lengthToWrite = data.remaining()
val segment = new FileSegment(path, nextOffset, lengthToWrite)
stream.writeInt(lengthToWrite)
if (data.hasArray) {
stream.write(data.array())
} else {
// If the buffer is not backed by an array we need to copy the data to an array
val dataArray = new Array[Byte](lengthToWrite)
Owner:
We don't want to copy the data multiple times! Why not just write the data out one byte at a time? That should be faster than doing two passes over the data and allocating memory.

Owner:
      while (byteBuffer.hasRemaining) {
        stream.writeByte(byteBuffer.get())
      }

Owner:
Actually, never mind. I spoke to my colleagues, and writing a byte at a time is slower than doing the extra memcopy, so it's fine as is.
data.get(dataArray)
stream.write(dataArray)
}
hflushMethod.foreach(_.invoke(stream))
nextOffset = stream.getPos
segment
}
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.{RandomAccessFile, File}
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object TestUtils {
Owner:
Since these functions are only useful for testing the WAL, it's best to put them in the same file as the WALSuite.


val random = new Random()

/**
* Writes data to the file and returns the (data, file offset) pairs written.
* @param count number of records to write
* @param file the file to write to
* @return the (ByteBuffer, offset) pairs, in write order
*/
// We don't want to be using the WAL writer to test the reader - it will be painful to figure
// out where the bug is. Instead generate the file by hand and see if the WAL reader can
// handle it.
def writeData(count: Int, file: File): ArrayBuffer[(ByteBuffer, Long)] = {
val writtenData = new ArrayBuffer[(ByteBuffer, Long)]()
val writer = new RandomAccessFile(file, "rw")
var i = 0
while (i < count) {
val data = generateRandomData()
writtenData += ((data, writer.getFilePointer))
data.rewind()
writer.writeInt(data.remaining())
writer.write(data.array())
i += 1
}
writer.close()
writtenData
}

def readData(segments: Seq[FileSegment], file: File): Seq[ByteBuffer] = {
val reader = new RandomAccessFile(file, "r")
segments.map { x =>
reader.seek(x.offset)
val data = new Array[Byte](x.length)
reader.readInt()
reader.readFully(data)
ByteBuffer.wrap(data)
}
}

def generateRandomData(): ByteBuffer = {
val data = new Array[Byte](random.nextInt(50))
random.nextBytes(data)
ByteBuffer.wrap(data)
}

def writeUsingWriter(file: File, input: Seq[ByteBuffer]): Seq[FileSegment] = {
val writer = new WriteAheadLogWriter(file.toString)
val segments = input.map(writer.write)
writer.close()
segments
}
}
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.File

import com.google.common.io.Files

import org.apache.spark.streaming.TestSuiteBase

class WriteAheadLogRandomReaderSuite extends TestSuiteBase {

test("Test successful reads") {
val file = File.createTempFile("testSuccessFulReads", "")
file.deleteOnExit()
val writtenData = TestUtils.writeData(50, file)
val reader = new WriteAheadLogRandomReader("file:///" + file.toString)
writtenData.foreach {
x =>
val length = x._1.remaining()
assert(x._1 === reader.read(new FileSegment(file.toString, x._2, length)))
}
reader.close()
}

test("Test reading data written with writer") {
val dir = Files.createTempDir()
val file = new File(dir, "TestWriter")
try {
val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData()
val segments = TestUtils.writeUsingWriter(file, dataToWrite)
val iter = dataToWrite.iterator
val reader = new WriteAheadLogRandomReader("file:///" + file.toString)
val writtenData = segments.map { x =>
reader.read(x)
}
assert(dataToWrite.toArray === writtenData.toArray)
} finally {
file.delete()
dir.delete()
}
}

}
@@ -0,0 +1,57 @@
/*
Owner:
Please consolidate all these WAL-related tests into a single test suite, WriteAheadLogSuite.

* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.File

import com.google.common.io.Files

import org.apache.spark.streaming.TestSuiteBase

class WriteAheadLogReaderSuite extends TestSuiteBase {

test("Test success") {
val file = File.createTempFile("testSuccessFulReads", "")
file.deleteOnExit()
val writtenData = TestUtils.writeData(50, file)
val reader = new WriteAheadLogReader("file:///" + file.toString)
val iter = writtenData.iterator
iter.foreach { x =>
assert(reader.hasNext === true)
assert(reader.next() === x._1)
}
reader.close()
}


test("Test reading data written with writer") {
val dir = Files.createTempDir()
val file = new File(dir, "TestWriter")
try {
val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData()
val segments = TestUtils.writeUsingWriter(file, dataToWrite)
val iter = dataToWrite.iterator
val reader = new WriteAheadLogReader("file:///" + file.toString)
reader.foreach { x =>
assert(x === iter.next())
}
} finally {
file.delete()
dir.delete()
}
}
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.storage

import java.io.File

import com.google.common.io.Files

import org.apache.spark.streaming.TestSuiteBase

class WriteAheadLogWriterTestSuite extends TestSuiteBase {

test("Test successful writes") {

val dir = Files.createTempDir()
val file = new File(dir, "TestWriter")
try {
val dataToWrite = for (i <- 1 to 50) yield TestUtils.generateRandomData()
val writer = new WriteAheadLogWriter("file:///" + file.toString)
val segments = dataToWrite.map(writer.write)
writer.close()
val writtenData = TestUtils.readData(segments, file)
assert(writtenData.toArray === dataToWrite.toArray)
} finally {
file.delete()
dir.delete()
}
}
}