core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (23 additions, 0 deletions)

@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -353,6 +354,28 @@ class SparkHadoopUtil extends Logging {
     }
     buffer.toString
   }
+
+  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
+    val perm = status.getPermission
+    val ugi = UserGroupInformation.getCurrentUser
+
+    if (ugi.getShortUserName == status.getOwner) {
+      if (perm.getUserAction.implies(mode)) {
+        return true
+      }
+    } else if (ugi.getGroupNames.contains(status.getGroup)) {
+      if (perm.getGroupAction.implies(mode)) {
+        return true
+      }
+    } else if (perm.getOtherAction.implies(mode)) {
+      return true
+    }
+
+    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
+      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
+      s"${if (status.isDirectory) "d" else "-"}$perm")
+    false
+  }
 }
 
 object SparkHadoopUtil {
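
[Note] The checkAccessPermission method above leans on Hadoop's FsAction.implies, a bitmask superset check, to test the requested action against the owner, group, or other bits. A minimal sketch of that behavior, separate from this PR (the object name and permission values are illustrative):

import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

object FsActionImpliesSketch {
  def main(args: Array[String]): Unit = {
    // implies() is true when the receiver's bits include all of the argument's bits.
    assert(FsAction.READ_WRITE.implies(FsAction.READ))
    assert(!FsAction.READ_WRITE.implies(FsAction.EXECUTE))

    // A 0640-style permission: owner rw-, group r--, other ---.
    val perm = new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE)
    assert(perm.getUserAction.implies(FsAction.READ))   // owner branch would allow READ
    assert(perm.getGroupAction.implies(FsAction.READ))  // group branch would allow READ
    assert(!perm.getOtherAction.implies(FsAction.READ)) // "other" branch would deny READ
  }
}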

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

@@ -27,7 +27,8 @@ import scala.xml.Node
 
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException

@@ -318,21 +319,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // scan for modified applications, replay and merge them
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
-          try {
-            val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-            !entry.isDirectory() &&
-              // FsHistoryProvider generates a hidden file which can't be read. Accidentally
-              // reading a garbage file is safe, but we would log an error which can be scary to
-              // the end-user.
-              !entry.getPath().getName().startsWith(".") &&
-              prevFileSize < entry.getLen()
-          } catch {
-            case e: AccessControlException =>
-              // Do not use "logInfo" since these messages can get pretty noisy if printed on
-              // every poll.
-              logDebug(s"No permission to read $entry, ignoring.")
-              false
-          }
+          val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+          !entry.isDirectory() &&
+            // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+            // reading a garbage file is safe, but we would log an error which can be scary to
+            // the end-user.
+            !entry.getPath().getName().startsWith(".") &&
+            prevFileSize < entry.getLen() &&
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
         }
         .flatMap { entry => Some(entry) }
         .sortWith { case (entry1, entry2) =>

@@ -445,7 +439,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
-  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
       val eventsFilter: ReplayEventsFilter = { eventString =>
         eventString.startsWith(APPL_START_EVENT_PREFIX) ||

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

@@ -27,6 +27,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any

@@ -36,6 +38,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
 import org.apache.spark.scheduler._

@@ -130,9 +133,19 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
-  test("SPARK-3697: ignore directories that cannot be read.") {
+  test("SPARK-3697: ignore files that cannot be read.") {
     // setReadable(...) does not work on Windows. Please refer JDK-6728842.
     assume(!Utils.isWindows)
+
+    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
+      var mergeApplicationListingCall = 0
+      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+        super.mergeApplicationListing(fileStatus)
+        mergeApplicationListingCall += 1
+      }
+    }
+    val provider = new TestFsHistoryProvider
+
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),

@@ -143,12 +156,26 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
 
+    val path = new Path(logFile2.toURI)
+    val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
+    val status = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ) should be (true)

[Review thread on these lines]

Contributor (vanzin): Why are these tests here? There's no SparkHadoopUtilSuite currently, but it makes a lot more sense to create one, since that's where the code lives.

You also don't need to reference real files here; you can create custom FileStatus objects to test the behavior you want (the constructors are public and provide all the parameters you need). (A sketch of this approach appears after the diff.)

Contributor Author: Thanks @vanzin, I will update the code.

+
+    logFile2.setReadable(false, false)
+    val status1 = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status1, FsAction.READ) should be (false)
+
+    logFile2.setReadable(false, true)
+    val status2 = fs.getFileStatus(path)
+    SparkHadoopUtil.get.checkAccessPermission(status2, FsAction.READ) should be (false)
+
-    val provider = new FsHistoryProvider(createTestConf())
     updateAndCheck(provider) { list =>
       list.size should be (1)
     }
+
+    provider.mergeApplicationListingCall should be (1)
   }
 
   test("history file is renamed from inprogress to completed") {
Expand Down