Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
df48e01
LIHADOOP-48527 Magnet shuffle service block transfer netty protocol
Victsm May 9, 2020
6472267
LIHADOOP-53438 Using different appId for the tests in RemoteBlockPush…
otterc May 12, 2020
1c78e1d
LIHADOOP-53496 Not logging all block push exceptions on the client
otterc May 15, 2020
71f3246
LIHADOOP-53700 Separate configuration for caching the merged index fi…
zhouyejoe Jun 1, 2020
221178f
LIHADOOP-53940 Logging the data file and index file path when shuffle…
otterc Jun 10, 2020
55b4a5f
LIHADOOP-54059 LIHADOOP-53496 Handle the inconsistencies between loc…
otterc Jun 15, 2020
f9d0e86
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
548e2c0
LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of…
otterc Jul 24, 2020
50efba9
LIHADOOP-55372 reduced the default for minChunkSizeInMergedShuffleFile
otterc Aug 26, 2020
8a6e01b
LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc…
zhouyejoe Sep 9, 2020
ae5ffac
LIHADOOP-55654 Duplicate application init calls trigger NPE and wrong…
zhouyejoe Sep 12, 2020
e51042b
Further prune changes that should go into a later PR.
Victsm Sep 23, 2020
83aca99
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
04e0efe
LIHADOOP-55022 Disable the merged shuffle file cleanup in stopApplica…
zhouyejoe Aug 11, 2020
71dfd48
Tests and cleanup
otterc Oct 6, 2020
0c411c1
LIHADOOP-55948 Failure in the push stream should not change the curre…
otterc Oct 1, 2020
d029463
Minor style corrections
otterc Oct 15, 2020
8f3839f
Fixed style issues
otterc Oct 15, 2020
1cd2d03
Renamed variables, methods, fixed indentation, addressed other review…
otterc Oct 19, 2020
3356c19
Addressing review comments
otterc Oct 23, 2020
d879beb
Changed the partitions map and addressed other review comments
otterc Oct 26, 2020
48ae819
Added support for subdirs under merge_manager dirs and removed the ya…
otterc Oct 28, 2020
9b031f7
Addressed test failure and other review comments in RemoteBlockPushRe…
otterc Oct 29, 2020
807cc7b
Minor change in finalization
otterc Oct 29, 2020
5b169bc
Removing the partition from inner map after the files are closed
otterc Oct 30, 2020
9ece587
Server side configuration to specify the implementation of MergedShuf…
otterc Oct 30, 2020
d13c7ad
Change the Push block stream to not encode shuffle Id, map index, and…
otterc Nov 2, 2020
63843bb
Fixed typos, address review comments, made NoOp the default impl, and…
otterc Nov 2, 2020
d35aa4b
Addressed review comments
otterc Nov 3, 2020
ba92311
Fix IndexOutOfBoundsException and avoid instantiating AppsPathInfo mu…
otterc Nov 4, 2020
9be25b3
Removed duplicate declaration of shuffle push prefix
otterc Nov 4, 2020
ba51796
Added UT for collision with 2 streams
otterc Nov 4, 2020
1f4fcfe
Removed unnecessary TODOs, marked MergedShuffleFileManager evolving, …
otterc Nov 4, 2020
28edaae
Changed the serialization on chunktracker and removed serializedSizeI…
otterc Nov 6, 2020
cb1881c
Use RandomAccessFile
otterc Nov 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc…
…al host with a consistent view of app local dirs among different executors

RB=2261073
BUG=LIHADOOP-55315
G=spark-reviewers
R=chsingh,mshen,vsowrira,mmuralid
A=mmuralid,chsingh
  • Loading branch information
