-
Notifications
You must be signed in to change notification settings - Fork 3
Fix the way we decide if hasNext is true or not in WALReader. Unit tests... #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
4d08fde
073d1f8
164bd14
d8f14cd
a2457e4
bbfeae1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { | |
| private var closed = false | ||
| private val hflushMethod = { | ||
| try { | ||
| Some(classOf[FSDataOutputStream].getMethod("hflush", new Array[Class[Object]](0): _*)) | ||
| Some(classOf[FSDataOutputStream].getMethod("hflush")) | ||
| } catch { | ||
| case e: Exception => None | ||
| } | ||
|
|
@@ -38,14 +38,14 @@ private[streaming] class WriteAheadLogWriter(path: String) extends Closeable { | |
| // - Data - of length = Length | ||
| def write(data: ByteBuffer): FileSegment = synchronized { | ||
| assertOpen() | ||
| data.rewind() // Rewind to ensure all data in the buffer is retrieved | ||
| val lengthToWrite = data.remaining() | ||
| val segment = new FileSegment(path, nextOffset, lengthToWrite) | ||
| stream.writeInt(lengthToWrite) | ||
| if (data.hasArray) { | ||
| stream.write(data.array()) | ||
| } else { | ||
| // If the buffer is not backed by an array we need to copy the data to an array | ||
| data.rewind() // Rewind to ensure all data in the buffer is retrieved | ||
| val dataArray = new Array[Byte](lengthToWrite) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We dont want to copy the data multiple times! Why not just write off the data one byte at a time. That should be faster than doing two passes on the data, and allocating memory.
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, never mind. Spoke to my colleagues and writing byte at a time is slower than doing extra memcopy. So its fine as is. |
||
| data.get(dataArray) | ||
| stream.write(dataArray) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would wrap this method in hflush stuff as a function from cleanliness, so that we dont have to do
hflushMethod.foreach, just callflush()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrapped the call in a different method. I don't want to do a look up every time we call hflush. Doing that on initialization saves a bunch of reflection lookups. For the sake of keeping it a val, I wrapped the lookup in a different method, which is called at the time of initialization