Addressed comments
tdas committed Apr 12, 2018
commit 35c66367ef5c1a88689ddcaad52ff36f4ee91494
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -16,12 +16,11 @@
  */
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{FileSystem => _, _}
+import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.{EnumSet, UUID}
 
 import scala.util.control.NonFatal
 
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
@@ -228,7 +227,8 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
   }
 
   override def createAtomic(
-      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
     new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
   }
 
@@ -311,7 +311,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   }
 
   override def createAtomic(
-      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
     new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
   }
 
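Both managers now share the same two-line parameter layout for `createAtomic`, and both return a `RenameBasedFSDataOutputStream`, which, per its name, stages the write in a temporary file and renames it into place when the stream closes (the `Cancellable` return type lets the staged file be discarded instead). A minimal sketch of that write-then-rename pattern, using plain `java.nio` rather than the Hadoop `FileSystem`/`FileContext` APIs in this patch; the `writeAtomic` helper and the temp-file naming are illustrative only, not part of the change:

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardCopyOption}

object AtomicWriteSketch {
  // Readers of `target` see either the old contents or the complete new
  // contents, never a partial write.
  def writeAtomic(target: File, contents: String): Unit = {
    // Stage in the same directory so the final rename stays on one file system.
    val tmp = new File(target.getParentFile, s".${target.getName}.tmp")
    try {
      Files.write(tmp.toPath, contents.getBytes(StandardCharsets.UTF_8))
      // An atomic move either fully replaces `target` or fails; this mirrors
      // the overwriteIfPossible = true case in the patch.
      Files.move(tmp.toPath, target.toPath, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      tmp.delete() // cleans up on failure; no-op after a successful move
    }
  }

  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("checkpoint-demo", ".txt")
    writeAtomic(f, "batch-42 metadata")
    println(new String(Files.readAllBytes(f.toPath), StandardCharsets.UTF_8))
  }
}
```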
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -109,8 +109,8 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {
   }
 
   test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") {
-    import FakeFileSystem.scheme
-    spark.conf.set(s"fs.$scheme.impl", classOf[FakeFileSystem].getName)
+    import CheckpointFileManagerSuiteFileSystem.scheme
+    spark.conf.set(s"fs.$scheme.impl", classOf[CheckpointFileManagerSuiteFileSystem].getName)
     quietly {
       withTempDir { temp =>
         val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
@@ -177,15 +177,18 @@ object TestCheckpointFileManager {
 }
 
 
-/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */
-private class FakeFileSystem extends RawLocalFileSystem {
-  import FakeFileSystem.scheme
+/**
+ * CheckpointFileManagerSuiteFileSystem to test fallback of the CheckpointFileManager
+ * from FileContext to FileSystem API.
+ */
+private class CheckpointFileManagerSuiteFileSystem extends RawLocalFileSystem {
+  import CheckpointFileManagerSuiteFileSystem.scheme
 
   override def getUri: URI = {
     URI.create(s"$scheme:///")
   }
 }
 
-private object FakeFileSystem {
-  val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}"
+private object CheckpointFileManagerSuiteFileSystem {
+  val scheme = s"CheckpointFileManagerSuiteFileSystem${math.abs(Random.nextInt)}"
 }
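The fake file system gets both a new home and a new name, since it now tests `CheckpointFileManager` rather than `HDFSMetadataLog`. The mechanism it relies on is Hadoop's scheme registry: resolving a path whose scheme is bound via `fs.<scheme>.impl` instantiates the registered class, and the random suffix in `scheme` keeps test runs from colliding in the `FileSystem` cache. A self-contained sketch of that registration; the `DemoFs` class and `demofs` scheme are invented for illustration:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

// A local file system answering to a made-up scheme, like the suite's
// CheckpointFileManagerSuiteFileSystem.
class DemoFs extends RawLocalFileSystem {
  override def getUri: URI = URI.create("demofs:///")
}

object DemoFsExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Register the implementation class for the scheme; the suite does the
    // equivalent through spark.conf.set(s"fs.$scheme.impl", ...).
    conf.set("fs.demofs.impl", classOf[DemoFs].getName)
    // Resolving a demofs:// path now instantiates DemoFs via the
    // FileSystem API...
    val fs = new Path("demofs:///tmp/example").getFileSystem(conf)
    assert(fs.isInstanceOf[DemoFs])
    // ...while FileContext stays unresolvable (nothing is registered under
    // fs.AbstractFileSystem.demofs.impl), which is exactly the gap the
    // fallback test exercises.
  }
}
```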
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -27,10 +27,6 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext {
 
-  /** To avoid caching of FS objects */
-  override protected def sparkConf =
-    super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
-
   import CompactibleFileStreamLog._
 
   /** -- testing of `object CompactibleFileStreamLog` begins -- */
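With the fake file system moved out of this suite, the `sparkConf` override (and its reference to the now-absent `scheme`) is dead code: it existed only because `FileSystem.get` caches one instance per scheme and authority, so a stale cached instance could leak between tests. A small demonstration of that cache and of the `fs.<scheme>.impl.disable.cache` switch the override used to set through Spark's `spark.hadoop.` prefix; the use of the `file` scheme here is purely illustrative:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val uri = URI.create("file:///tmp")

    // By default, FileSystem.get returns a cached instance keyed by
    // (scheme, authority, user).
    val a = FileSystem.get(uri, conf)
    val b = FileSystem.get(uri, conf)
    assert(a eq b)

    // fs.<scheme>.impl.disable.cache forces a fresh instance per call,
    // which is what the removed override arranged for the fake scheme.
    conf.setBoolean("fs.file.impl.disable.cache", true)
    val c = FileSystem.get(uri, conf)
    assert(!(c eq a))
  }
}
```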
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -18,27 +18,19 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.File
-import java.net.URI
-import java.util.ConcurrentModificationException
 
 import scala.language.implicitConversions
-import scala.util.Random
 
-import org.apache.hadoop.fs._
 import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.FakeFileSystem._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.UninterruptibleThread
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
-  /** To avoid caching of FS objects */
-  override protected def sparkConf =
-    super.sparkConf.set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
-
   private implicit def toOption[A](a: A): Option[A] = Option(a)
 
   test("HDFSMetadataLog: basic") {