zhouyejoe authored and otterc committed Nov 6, 2020
commit 8a6e01b1d37112a0427693f381307db8520a8617
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {

public Map<String, String[]> getLocalDirs(String appId, String[] execIds) {
return Arrays.stream(execIds)
.filter(exec -> !exec.isEmpty())
.map(exec -> {
ExecutorShuffleInfo info = executors.get(new AppExecId(appId, exec));
if (info == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -71,24 +70,21 @@
public class RemoteBlockPushResolver implements MergedShuffleFileManager {

private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);
private static final String MERGE_MANAGER_DIR = "merge_manager";

private final Path[] localDirs;
private final ConcurrentMap<String, AppPathsInfo> appsPathInfo;
private final ConcurrentMap<AppShufflePartitionId, AppShufflePartitionInfo> partitions;

private final Executor directoryCleaner;
private final TransportConf conf;
private final int minChunkSize;
private final String relativeMergeDirPathPattern;

private final LoadingCache<File, ShuffleIndexInformation> indexCache;

@SuppressWarnings("UnstableApiUsage")
public RemoteBlockPushResolver(TransportConf conf, String[] localDirs) {
public RemoteBlockPushResolver(TransportConf conf, String relativeMergeDirPathPattern) {
this.conf = conf;
this.localDirs = new Path[localDirs.length];
for (int i = 0; i < localDirs.length; i++) {
this.localDirs[i] = Paths.get(localDirs[i]);
}
this.partitions = Maps.newConcurrentMap();
this.appsPathInfo = Maps.newConcurrentMap();
this.directoryCleaner = Executors.newSingleThreadExecutor(
Expand All @@ -106,6 +102,7 @@ public ShuffleIndexInformation load(File file) throws IOException {
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
.build(indexCacheLoader);
this.relativeMergeDirPathPattern = relativeMergeDirPathPattern;
}

/**
Expand Down Expand Up @@ -205,9 +202,22 @@ private File getFile(String appId, String filename) {
AppPathsInfo appPathsInfo = Preconditions.checkNotNull(
appsPathInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
Path[] activeLocalDirs = appPathsInfo.getActiveLocalDirs(localDirs);
Path[] activeLocalDirs = getActiveLocalDirs(appPathsInfo.activeLocalDirs);
Path localDir = activeLocalDirs[hash % activeLocalDirs.length];
return new File(localDir.resolve(appPathsInfo.relativeMergeDir).toFile(), filename);
String relativePath = getRelativePath(appPathsInfo.user, appId);
Path filePath = localDir.resolve(relativePath);
File targetFile = new File(filePath.toFile(), filename);
logger.info("Get the file for " + targetFile.getAbsolutePath());
return targetFile;
}

private Path[] getActiveLocalDirs(String[] activeLocalDirs) {
return Arrays.stream(activeLocalDirs)
.map(localDir -> Paths.get(localDir)).toArray(Path[]::new);
}

private String getRelativePath(String user, String appId) {
return String.format(relativeMergeDirPathPattern + MERGE_MANAGER_DIR, user, appId);
}

private File getMergedShuffleFile(AppShufflePartitionId id) {
Expand All @@ -225,6 +235,16 @@ private File getMergedMetaFile(AppShufflePartitionId id) {
return getFile(id.appId, metaName);
}

@Override
public String[] getMergedBlockDirs(String appId) {
AppPathsInfo appPathsInfo = Preconditions.checkNotNull(
appsPathInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
return Arrays.stream(appPathsInfo.activeLocalDirs)
.map(dir -> dir + getRelativePath(appPathsInfo.user, appId))
.toArray(String[]::new);
}

@Override
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
Expand All @@ -249,8 +269,9 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
}

if (cleanupLocalDirs) {
Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs)
.map(dir -> dir.resolve(appPathsInfo.relativeMergeDir)).toArray(Path[]::new);
Path[] dirs = Arrays.stream(getActiveLocalDirs(appPathsInfo.activeLocalDirs))
.map(dir -> dir.resolve(getRelativePath(appPathsInfo.user, appId)))
.toArray(Path[]::new);
directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
}
}
Expand Down Expand Up @@ -596,14 +617,23 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
}

@Override
public void registerApplication(String appId, String relativeAppPath) {
logger.debug("register application with RemoteBlockPushResolver {} {}", appId, relativeAppPath);
appsPathInfo.put(appId, new AppPathsInfo(Paths.get(relativeAppPath)));
public void registerApplication(String appId, String user) {
logger.debug("register application with RemoteBlockPushResolver {} {}", appId, user);
appsPathInfo.put(appId, new AppPathsInfo(user));
}

@VisibleForTesting
public Path[] getLocalDirs() {
return localDirs;
@Override
public void registerExecutor(String appId, String[] localDirs) {
if (logger.isDebugEnabled()) {
logger.debug("register executor with RemoteBlockPushResolver {} {}",
appId, Arrays.toString(localDirs));
}
Preconditions.checkNotNull(appsPathInfo.get(appId),
"application " + appId + " is not registered or NM was restarted.");
appsPathInfo.compute(appId,
(targetAppId, appPathsInfo) ->
appPathsInfo.updateActiveLocalDirs(
targetAppId, relativeMergeDirPathPattern, localDirs));
}

/**
Expand Down Expand Up @@ -796,28 +826,23 @@ private void writeChunkTracker(int mapId) throws IOException {
*/
private static class AppPathsInfo {

private final Path relativeMergeDir;
private Path[] activeLocalDirs;
private final String user;
private String[] activeLocalDirs;

AppPathsInfo(Path relativeMergeDir) {
this.relativeMergeDir = Preconditions.checkNotNull(
relativeMergeDir, "relative merge directory path cannot be null");
AppPathsInfo(String user) {
this.user = Preconditions.checkNotNull(user, "user cannot be null");
}

private Path[] getActiveLocalDirs(Path[] localDirs) {
if (activeLocalDirs != null) {
return activeLocalDirs;
}
synchronized (this) {
private AppPathsInfo updateActiveLocalDirs(
String appId, String relativePathPattern, String[] localDirs) {
if (activeLocalDirs == null) {
String relativePath = String.format(relativePathPattern, user, appId);
activeLocalDirs = Arrays.stream(localDirs)
.filter(rootDir -> rootDir.resolve(relativeMergeDir).toFile().exists())
.toArray(Path[]::new);
if (activeLocalDirs.length == 0) {
throw new RuntimeException(
"Did not find any active local directories wrt " + relativeMergeDir);
}
.map(localDir -> localDir.substring(0, localDir.indexOf(relativePath)))
.toArray(String[]::new);
logger.info("Updated the active local dirs " + Arrays.toString(activeLocalDirs));
}
return activeLocalDirs;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class RemoteBlockPushResolverSuite {
private TransportConf conf;
private RemoteBlockPushResolver pushResolver;
private String[] localDirs;
private final String MERGE_DIR_RELATIVE_PATH = "usercache/%s/appcache/%s/";
private final String USER = "testuser";
private final String BLOCK_MANAGER_DIR = "blockmgr-193d8401";

@Before
public void before() throws IOException {
Expand All @@ -61,7 +64,7 @@ public void before() throws IOException {
MapConfigProvider provider = new MapConfigProvider(
ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "4"));
conf = new TransportConf("shuffle", provider);
pushResolver = new RemoteBlockPushResolver(conf, localDirs);
pushResolver = new RemoteBlockPushResolver(conf, MERGE_DIR_RELATIVE_PATH);
}

@After
Expand All @@ -84,7 +87,7 @@ private void cleanupLocalDirs() throws IOException {
public void testNoIndexFile() {
try {
String appId = "app_NoIndexFile";
registerApplication(appId, localDirs);
registerApplication(appId, USER);
pushResolver.getMergedBlockMeta(appId, 0, 0);
removeApplication(appId);
} catch (Throwable t) {
Expand All @@ -96,7 +99,8 @@ public void testNoIndexFile() {
@Test
public void testChunkCountsAndBlockData() throws IOException {
String appId = "app_ChunkCountsAndBlockData";
registerApplication(appId, localDirs);
registerApplication(appId, USER);
registerExecutor(appId, prepareBlockManagerLocalDirs(appId, USER, localDirs));
PushBlockStream[] pushBlocks = new PushBlockStream[] {
new PushBlockStream(appId, "shuffle_0_0_0", 0),
new PushBlockStream(appId, "shuffle_0_1_0", 0),
Expand All @@ -114,7 +118,8 @@ public void testChunkCountsAndBlockData() throws IOException {
@Test
public void testMultipleBlocksInAChunk() throws IOException {
String appId = "app_MultipleBlocksInAChunk";
registerApplication(appId, localDirs);
registerApplication(appId, USER);
registerExecutor(appId, prepareBlockManagerLocalDirs(appId, USER, localDirs));
PushBlockStream[] pushBlocks = new PushBlockStream[] {
new PushBlockStream(appId, "shuffle_0_0_0", 0),
new PushBlockStream(appId, "shuffle_0_1_0", 0),
Expand All @@ -137,7 +142,8 @@ public void testMultipleBlocksInAChunk() throws IOException {
public void testAppUsingFewerLocalDirs() throws IOException {
String appId = "app_AppUsingFewerLocalDirs";
String[] activeLocalDirs = Arrays.stream(localDirs).skip(1).toArray(String[]::new);
registerApplication(appId, activeLocalDirs);
registerApplication(appId, USER);
registerExecutor(appId, prepareBlockManagerLocalDirs(appId, USER, activeLocalDirs));
PushBlockStream[] pushBlocks = new PushBlockStream[] {
new PushBlockStream(appId, "shuffle_0_0_0", 0),
new PushBlockStream(appId, "shuffle_0_1_0", 0),
Expand All @@ -161,11 +167,23 @@ public void testAppUsingFewerLocalDirs() throws IOException {
* This is because when the application gets removed, the directory cleaner removes the merged
* data and file in a different thread which can delete the relevant data of a different test.
*/
private void registerApplication(String appId, String[] activeLocalDirs) throws IOException {
for (String localDir : activeLocalDirs) {
Files.createDirectories(Paths.get(localDir).resolve(appId + "/merge_manager"));
private void registerApplication(String appId, String user) throws IOException {
pushResolver.registerApplication(appId, user);
}

private void registerExecutor(String appId, String[] localDirs) throws IOException {
pushResolver.registerExecutor(appId, localDirs);
for (String localDir : pushResolver.getMergedBlockDirs(appId)) {
Files.createDirectories(Paths.get(localDir));
}
pushResolver.registerApplication(appId, appId + "/merge_manager");
}

private String[] prepareBlockManagerLocalDirs(String appId, String user, String[] localDirs){
return Arrays.stream(localDirs)
.map(localDir ->
localDir + "/" +
String.format(MERGE_DIR_RELATIVE_PATH + BLOCK_MANAGER_DIR, user, appId))
.toArray(String[]::new);
}

private void removeApplication(String appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -97,10 +96,7 @@ public class YarnShuffleService extends AuxiliaryService {
private static final boolean DEFAULT_STOP_ON_FAILURE = false;

// Used by shuffle merge manager to create merged shuffle files.
private static final String YARN_LOCAL_DIRS = "yarn.nodemanager.local-dirs";
private static final String MERGE_MANAGER_DIR = "merge_manager";
protected static final String MERGE_DIR_RELATIVE_PATH =
"usercache/%s/appcache/%s/" + MERGE_MANAGER_DIR;
protected static final String APP_BASE_RELATIVE_PATH = "usercache/%s/appcache/%s/";

// just for testing when you want to find an open port
@VisibleForTesting
Expand Down Expand Up @@ -182,9 +178,7 @@ protected void serviceInit(Configuration conf) throws Exception {
}

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
String[] localDirs = Arrays.stream(conf.getTrimmedStrings(YARN_LOCAL_DIRS)).sorted()
.map(dir -> new Path(dir).toUri().getPath()).toArray(String[]::new);
shuffleMergeManager = new RemoteBlockPushResolver(transportConf, localDirs);
shuffleMergeManager = new RemoteBlockPushResolver(transportConf, APP_BASE_RELATIVE_PATH);
blockHandler = new ExternalBlockHandler(transportConf, registeredExecutorFile, shuffleMergeManager);

// If authentication is enabled, set up the shuffle server to use a
Expand Down Expand Up @@ -289,11 +283,10 @@ public void initializeApplication(ApplicationInitializationContext context) {
}
secretManager.registerApp(appId, shuffleSecret);
}
shuffleMergeManager.registerApplication(
appId, String.format(MERGE_DIR_RELATIVE_PATH, context.getUser(), appId));
} catch (Exception e) {
logger.error("Exception when initializing application {}", appId, e);
}
shuffleMergeManager.registerApplication(appId, context.getUser());
}

@Override
Expand Down