
Commit ce92a74

Author: guoxiaolong
Merge commit (2 parents): cb71f44 + 48d760d

File tree

50 files changed: +676 / -241 lines


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 23 additions & 0 deletions
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -353,6 +354,28 @@ class SparkHadoopUtil extends Logging {
     }
     buffer.toString
   }
+
+  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
+    val perm = status.getPermission
+    val ugi = UserGroupInformation.getCurrentUser
+
+    if (ugi.getShortUserName == status.getOwner) {
+      if (perm.getUserAction.implies(mode)) {
+        return true
+      }
+    } else if (ugi.getGroupNames.contains(status.getGroup)) {
+      if (perm.getGroupAction.implies(mode)) {
+        return true
+      }
+    } else if (perm.getOtherAction.implies(mode)) {
+      return true
+    }
+
+    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
+      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
+      s"${if (status.isDirectory) "d" else "-"}$perm")
+    false
+  }
 }

 object SparkHadoopUtil {
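
A note on the semantics: checkAccessPermission mirrors the POSIX owner/group/other resolution order. Exactly one of the three permission classes is consulted, chosen by identity, and FsAction.implies then decides whether that class covers the requested mode. A minimal standalone sketch of that primitive (example values are hypothetical; assumes only hadoop-common on the classpath):

import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

object FsActionSketch {
  def main(args: Array[String]): Unit = {
    // Mode 0640: rw- for the owner, r-- for the group, --- for others.
    val perm = new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE)

    // implies() is the primitive checkAccessPermission builds on: an action
    // class "implies" a mode when it grants at least that access.
    println(perm.getUserAction.implies(FsAction.READ))   // true  (rw- covers r--)
    println(perm.getGroupAction.implies(FsAction.WRITE)) // false (r-- lacks w)
    println(perm.getOtherAction.implies(FsAction.READ))  // false (--- grants nothing)
  }
}

Because the branches form an else-if chain, the owner is never checked against the group or other bits, matching POSIX semantics: a file with mode 0077 is denied to its owner even though "other" would allow access.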

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 11 additions & 17 deletions
@@ -27,7 +27,8 @@ import scala.xml.Node

 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -318,21 +319,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // scan for modified applications, replay and merge them
     val logInfos: Seq[FileStatus] = statusList
       .filter { entry =>
-        try {
-          val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-          !entry.isDirectory() &&
-            // FsHistoryProvider generates a hidden file which can't be read. Accidentally
-            // reading a garbage file is safe, but we would log an error which can be scary to
-            // the end-user.
-            !entry.getPath().getName().startsWith(".") &&
-            prevFileSize < entry.getLen()
-        } catch {
-          case e: AccessControlException =>
-            // Do not use "logInfo" since these messages can get pretty noisy if printed on
-            // every poll.
-            logDebug(s"No permission to read $entry, ignoring.")
-            false
-        }
+        val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+        !entry.isDirectory() &&
+          // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+          // reading a garbage file is safe, but we would log an error which can be scary to
+          // the end-user.
+          !entry.getPath().getName().startsWith(".") &&
+          prevFileSize < entry.getLen() &&
+          SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
       }
       .flatMap { entry => Some(entry) }
       .sortWith { case (entry1, entry2) =>
@@ -445,7 +439,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
-  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
       val eventsFilter: ReplayEventsFilter = { eventString =>
         eventString.startsWith(APPL_START_EVENT_PREFIX) ||
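
The net effect of this hunk: permission handling moves off the exception path (catching AccessControlException during replay) and into a cheap predicate evaluated while scanning. A self-contained sketch of the resulting filter shape, with illustrative stand-ins for FileStatus and fileToAppInfo (all names hypothetical):

object LogFilterSketch {
  // Illustrative stand-ins for FileStatus entries and fileToAppInfo.
  final case class Entry(path: String, len: Long, readable: Boolean)
  val prevSize: Map[String, Long] = Map("app1" -> 10L).withDefaultValue(0L)

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      Entry("app1", 10L, readable = true),  // unchanged since last scan: skipped
      Entry("app2", 42L, readable = false), // previously threw on replay: now filtered
      Entry("app3", 42L, readable = true))  // the only entry replayed

    val selected = entries.filter { e =>
      !e.path.startsWith(".") &&     // skip the provider's own hidden files
        prevSize(e.path) < e.len &&  // only files that grew need replaying
        e.readable                   // up-front permission check, no try/catch
    }
    println(selected.map(_.path))  // List(app3)
  }
}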

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 1 deletion
@@ -432,7 +432,8 @@ private[spark] class Executor(
         setTaskFinishedAndClearInterruptStatus()
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

-      case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
+      case _: InterruptedException | NonFatal(_) if
+          task != null && task.reasonIfKilled.isDefined =>
         val killReason = task.reasonIfKilled.getOrElse("unknown reason")
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
         setTaskFinishedAndClearInterruptStatus()
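
The widened pattern matters because scala.util.control.NonFatal deliberately treats InterruptedException as fatal and does not match it, so the old `case NonFatal(_)` let the interrupt raised by a task kill escape and fail the stage. A small standalone sketch of the combined pattern (names hypothetical):

import scala.util.control.NonFatal

object KillMatchSketch {
  // Stand-in for task.reasonIfKilled.
  val reasonIfKilled: Option[String] = Some("stage cancelled")

  def classify(t: Throwable): String = t match {
    // InterruptedException is "fatal" to NonFatal, so the alternative
    // pattern is needed for both throwables to take the clean-kill branch.
    case _: InterruptedException | NonFatal(_) if reasonIfKilled.isDefined =>
      s"killed: ${reasonIfKilled.get}"
    case other =>
      s"failed: $other"
  }

  def main(args: Array[String]): Unit = {
    println(classify(new InterruptedException))        // killed: stage cancelled
    println(classify(new RuntimeException("killed")))  // killed: stage cancelled
  }
}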

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 17 additions & 9 deletions
@@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }

-  // Launches one task that will run forever. Once the SparkListener detects the task has
+  testCancellingTasks("that raise interrupted exception on cancel") {
+    Thread.sleep(9999999)
+  }
+
+  // SPARK-20217 should not fail stage if task throws non-interrupted exception
+  testCancellingTasks("that raise runtime exception on cancel") {
+    try {
+      Thread.sleep(9999999)
+    } catch {
+      case t: Throwable =>
+        throw new RuntimeException("killed")
+    }
+  }
+
+  // Launches one task that will block forever. Once the SparkListener detects the task has
   // started, kill and re-schedule it. The second run of the task will complete immediately.
   // If this test times out, then the first version of the task wasn't killed successfully.
-  test("Killing tasks") {
+  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

     SparkContextSuite.isTaskStarted = false
@@ -572,13 +586,7 @@
         // first attempt will hang
         if (!SparkContextSuite.isTaskStarted) {
           SparkContextSuite.isTaskStarted = true
-          try {
-            Thread.sleep(9999999)
-          } catch {
-            case t: Throwable =>
-              // SPARK-20217 should not fail stage if task throws non-interrupted exception
-              throw new RuntimeException("killed")
-          }
+          blockFn
         }
         // second attempt succeeds immediately
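
The refactor turns one hard-coded test into a generator: testCancellingTasks is a curried helper whose second parameter list takes the blocking body by name, so each call site registers a separate test around a different cancellation behavior. A minimal sketch of the pattern outside ScalaTest (the `test` stub below is a hypothetical stand-in for ScalaTest's registration):

object TestGeneratorSketch {
  // Hypothetical stand-in for ScalaTest's test(name) { body } registration.
  def test(name: String)(body: => Unit): Unit = {
    println(s"running: $name")
    body
  }

  // The shared harness: setup/teardown would live here, while the
  // by-name blockFn supplies only the behavior under cancellation.
  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit =
    test(s"Killing tasks $desc") {
      blockFn
    }

  def main(args: Array[String]): Unit = {
    testCancellingTasks("that raise runtime exception on cancel") {
      println("task body would block here until killed")
    }
  }
}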
core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.security.PrivilegedExceptionAction
+
+import scala.util.Random
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.apache.hadoop.security.UserGroupInformation
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+
+class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
+  test("check file permission") {
+    import FsAction._
+    val testUser = s"user-${Random.nextInt(100)}"
+    val testGroups = Array(s"group-${Random.nextInt(100)}")
+    val testUgi = UserGroupInformation.createUserForTesting(testUser, testGroups)
+
+    testUgi.doAs(new PrivilegedExceptionAction[Void] {
+      override def run(): Void = {
+        val sparkHadoopUtil = new SparkHadoopUtil
+
+        // If file is owned by user and user has access permission
+        var status = fileStatus(testUser, testGroups.head, READ_WRITE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // If file is owned by user but user has no access permission
+        status = fileStatus(testUser, testGroups.head, NONE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        val otherUser = s"test-${Random.nextInt(100)}"
+        val otherGroup = s"test-${Random.nextInt(100)}"
+
+        // If file is owned by user's group and user's group has access permission
+        status = fileStatus(otherUser, testGroups.head, NONE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // If file is owned by user's group but user's group has no access permission
+        status = fileStatus(otherUser, testGroups.head, READ_WRITE, NONE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        // If file is owned by other user and this user has access permission
+        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, READ_WRITE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // If file is owned by other user but this user has no access permission
+        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        null
+      }
+    })
+  }
+
+  private def fileStatus(
+      owner: String,
+      group: String,
+      userAction: FsAction,
+      groupAction: FsAction,
+      otherAction: FsAction): FileStatus = {
+    new FileStatus(0L,
+      false,
+      0,
+      0L,
+      0L,
+      0L,
+      new FsPermission(userAction, groupAction, otherAction),
+      owner,
+      group,
+      null)
+  }
+}
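
The suite leans on two Hadoop testing hooks: UserGroupInformation.createUserForTesting fabricates a user/group identity without touching the OS, and doAs runs a block so that getCurrentUser inside it reports that identity, which is what checkAccessPermission compares against file ownership. A standalone sketch of just that mechanism (assumes hadoop-common on the classpath):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object UgiSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createUserForTesting("alice", Array("devs"))

    // Inside run(), getCurrentUser resolves to the synthetic identity,
    // so permission checks behave as if "alice" made the call.
    val seen = ugi.doAs(new PrivilegedExceptionAction[String] {
      override def run(): String =
        UserGroupInformation.getCurrentUser.getShortUserName
    })
    println(seen)  // alice
  }
}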

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 14 additions & 2 deletions
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps

 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -130,9 +131,19 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }

-  test("SPARK-3697: ignore directories that cannot be read.") {
+  test("SPARK-3697: ignore files that cannot be read.") {
     // setReadable(...) does not work on Windows. Please refer JDK-6728842.
     assume(!Utils.isWindows)
+
+    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
+      var mergeApplicationListingCall = 0
+      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+        super.mergeApplicationListing(fileStatus)
+        mergeApplicationListingCall += 1
+      }
+    }
+    val provider = new TestFsHistoryProvider
+
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
@@ -145,10 +156,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     )
     logFile2.setReadable(false, false)

-    val provider = new FsHistoryProvider(createTestConf())
     updateAndCheck(provider) { list =>
       list.size should be (1)
     }
+
+    provider.mergeApplicationListingCall should be (1)
   }

   test("history file is renamed from inprogress to completed")
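
Making mergeApplicationListing protected is what enables this test: the suite subclasses the provider and wraps the hook, asserting not only on the visible listing but on how many files were actually replayed. The same spy-by-subclass shape in a minimal, self-contained form (all names hypothetical):

object CountingOverrideSketch {
  class Provider {
    def scan(files: Seq[String]): Unit =
      files.filter(_.nonEmpty).foreach(merge)     // filtered files never reach merge
    protected def merge(file: String): Unit = ()  // real replay work elided
  }

  // Spy by subclassing: wrap the protected hook and count invocations.
  class TestProvider extends Provider {
    var mergeCalls = 0
    override protected def merge(file: String): Unit = {
      super.merge(file)
      mergeCalls += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val p = new TestProvider
    p.scan(Seq("app1", "", "app2"))  // the empty entry stands in for an unreadable file
    println(p.mergeCalls)  // 2
  }
}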
