diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index e5fb417561179..316e5ed49b6ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -25,26 +25,24 @@
 import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -288,6 +286,75 @@ private void verifyAndCopy(Path destination)
     downloadAndUnpack(sCopy, destination);
   }
 
+  /**
+   * Partitions a {@code FileStatus} array into numOfParts parts.
+   * @param fileStatuses input FileStatus array
+   * @param numOfParts number of partitions to be created
+   * @return {@code Collection} of partitions as {@code List} objects
+   */
+  private Collection<List<FileStatus>> partitionInputList(
+      FileStatus[] fileStatuses,
+      int numOfParts) {
+    int chunkSize = (int) Math.ceil((float) fileStatuses.length / numOfParts);
+    final AtomicInteger counter = new AtomicInteger();
+    return Arrays.stream(fileStatuses)
+        .collect(Collectors.groupingBy(it ->
+            counter.getAndIncrement() / chunkSize))
+        .values();
+  }
+
+  /**
+   * Splits a directory's contents into groups and localizes them in parallel.
+   * @param sourceFileSystem Source filesystem
+   * @param destinationFileSystem Destination filesystem
+   * @param source source path to copy. Typically HDFS
+   * @param destination destination path. Typically local filesystem
+   * @exception YarnException if any error occurs
+   */
+  private void localizeDirectoryInParallel(FileSystem sourceFileSystem,
+      FileSystem destinationFileSystem,
+      Path source, Path destination)
+      throws YarnException {
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = sourceFileSystem.listStatus(source);
+    } catch (Exception e) {
+      throw new YarnException("Download and unpack failed", e);
+    }
+    int nThreads = conf.getInt(
+        YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
+        YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
+    AtomicReference<IOException> ioException = new AtomicReference<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    List<Callable<Object>> callableList = new ArrayList<>();
+    for (List<FileStatus> part : partitionInputList(fileStatuses, nThreads)) {
+      callableList.add(() -> {
+        try {
+          Path[] sourcePaths = part.stream()
+              .map(FileStatus::getPath)
+              .toArray(Path[]::new);
+          FileUtil.copy(sourceFileSystem, sourcePaths, destinationFileSystem,
+              destination, false, true, conf);
+        } catch (IOException e) {
+          ioException.set(e);
+        }
+        return null;
+      });
+    }
+    try {
+      executorService.invokeAll(callableList);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AssertionError(e);
+    } finally {
+      executorService.shutdown();
+    }
+
+    if (ioException.get() != null) {
+      throw new YarnException("Download and unpack failed", ioException.get());
+    }
+  }
+
   /**
    * Copy source path to destination with localization rules.
    * @param source source path to copy. Typically HDFS
@@ -299,11 +366,12 @@ private void downloadAndUnpack(Path source, Path destination)
     try {
       FileSystem sourceFileSystem = source.getFileSystem(conf);
       FileSystem destinationFileSystem = destination.getFileSystem(conf);
-      if (sourceFileSystem.getFileStatus(source).isDirectory()) {
-        FileUtil.copy(
-            sourceFileSystem, source,
-            destinationFileSystem, destination, false,
-            true, conf);
+      FileStatus sourceFileStatus = sourceFileSystem.getFileStatus(source);
+      if (sourceFileStatus.isDirectory()) {
+        destinationFileSystem.mkdirs(destination,
+            sourceFileStatus.getPermission());
+        localizeDirectoryInParallel(sourceFileSystem, destinationFileSystem,
+            source, destination);
       } else {
         unpack(source, destination, sourceFileSystem, destinationFileSystem);
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 59b779c071df4..0bda34257ccc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -29,6 +29,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -615,7 +616,8 @@ private void verifyPermsRecursively(FileSystem fs,
   }
 
   @Test (timeout=10000)
-  public void testDirDownload() throws IOException, InterruptedException {
+  public void testDownloadToDirectory()
+      throws IOException, InterruptedException {
     FileContext files = FileContext.getLocalFSFileContext(conf);
     final Path basedir = files.makeQualified(new Path("target",
         TestFSDownload.class.getSimpleName()));
@@ -678,6 +680,91 @@ public void testDirDownload() throws IOException, InterruptedException {
     }
   }
 
+  /**
+   * Downloads a directory to test parallel directory download.
+   */
+  @Test (timeout=10000)
+  public void testDirectoryDownload() throws IOException, InterruptedException {
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    Path baseDirectory = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    Path sourceBaseDirectory = new Path(baseDirectory,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
+    files.mkdir(sourceBaseDirectory, null, true);
+    conf.setStrings(TestFSDownload.class.getName(),
+        sourceBaseDirectory.toString());
+
+    Map<LocalResource, LocalResourceVisibility> rsrcVis = new HashMap<>();
+    Random rand = new Random();
+    rand.setSeed(rand.nextLong());
+    Map<LocalResource, Future<Path>> pending = new HashMap<>();
+
+    ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
+    LocalDirAllocator dirs =
+        new LocalDirAllocator(TestFSDownload.class.getName());
+    LocalResourceVisibility vis;
+    LocalResource rsrc;
+    Path newFilePath;
+    for (int i = 0; i < 5; i++) {
+      vis = (i % 2 == 1) ? LocalResourceVisibility.APPLICATION :
+          LocalResourceVisibility.PRIVATE;
+      newFilePath = new Path(sourceBaseDirectory,
+          (char) ('a' + i) + "Dir" + i + ".jar");
+      rsrc = createJar(files, newFilePath, vis);
+      rsrcVis.put(rsrc, vis);
+    }
+    rsrc = recordFactory.newRecordInstance(LocalResource.class);
+    newFilePath = new Path(sourceBaseDirectory, "subDirectory");
+    rsrc.setResource(URL.fromPath(newFilePath));
+    File newDirectory = new File((files.makeQualified(newFilePath)).toUri());
+    Files.createDirectory(newDirectory.toPath());
+    FileStatus sourceFileStatus = files.getFileStatus(newFilePath);
+
+    for (int i = 0; i < 5; i++) {
+      vis = (i % 2 == 1) ? LocalResourceVisibility.PRIVATE :
+          LocalResourceVisibility.APPLICATION;
+      newFilePath = new Path(newDirectory.toString(),
+          "file" + (char) ('A' + i) + ".jar");
+      createJar(files, newFilePath, vis);
+    }
+
+    vis = LocalResourceVisibility.APPLICATION;
+    rsrc = recordFactory.newRecordInstance(LocalResource.class);
+    rsrc.setResource(URL.fromPath(sourceBaseDirectory));
+    rsrc.setSize(sourceFileStatus.getLen());
+    rsrc.setTimestamp(sourceFileStatus.getModificationTime());
+    rsrc.setType(LocalResourceType.ARCHIVE);
+    rsrc.setVisibility(vis);
+    rsrcVis.put(rsrc, vis);
+
+    Path destPath = dirs.getLocalPathForWrite(baseDirectory.toString(), conf);
+    destPath = new Path(destPath,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
+    FSDownload directoryFsd =
+        new FSDownload(files, UserGroupInformation.getCurrentUser(),
+            conf, destPath, rsrc);
+    pending.put(rsrc, exec.submit(directoryFsd));
+
+    exec.shutdown();
+    while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
+    for (Future<Path> path : pending.values()) {
+      Assert.assertTrue(path.isDone());
+    }
+    try {
+      for (Map.Entry<LocalResource, Future<Path>> p : pending.entrySet()) {
+        Path localized = p.getValue().get();
+        FileStatus status = files.getFileStatus(localized);
+        assert(status.isDirectory());
+        assert(rsrcVis.containsKey(p.getKey()));
+
+        verifyPermsRecursively(localized.getFileSystem(conf),
+            files, localized, rsrcVis.get(p.getKey()));
+      }
+    } catch (ExecutionException e) {
+      throw new IOException("Failed exec", e);
+    }
+  }
+
   @Test (timeout=10000)
   public void testUniqueDestinationPath() throws Exception {
     FileContext files = FileContext.getLocalFSFileContext(conf);
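
Reviewer note: below is a minimal, self-contained sketch of the chunking scheme that the new partitionInputList() method relies on, shown with plain String entries instead of FileStatus objects. The PartitionSketch class and its names are illustrative only and are not part of the patch.

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class PartitionSketch {

  // Same arithmetic as the patch: chunkSize = ceil(length / numOfParts), and
  // each entry is grouped by runningIndex / chunkSize, producing numOfParts
  // (or fewer) roughly equal groups while preserving the encounter order of
  // entries within each group.
  static Collection<List<String>> partition(String[] entries, int numOfParts) {
    int chunkSize = (int) Math.ceil((float) entries.length / numOfParts);
    AtomicInteger counter = new AtomicInteger();
    return Arrays.stream(entries)
        .collect(Collectors.groupingBy(e -> counter.getAndIncrement() / chunkSize))
        .values();
  }

  public static void main(String[] args) {
    String[] entries = new String[10];
    for (int i = 0; i < entries.length; i++) {
      entries[i] = "file" + i + ".jar";
    }
    // 10 entries split into 4 parts: chunkSize = 3, giving groups of 3, 3, 3 and 1.
    partition(entries, 4).forEach(System.out::println);
  }
}

In localizeDirectoryInParallel() each such group becomes one Callable that copies its paths with FileUtil.copy(), and the number of groups is bounded by the thread count read from YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT, so a slow or large file only delays the chunk it belongs to.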