diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 8f6a22177a5b..e7c45a9faa1d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{FileInputStream, InputStream}
+import java.io.InputStream
 import java.util.{Locale, NoSuchElementException, Properties}
 
 import scala.util.control.NonFatal
 import scala.xml.{Node, XML}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -54,10 +56,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
   }
 }
 
-private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE)
+  val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
   val DEFAULT_POOL_NAME = "default"
@@ -74,7 +76,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
     var fileData: Option[(InputStream, String)] = None
     try {
      fileData = schedulerAllocFile.map { f =>
-        val fis = new FileInputStream(f)
+        val filePath = new Path(f)
+        val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath)
         logInfo(s"Creating Fair Scheduler pools from $f")
         Some((fis, f))
       }.getOrElse {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 373aab46b025..ef3a558d54bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -205,7 +205,7 @@ private[spark] class TaskSchedulerImpl(
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
-        new FairSchedulableBuilder(rootPool, conf)
+        new FairSchedulableBuilder(rootPool, sc)
       case _ =>
         throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
           s"$schedulingMode")
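The core of the change is the last `SchedulableBuilder` hunk above: instead of `new FileInputStream(f)`, which can only read the local filesystem, the builder resolves the path through the Hadoop `FileSystem` API, which dispatches on the URI scheme. That is also why the constructor now takes the full `SparkContext` rather than just a `SparkConf`: the builder needs `sc.hadoopConfiguration` to resolve the filesystem. A minimal standalone sketch of the same pattern (the object name, helper, and paths here are illustrative, not part of the patch):

```scala
import java.io.InputStream

import scala.xml.XML

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object RemoteAllocationFileExample {
  // Open an allocation file from any Hadoop-supported filesystem.
  def openAllocationFile(location: String, hadoopConf: Configuration): InputStream = {
    val path = new Path(location)
    // getFileSystem inspects the URI scheme: "file:" yields LocalFileSystem,
    // "hdfs:" yields DistributedFileSystem, and on Hadoop 2.9+ "http(s):"
    // yields HttpFileSystem, which is what the new test below exercises.
    val fs = path.getFileSystem(hadoopConf)
    fs.open(path)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical local path, used only to demonstrate the call.
    val in = openAllocationFile("file:///tmp/fairscheduler.xml", new Configuration())
    try {
      val pools = (XML.load(in) \\ "pool").map(p => (p \ "@name").text)
      println(s"Configured pools: ${pools.mkString(", ")}")
    } finally {
      in.close()
    }
  }
}
```

Scheme-less paths keep working, since `Path` falls back to the default filesystem configured in the Hadoop configuration, which preserves the old local-file behavior.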
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index d9de976c789d..fa2c5eaee8ba 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -20,10 +20,14 @@ package org.apache.spark.scheduler
 import java.io.FileNotFoundException
 import java.util.Properties
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.SchedulingMode._
+import org.apache.spark.util.Utils
 
 /**
  * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
@@ -87,7 +91,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Ensure that the XML file was read in correctly.
@@ -185,9 +189,10 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
       .getFile()
     val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
+    sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -239,7 +244,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Submit a new task set manager with pool properties set to null. This should result
@@ -267,7 +272,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     assert(rootPool.getSchedulableByName(TEST_POOL) === null)
@@ -302,7 +307,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -317,7 +322,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -332,12 +337,36 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
 
     intercept[FileNotFoundException] {
       schedulableBuilder.buildPools()
     }
   }
 
+  test("SPARK-35083: Support remote scheduler pool file") {
+    val hadoopVersion = VersionInfo.getVersion.split("\\.")
+    // HttpFileSystem supported since hadoop 2.9
+    assume(hadoopVersion.head.toInt >= 3 ||
+      (hadoopVersion.head.toInt == 2 && hadoopVersion(1).toInt >= 9))
+
+    val xmlPath = new Path(
+      Utils.getSparkClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile)
+    TestUtils.withHttpServer(xmlPath.getParent.toUri.getPath) { baseURL =>
+      val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE,
+        baseURL + "fairscheduler-with-valid-data.xml")
+      sc = new SparkContext(LOCAL, APP_NAME, conf)
+
+      val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+      val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
+      schedulableBuilder.buildPools()
+
+      verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+      verifyPool(rootPool, "pool1", 3, 1, FIFO)
+      verifyPool(rootPool, "pool2", 4, 2, FAIR)
+      verifyPool(rootPool, "pool3", 2, 3, FAIR)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
     expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)
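The new test serves `fairscheduler-with-valid-data.xml` over HTTP and points `spark.scheduler.allocation.file` at the resulting URL; the `assume` guard skips it on Hadoop versions older than 2.9, where `HttpFileSystem` does not exist. The implementation of `TestUtils.withHttpServer` is not shown in this diff, so the following is only a rough, hypothetical approximation using the JDK's built-in HTTP server, included to illustrate the fixture the test relies on (object name and file-serving logic are assumptions):

```scala
import java.io.File
import java.net.InetSocketAddress
import java.nio.file.Files

import com.sun.net.httpserver.{HttpExchange, HttpServer}

object HttpFixtureSketch {
  // Serve files from resourceDir on an ephemeral port, hand the base URL to
  // the test body, and shut the server down afterwards.
  def withHttpServer[T](resourceDir: String)(body: String => T): T = {
    val server = HttpServer.create(new InetSocketAddress("localhost", 0), 0)
    server.createContext("/", (exchange: HttpExchange) => {
      // Resolve only the file name, so requests stay inside resourceDir.
      val requested = new File(resourceDir, new File(exchange.getRequestURI.getPath).getName)
      val bytes = Files.readAllBytes(requested.toPath)
      exchange.sendResponseHeaders(200, bytes.length)
      exchange.getResponseBody.write(bytes)
      exchange.close()
    })
    server.start()
    try {
      body(s"http://localhost:${server.getAddress.getPort}/")
    } finally {
      server.stop(0)
    }
  }
}
```

Binding to port 0 lets the OS pick a free port, which keeps the test safe to run in parallel suites.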
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index f2b77cdfcd2c..51060ddf8132 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -252,10 +252,11 @@ properties:
 The pool properties can be set by creating an XML file, similar to `conf/fairscheduler.xml.template`,
 and either putting a file named `fairscheduler.xml` on the classpath, or setting
 `spark.scheduler.allocation.file` property in your
-[SparkConf](configuration.html#spark-properties).
+[SparkConf](configuration.html#spark-properties). The file path can either be a local file path or an HDFS file path.
 
 {% highlight scala %}
 conf.set("spark.scheduler.allocation.file", "/path/to/file")
+conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
 {% endhighlight %}
 
 The format of the XML file is simply a `<pool>` element for each pool, with different elements
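For reference, a pool file in the format the docs hunk describes, reconstructed from the `verifyPool` expectations in the new test (`pool1`: minShare 3, weight 1, FIFO, and so on), with element names matching `conf/fairscheduler.xml.template`:

```xml
<?xml version="1.0"?>
<allocations>
  <pool name="pool1">
    <minShare>3</minShare>
    <weight>1</weight>
    <schedulingMode>FIFO</schedulingMode>
  </pool>
  <pool name="pool2">
    <minShare>4</minShare>
    <weight>2</weight>
    <schedulingMode>FAIR</schedulingMode>
  </pool>
  <pool name="pool3">
    <minShare>2</minShare>
    <weight>3</weight>
    <schedulingMode>FAIR</schedulingMode>
  </pool>
</allocations>
```

With this patch, such a file can live on HDFS or behind an HTTP endpoint as well as on the local disk, since `buildPools` now reads it through whichever `FileSystem` implementation the URI scheme selects.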