Skip to content
Merged
Prev Previous commit
Next Next commit
Make most methods in writer and reader thread-safe.
  • Loading branch information
harishreedharan committed Oct 1, 2014
commit f6d3a9df1af7e9a29a5d6bd6a64c3242cfd02822
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ class HdfsSequentialReader(val path: String) {
val instream = HdfsUtils.getInputStream(path)

def hasNext: Boolean = {
instream.available() != 0
synchronized {
instream.available() != 0
}
}

def readNext(): Array[Byte] = {
val length = instream.readInt()
val buffer = new Array[Byte](length)
instream.readFully(buffer)
buffer
synchronized {
val length = instream.readInt()
val buffer = new Array[Byte](length)
instream.readFully(buffer)
buffer
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ class HdfsWalRandomReader(val path: String) {
val instream = HdfsUtils.getInputStream(path)

def read(segment: FileSegment): Array[Byte] = {
instream.seek(segment.offset)
val nextLength = instream.readInt()
assert(nextLength == segment.length, "Expected message length to be " + segment.length + ", " +
"but was " + nextLength)
val buffer = new Array[Byte](nextLength)
instream.readFully(buffer)
buffer
synchronized {
instream.seek(segment.offset)
val nextLength = instream.readInt()
assert(nextLength == segment.length,
"Expected message length to be " + segment.length + ", " +
"but was " + nextLength)
val buffer = new Array[Byte](nextLength)
instream.readFully(buffer)
buffer
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ class HdfsWalWriter(val path: String) {
// - Length - Long
// - Data - of length = Length
def write(data: Array[Byte]): FileSegment = {
val segment = new FileSegment(path, nextOffset, data.length)
stream.writeInt(data.length)
stream.write(data)
stream.hflush()
nextOffset = stream.getPos
segment
synchronized {
val segment = new FileSegment(path, nextOffset, data.length)
stream.writeInt(data.length)
stream.write(data)
stream.hflush()
nextOffset = stream.getPos
segment
}
}
}