pom.xml (+6 −0)

@@ -406,6 +406,12 @@
       <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
       <version>${akka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-testkit_${scala.binary.version}</artifactId>
streaming/pom.xml (+5 −0)

@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
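The two pom changes work together: the root pom.xml fixes hadoop-minicluster to ${hadoop.version}, which is why streaming/pom.xml can omit <version> and only declare test scope. hadoop-minicluster provides MiniDFSCluster, an in-process HDFS, presumably so the write ahead log suites can run against a real DFS rather than the local file system. A minimal sketch of the usual MiniDFSCluster lifecycle, assuming the Hadoop 2.x builder API; the data-node count and test path are illustrative, not taken from this PR:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hdfs.MiniDFSCluster

    // Stand up a single-datanode HDFS inside the test JVM (illustrative).
    val conf = new Configuration()
    val cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build()
    try {
      cluster.waitClusterUp()
      // Point the code under test at the mini cluster's namenode.
      val hdfsUri = s"hdfs://localhost:${cluster.getNameNodePort}/test"
      // ... exercise the write ahead log classes against hdfsUri ...
    } finally {
      cluster.shutdown()
    }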
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala

@@ -25,10 +25,9 @@ private[streaming] object HdfsUtils {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that

     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = this.synchronized {
+      dfsPath.getFileSystem(conf)
+    }
     // If the file exists and we have append support, append instead of creating a new file
     val stream: FSDataOutputStream = {
       if (dfs.isFile(dfsPath)) {
@@ -54,17 +53,16 @@
   }

   def checkState(state: Boolean, errorMsg: => String) {
-    if(!state) {
+    if (!state) {
       throw new IllegalStateException(errorMsg)
     }
   }

   def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = this.synchronized {
+      dfsPath.getFileSystem(conf)
+    }
     val fileStatus = dfs.getFileStatus(dfsPath)
     val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
     blockLocs.map(_.flatMap(_.getHosts))
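For context, a hedged sketch of how a caller might use the two HdfsUtils entry points touched above; the path below is an assumption for illustration, not code from this PR:

    import org.apache.hadoop.conf.Configuration

    // Illustrative caller (the path is made up).
    val conf = new Configuration()
    val logPath = "hdfs://localhost:9000/wal/log-0"

    // getOutputStream resolves the FileSystem under a lock (the synchronized
    // block above) and returns an FSDataOutputStream for writing.
    val out = HdfsUtils.getOutputStream(logPath, conf)
    out.write(Array[Byte](1, 2, 3))
    out.close()

    // getBlockLocations exposes which hosts hold the file's blocks,
    // useful for scheduling reads with data locality.
    val hosts: Option[Array[String]] = HdfsUtils.getBlockLocations(logPath, conf)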
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala

@@ -183,10 +183,6 @@ private[streaming] class WriteAheadLogManager(
       pastLogs ++= logFileInfo
       logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
       logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
-    } else {
-      fileSystem.mkdirs(logDirectoryPath,
-        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
-      logInfo(s"Created ${logDirectory} for write ahead log files")
     }
   }
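The deleted else branch pre-created the log directory with octal 770 permissions when recovery found nothing to recover. Dropping it is consistent with the default FileSystem.create semantics, which create any missing parent directories, so the first log file written brings the directory into existence on its own. A small sketch of that behavior, with illustrative paths:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Default create() semantics: missing parents are created on the way
    // (paths are illustrative, not from this PR).
    val conf = new Configuration()
    val logFile = new Path("/wal/receiver-0/log-1") // /wal/receiver-0 need not exist yet
    val fs = logFile.getFileSystem(conf)
    val out = fs.create(logFile)                    // creates /wal/receiver-0 implicitly
    out.close()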
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala

@@ -30,18 +30,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
  */
 private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
   extends Closeable {
-  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
-    val uri = new URI(path)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
-    val isDefaultLocal = defaultFs == null || defaultFs == "file"
-
-    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
-      assert(!new File(uri.getPath).exists)
-      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
-    } else {
-      Right(HdfsUtils.getOutputStream(path, hadoopConf))
-    }
-  }
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)

   private lazy val hadoopFlushMethod = {
     val cls = classOf[FSDataOutputStream]
@@ -77,21 +67,14 @@ private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
     stream.close()
   }

-  private def stream(): DataOutputStream = {
-    underlyingStream.fold(x => x, x => x)
-  }
-
   private def getPosition(): Long = {
-    underlyingStream match {
-      case Left(localStream) => localStream.size
-      case Right(dfsStream) => dfsStream.getPos()
-    }
+    stream.getPos()
   }

   private def flush() {
-    underlyingStream match {
-      case Left(localStream) => localStream.flush
-      case Right(dfsStream) => hadoopFlushMethod.foreach { _.invoke(dfsStream) }
+    hadoopFlushMethod.foreach {
+      _.invoke(stream)
     }
   }
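With the local-file branch gone, getPosition and flush always address the Hadoop stream, and flush always goes through hadoopFlushMethod, whose body is collapsed in this diff. In the Spark source of this era that method reflectively picks a flush method so one binary works across Hadoop versions; a reconstruction of that pattern, under the assumption that the elided code matches it:

    import scala.util.Try
    import org.apache.hadoop.fs.FSDataOutputStream

    object FlushMethodLookup {
      // Assumed reconstruction of the collapsed body: prefer hflush
      // (Hadoop 2.x), fall back to sync (Hadoop 1.x); None if neither
      // method exists on FSDataOutputStream.
      lazy val hadoopFlushMethod: Option[java.lang.reflect.Method] = {
        val cls = classOf[FSDataOutputStream]
        Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
      }
    }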