@@ -27,7 +27,8 @@ import scala.xml.Node

 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -320,14 +321,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .filter { entry =>
           try {
             val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+            fs.access(entry.getPath, FsAction.READ)
Contributor Author:

This API is tagged with @LimitedPrivate({"HDFS", "Hive"}), but I think it should be fine to call it here.
Contributor:

So this API actually calls fs.getFileStatus(), which is a remote call and completely unnecessary in this context, since you already have the FileStatus.

You could instead do the check directly against the FsPermission object (the same way fs.access() does after it performs the remote call).
Contributor Author:

I see, let me change it. Thanks.
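For illustration, here is a minimal sketch (not part of the PR) of the local check being suggested: test the READ bit of the FsPermission already carried by the FileStatus, which is the same comparison fs.access() performs after its remote call. The canRead helper name is hypothetical:

    import org.apache.hadoop.fs.FileStatus
    import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
    import org.apache.hadoop.security.UserGroupInformation

    // Hypothetical helper: evaluate READ access locally from a FileStatus that
    // is already in hand, instead of issuing another call via fs.access().
    def canRead(status: FileStatus): Boolean = {
      val perm: FsPermission = status.getPermission
      val ugi = UserGroupInformation.getCurrentUser
      if (ugi.getShortUserName == status.getOwner) {
        // The owner bits apply when the current user owns the file.
        perm.getUserAction.implies(FsAction.READ)
      } else if (ugi.getGroupNames.contains(status.getGroup)) {
        // The group bits apply when the user belongs to the file's group.
        perm.getGroupAction.implies(FsAction.READ)
      } else {
        // Everyone else is covered by the "other" bits.
        perm.getOtherAction.implies(FsAction.READ)
      }
    }

This mirrors the owner/group/other resolution HDFS applies on the server side; superuser and ACL handling are deliberately omitted from the sketch.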

             !entry.isDirectory() &&
               // FsHistoryProvider generates a hidden file which can't be read. Accidentally
               // reading a garbage file is safe, but we would log an error which can be scary to
               // the end-user.
               !entry.getPath().getName().startsWith(".") &&
               prevFileSize < entry.getLen()
           } catch {
-            case e: AccessControlException =>
+            case _: AccessControlException =>
               // Do not use "logInfo" since these messages can get pretty noisy if printed on
               // every poll.
               logDebug(s"No permission to read $entry, ignoring.")
@@ -445,7 +447,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
-  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
       val eventsFilter: ReplayEventsFilter = { eventString =>
         eventString.startsWith(APPL_START_EVENT_PREFIX) ||
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps

 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -571,6 +572,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }

+  test("log without read permission should be filtered out before actual reading") {
+    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
Contributor:

I think you'll need this from another test in this same file:

    // setReadable(...) does not work on Windows. Please refer JDK-6728842.
    assume(!Utils.isWindows)

In fact, shouldn't this test be merged with the test for SPARK-3697?

Contributor Author:

The difference is that this UT checks whether the file is filtered out during the check, while the SPARK-3697 UT only checks the final result, so the file could be filtered out during the read. Let me merge these two UTs.

+      var mergeApplicationListingCall = 0
+      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+        super.mergeApplicationListing(fileStatus)
+        mergeApplicationListingCall += 1
+      }
+    }
+
+    val provider = new TestFsHistoryProvider
+    val log = newLogFile("app1", Some("app1"), inProgress = true)
+    writeFile(log, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
+        "test", Some("attempt1")),
+      SparkListenerApplicationEnd(System.currentTimeMillis()))
+
+    // Remove read permission to simulate a scenario where access is not allowed.
+    log.setReadable(false)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+
+    // Logs without read permission are filtered out up front, so the provider sees an
+    // empty file list and never invokes mergeApplicationListing().
+    provider.mergeApplicationListingCall should be (0)
+  }
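Following up on the review comment above, a sketch of where the requested guard could go, at the top of the test body. On Windows, setReadable(false) has no effect (JDK-6728842), so without the guard this test would fail spuriously there:

    test("log without read permission should be filtered out before actual reading") {
      // setReadable(...) does not work on Windows. Please refer JDK-6728842.
      assume(!Utils.isWindows)
      // ... remainder of the test body as shown in the diff above ...
    }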

   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example: