Remove underlying stream from the WALWriter.
harishreedharan committed Oct 22, 2014
commit 5c70d1f2b0050c2d5ca71d5f9d0eb499417b827e
@@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
  */
 private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
   extends Closeable {
-  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
-    val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val isDefaultLocal = defaultFs == null || defaultFs == "file"
-
-    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
-      assert(!new File(uri.getPath).exists)
-      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
-    } else {
-      Right(HdfsUtils.getOutputStream(path, hadoopConf))
-    }
-  }
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

   private lazy val hadoopFlushMethod = {
     val cls = classOf[FSDataOutputStream]
@@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configura
     stream.close()
   }

-  private def stream(): DataOutputStream = {
-    underlyingStream.fold(x => x, x => x)
-  }
-
   private def getPosition(): Long = {
-    underlyingStream match {
-      case Left(localStream) => localStream.size
-      case Right(dfsStream) => dfsStream.getPos()
-    }
+    stream.getPos()
   }

   private def flush() {
-    underlyingStream match {
-      case Left(localStream) => localStream.flush
-      case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
+    hadoopFlushMethod.foreach {
+      _.invoke(stream)
     }
   }

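For orientation, here is a minimal sketch of the writer after this commit. It assumes HdfsUtils.getOutputStream (the Spark streaming helper already referenced in the diff) returns an FSDataOutputStream for both local and HDFS paths, which is what the new stream.getPos() call relies on, and it fills in a plausible body for the unchanged hadoopFlushMethod context (reflectively looking up hflush and falling back to sync). It is a sketch of the resulting shape, not the exact file contents.

package org.apache.spark.streaming.util

import java.io.Closeable

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FSDataOutputStream

private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
  extends Closeable {

  // Single code path: the stream always comes from HdfsUtils, whether the
  // path is on the local file system or on HDFS.
  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

  // FSDataOutputStream.hflush only exists on Hadoop 2.x; fall back to sync on
  // older versions. (Assumed body: this val is unchanged context in the diff.)
  private lazy val hadoopFlushMethod = {
    val cls = classOf[FSDataOutputStream]
    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
  }

  // Position and flush now delegate directly to the FSDataOutputStream,
  // instead of branching on an Either of local vs. HDFS streams.
  private def getPosition(): Long = stream.getPos()

  private def flush() {
    hadoopFlushMethod.foreach { _.invoke(stream) }
  }

  override def close(): Unit = stream.close()
}

With the Either[DataOutputStream, FSDataOutputStream] gone, the writer no longer needs the stream() unwrapping helper or the per-case position and flush logic, which is exactly what the two hunks above remove.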