FsHistoryProvider.scala
@@ -27,10 +27,11 @@ import scala.xml.Node

import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
import org.apache.hadoop.security.{AccessControlException, UserGroupInformation}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -320,14 +321,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.filter { entry =>
try {
val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)

def canAccess = {

Contributor:
Return type? Also I'd make this a top-level method (with the entry and action as parameters), maybe even in SparkHadoopUtil... just to avoid the deeply-nested method declaration. That allows you to easily write a unit test for it (yay!).
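
One possible shape for that refactor, as a minimal sketch: the helper name checkAccessPermission, its placement in SparkHadoopUtil, and the exact signature are illustrative assumptions, not something this patch defines.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.UserGroupInformation

// Hypothetical standalone helper (e.g. on SparkHadoopUtil) so it can be unit-tested directly.
// Mirrors POSIX semantics: only the bits of the matching class (user, group, or other) apply.
def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
  val perm = status.getPermission
  val ugi = UserGroupInformation.getCurrentUser
  if (ugi.getShortUserName == status.getOwner) {
    perm.getUserAction.implies(mode)
  } else if (ugi.getGroupNames.contains(status.getGroup)) {
    perm.getGroupAction.implies(mode)
  } else {
    perm.getOtherAction.implies(mode)
  }
}

With something like that in place, the filter above would just call checkAccessPermission(entry, FsAction.READ).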

val perm = entry.getPermission
val ugi = UserGroupInformation.getCurrentUser
val user = ugi.getShortUserName
val groups = ugi.getGroupNames

Contributor:
nit: this is only used in one place, just inline the call.

if (user == entry.getOwner && perm.getUserAction.implies(FsAction.READ)) {

Contributor:
This code is not correct. The code in Hadoop's FileSystem.checkAccessPermissions is slightly different and actually correct.

For example, if you have a file with permissions 066 and your user matches the owner, you do not have permission to read the file, even if you belong to a group that has permissions to read it. Your code allows that user to read that file.

e.g.

$ sudo ls -la /tmp/test
total 132
d---rwxrwx  2 vanzin vanzin   4096 Apr 12 10:30 .
drwxrwxrwt 78 root   root   126976 Apr 12 10:30 ..
$ ls /tmp/test
ls: cannot open directory '/tmp/test': Permission denied

There's also an issue with superusers (they should always have permissions), but then the Hadoop library also has that problem, so maybe we can ignore that one.
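
To make the 066 example concrete in terms of the Hadoop permission API, here is a small standalone sketch (not part of the patch) of why the owner check has to short-circuit instead of falling through to the group and other bits:

import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

// 066-style permissions: the owner has no access, group and other can read and write.
val perm = new FsPermission(FsAction.NONE, FsAction.READ_WRITE, FsAction.READ_WRITE)
// For the file's owner only the user bits apply, so READ must be denied...
assert(!perm.getUserAction.implies(FsAction.READ))
// ...even though the group and other bits would (wrongly) grant it under a fall-through check.
assert(perm.getGroupAction.implies(FsAction.READ))
assert(perm.getOtherAction.implies(FsAction.READ))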


Contributor Author:
Sorry about it. Let me fix it.

true
} else if (groups.contains(entry.getGroup) &&
perm.getGroupAction.implies(FsAction.READ)) {
true
} else if (perm.getOtherAction.implies(FsAction.READ)) {
true
} else {
throw new AccessControlException(s"Permission denied: user=$user, " +

Contributor:
Why throw an exception instead of returning false? That makes the function's interface weird.

You could just log the access issue here if you want, and then you can even remove the try..catch.
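
A rough sketch of the caller side under that suggestion, assuming the permission check returns a Boolean (checkAccessPermission here is the hypothetical helper sketched above, not code in this patch); the try..catch is then no longer needed for the permission check:

.filter { entry =>
  val prevFileSize = fileToAppInfo.get(entry.getPath()).map(_.fileSize).getOrElse(0L)
  val readable = checkAccessPermission(entry, FsAction.READ)
  if (!readable) {
    // Keep this at debug level: it can fire for the same file on every poll.
    logDebug(s"No permission to read $entry, ignoring.")
  }
  !entry.isDirectory() &&
    // FsHistoryProvider generates a hidden file which can't be read; skip it here too.
    !entry.getPath().getName().startsWith(".") &&
    prevFileSize < entry.getLen() &&
    readable
}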

s"path=${entry.getPath}:${entry.getOwner}:${entry.getGroup}" +
s"${if (entry.isDirectory) "d" else "-"}$perm")
}
}

!entry.isDirectory() &&
// FsHistoryProvider generates a hidden file which can't be read. Accidentally
// reading a garbage file is safe, but we would log an error which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
prevFileSize < entry.getLen()
prevFileSize < entry.getLen() &&
canAccess
} catch {
case e: AccessControlException =>
case _: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
// every poll.
logDebug(s"No permission to read $entry, ignoring.")
@@ -445,7 +467,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replay the log files in the list and merge the list of old applications with new ones
*/
private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
val newAttempts = try {
val eventsFilter: ReplayEventsFilter = { eventString =>
eventString.startsWith(APPL_START_EVENT_PREFIX) ||
FsHistoryProviderSuite.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.json4s.jackson.JsonMethods._
import org.mockito.Matchers.any
@@ -130,9 +131,19 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}

test("SPARK-3697: ignore directories that cannot be read.") {
test("SPARK-3697: ignore files that cannot be read.") {
// setReadable(...) does not work on Windows. Please refer JDK-6728842.
assume(!Utils.isWindows)

class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
var mergeApplicationListingCall = 0
override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
super.mergeApplicationListing(fileStatus)
mergeApplicationListingCall += 1
}
}
val provider = new TestFsHistoryProvider

val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
@@ -145,10 +156,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
)
logFile2.setReadable(false, false)

val provider = new FsHistoryProvider(createTestConf())
updateAndCheck(provider) { list =>
list.size should be (1)
}

provider.mergeApplicationListingCall should be (1)
}

test("history file is renamed from inprogress to completed") {