Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
df48e01
LIHADOOP-48527 Magnet shuffle service block transfer netty protocol
Victsm May 9, 2020
6472267
LIHADOOP-53438 Using different appId for the tests in RemoteBlockPush…
otterc May 12, 2020
1c78e1d
LIHADOOP-53496 Not logging all block push exceptions on the client
otterc May 15, 2020
71f3246
LIHADOOP-53700 Separate configuration for caching the merged index fi…
zhouyejoe Jun 1, 2020
221178f
LIHADOOP-53940 Logging the data file and index file path when shuffle…
otterc Jun 10, 2020
55b4a5f
LIHADOOP-54059 LIHADOOP-53496 Handle the inconsistencies between loc…
otterc Jun 15, 2020
f9d0e86
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
548e2c0
LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of…
otterc Jul 24, 2020
50efba9
LIHADOOP-55372 reduced the default for minChunkSizeInMergedShuffleFile
otterc Aug 26, 2020
8a6e01b
LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc…
zhouyejoe Sep 9, 2020
ae5ffac
LIHADOOP-55654 Duplicate application init calls trigger NPE and wrong…
zhouyejoe Sep 12, 2020
e51042b
Further prune changes that should go into a later PR.
Victsm Sep 23, 2020
83aca99
LIHADOOP-54379 Sorting the disks both on shuffle service and executors
otterc Jun 24, 2020
04e0efe
LIHADOOP-55022 Disable the merged shuffle file cleanup in stopApplica…
zhouyejoe Aug 11, 2020
71dfd48
Tests and cleanup
otterc Oct 6, 2020
0c411c1
LIHADOOP-55948 Failure in the push stream should not change the curre…
otterc Oct 1, 2020
d029463
Minor style corrections
otterc Oct 15, 2020
8f3839f
Fixed style issues
otterc Oct 15, 2020
1cd2d03
Renamed variables, methods, fixed indentation, addressed other review…
otterc Oct 19, 2020
3356c19
Addressing review comments
otterc Oct 23, 2020
d879beb
Changed the partitions map and addressed other review comments
otterc Oct 26, 2020
48ae819
Added support for subdirs under merge_manager dirs and removed the ya…
otterc Oct 28, 2020
9b031f7
Addressed test failure and other review comments in RemoteBlockPushRe…
otterc Oct 29, 2020
807cc7b
Minor change in finalization
otterc Oct 29, 2020
5b169bc
Removing the partition from inner map after the files are closed
otterc Oct 30, 2020
9ece587
Server side configuration to specify the implementation of MergedShuf…
otterc Oct 30, 2020
d13c7ad
Change the Push block stream to not encode shuffle Id, map index, and…
otterc Nov 2, 2020
63843bb
Fixed typos, address review comments, made NoOp the default impl, and…
otterc Nov 2, 2020
d35aa4b
Addressed review comments
otterc Nov 3, 2020
ba92311
Fix IndexOutOfBoundsException and avoid instantiating AppsPathInfo mu…
otterc Nov 4, 2020
9be25b3
Removed duplicate declaration of shuffle push prefix
otterc Nov 4, 2020
ba51796
Added UT for collision with 2 streams
otterc Nov 4, 2020
1f4fcfe
Removed unnecessary TODOs, marked MergedShuffleFileManager evolving, …
otterc Nov 4, 2020
28edaae
Changed the serialization on chunktracker and removed serializedSizeI…
otterc Nov 6, 2020
cb1881c
Use RandomAccessFile
otterc Nov 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Change the Push block stream to not encode shuffle Id, map index, and…
… reduceId in a string
  • Loading branch information
otterc committed Nov 6, 2020
commit d13c7ade5e4ae7e8d319a82523a004c4cf36e7da
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,14 @@ public void start() {
for (int i = 0; i < blockIds.length; i++) {
assert buffers.containsKey(blockIds[i]) : "Could not find the block buffer for block "
+ blockIds[i];
ByteBuffer header = new PushBlockStream(appId, blockIds[i], i).toByteBuffer();
String[] blockIdParts = blockIds[i].split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals(
PushBlockStream.SHUFFLE_PUSH_BLOCK_PREFIX)) {
throw new IllegalArgumentException(
"Unexpected shuffle push block id format: " + blockIds[i]);
}
ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]),
Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer();
client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]),
new BlockPushCallback(i, blockIds[i]));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,9 @@ void deleteExecutorDirs(Path[] dirs) {
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
// Retrieve merged shuffle file metadata
String[] blockIdParts = msg.blockId.split("_");
if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
throw new IllegalArgumentException("Unexpected shuffle block id format: " + msg.blockId);
}
AppShuffleId appShuffleId = new AppShuffleId(msg.appId, Integer.parseInt(blockIdParts[1]));
int mapIndex = Integer.parseInt(blockIdParts[2]);
int reduceId = Integer.parseInt(blockIdParts[3]);
AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
AppShufflePartitionInfo partitionInfoBeforeCheck =
getOrCreateAppShufflePartitionInfo(appShuffleId, reduceId);

getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId);
// Here partitionInfo will be null in 2 cases:
// 1) The request is received for a block that has already been merged, this is possible due
// to the retry logic.
Expand Down Expand Up @@ -321,17 +314,18 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
final boolean isTooLate = partitionInfoBeforeCheck == null;
// Check if the given block is already merged by checking the bitmap against the given map index
final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null
&& partitionInfoBeforeCheck.mapTracker.contains(mapIndex) ? null : partitionInfoBeforeCheck;
&& partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null
: partitionInfoBeforeCheck;
if (partitionInfo != null) {
return new PushBlockStreamCallback(
this, msg, appShuffleId, reduceId, mapIndex, partitionInfo);
this, msg, appShuffleId, msg.reduceId, msg.mapIndex, partitionInfo);
} else {
// For a duplicate block or a block which is late, respond back with a callback that handles
// them differently.
return new StreamCallbackWithID() {
@Override
public String getID() {
  // Use the derived stream id (shufflePush_<shuffleId>_<mapIndex>_<reduceId>)
  // now that the raw blockId string is no longer carried by PushBlockStream.
  return msg.streamId;
}

@Override
Expand All @@ -345,7 +339,7 @@ public void onComplete(String streamId) {
if (isTooLate) {
// Throw an exception here so the block data is drained from channel and server
// responds RpcFailure to the client.
throw new RuntimeException(String.format("Block %s %s", msg.blockId,
throw new RuntimeException(String.format("Block %s %s", msg.streamId,
ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
}
// For duplicate block that is received before the shuffle merge finalizes, the
Expand Down Expand Up @@ -463,7 +457,7 @@ private PushBlockStreamCallback(

@Override
public String getID() {
  // Identify this push stream by the id derived from (shuffleId, mapIndex, reduceId);
  // the message no longer encodes a blockId string.
  return msg.streamId;
}

/**
Expand Down Expand Up @@ -635,7 +629,7 @@ public void onComplete(String streamId) throws IOException {
// however is already finalized. We should thus respond RpcFailure to the client.
if (shufflePartitions == null || !shufflePartitions.containsKey(reduceId)) {
deferredBufs = null;
throw new RuntimeException(String.format("Block %s %s", msg.blockId,
throw new RuntimeException(String.format("Block %s %s", msg.streamId,
ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
}
// Check if we can commit this block
Expand Down Expand Up @@ -668,7 +662,7 @@ public void onComplete(String streamId) throws IOException {
deferredBufs = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we log a warning too in addition to the exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are currently logged during the onFailure callback but we are logging at debug level.
Logging them at warning level will clutter the NM logs.
Also, this seems to be expected behavior. Some blocks simply won't get the opportunity to merge, which doesn't indicate that anything unexpected is wrong with the shuffle server.
@Victsm What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I know this's the expected behavior but I really care about this case and want to have a way to easily monitor it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. In fact, we have also started working on adding server-side metrics that will account for colliding blocks. Metrics will also make it clearer whether any improvements in this area is effective or not. Adding these metrics are pending tasks listed in the SPIP but I haven't created a Spark jira for it yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to get this information through metrics instead of log.
Since this is inside shuffle service log not Spark driver/executor log, it's usually more difficult to access these logs.
Server side metrics would help to surface the necessary information without polluting the NM logs with many exceptions that don't really hurt.

throw new RuntimeException(String.format("%s %s to merged shuffle",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case for collisions are not tested.

Had you considered storing the deferredBufs of the detected collisions (this side) and write them out right after the collided callback (the other) finished its writing?

Copy link
Contributor Author

@otterc otterc Nov 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a testFailureWith3Streams which tests collisions

Had you considered storing the deferredBufs of the detected collisions (this side) and write them out right after the collided callback (the other) finished its writing?

I don't understand the suggestion completely. The deferredBufs are part of the stream, and if the stream doesn't get the opportunity to write, then it lands here. We are not storing deferredBufs outside the stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a previous conversation related to this here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have also added another simple UT for collisions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea is sharing those deferredBufs in case of collision via the partitionInfo (in-memory and not via a new file, for example stored in a collection to be able to handle multiple collisions too) with the other stream so let this stream finish successfully and the other stream which is able to write to handle this collided deferredBufs.

Copy link
Contributor Author

@otterc otterc Nov 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. That will impact the memory usage though. Currently these deferredBufs live as long as the stream is alive. Storing it outside increases the time they stay in memory and that could increase the memory considerably.

There is another discussion about limiting the number of bufs in memory here which is relevant to this topic.
The approach we have used so far with push-merge is that it is best-effort. So, current behavior is in-line with that.
@Victsm @mridulm @Ngone51 What do you think?

I do think that if we decide to try this, it should be a follow-up because this change requires us to evaluate how much memory the shuffle service will use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, a follow-up PR is fine for me. My question was really about whether this idea was considered and evaluated. For sure the memory will be increased (at least the time how long this deferred buffs will be kept in the memory but of course as other streams are working this will lead to increased amount too).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated this jira
https://issues.apache.org/jira/browse/SPARK-33331#
I think when we introduce a limit on the number of pending blocks in memory, then we can also add the ignored blocks (the ones that reach here) to memory if there is availability.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have left a comment to the JIRA.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to add that one of the things we are considering is the necessity of a bounded in-memory buffer on the server side to help with writing the blocks to merged files.
The consideration is more from the aspect of how much potential improvement we could see on the disk write I/O when merging the blocks, and less about reaching better merge ratio for blocks.
These 2 things are however very related though, and will be something that we will consider holistically inside SPARK-33331.

ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX,
msg.blockId));
msg.streamId));
}
}
isWriting = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,31 @@
import org.apache.spark.network.protocol.Encoders;

// Needed by ScalaDoc. See SPARK-7726
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;


/**
* Request to push a block to a remote shuffle service to be merged in push based shuffle.
* The remote shuffle service will also include this message when responding the push requests.
*/
public class PushBlockStream extends BlockTransferMessage {
public static final String SHUFFLE_PUSH_BLOCK_PREFIX = "shufflePush";
public final String appId;
public final String blockId;
public final int shuffleId;
public final int mapIndex;
public final int reduceId;
// Similar to the chunkIndex in StreamChunkId, indicating the index of a block in a batch of
// blocks to be pushed.
public final int index;
public final String streamId;

/**
 * Creates a push request for one shuffle block.
 *
 * @param appId     application pushing the block
 * @param shuffleId shuffle the block belongs to
 * @param mapIndex  index of the map task that produced the block
 * @param reduceId  reduce partition the block targets
 * @param index     position of this block within the pushed batch
 *                  (similar to chunkIndex in StreamChunkId)
 */
public PushBlockStream(String appId, int shuffleId, int mapIndex, int reduceId, int index) {
  this.appId = appId;
  this.shuffleId = shuffleId;
  this.mapIndex = mapIndex;
  this.reduceId = reduceId;
  this.index = index;
  // Derived stream identifier: shufflePush_<shuffleId>_<mapIndex>_<reduceId>.
  this.streamId = String.format("%s_%d_%d_%d", SHUFFLE_PUSH_BLOCK_PREFIX,
    shuffleId, mapIndex, reduceId);
}

@Override
Expand All @@ -50,14 +57,16 @@ protected Type type() {

@Override
public int hashCode() {
  // Hash exactly the fields compared in equals() to keep the contract consistent.
  // (Stray space before the comma in the original removed.)
  return Objects.hashCode(appId, shuffleId, mapIndex, reduceId, index);
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("appId", appId)
.add("blockId", blockId)
.add("streamId", shuffleId)
.add("mapIndex", mapIndex)
.add("reduceId", reduceId)
.add("index", index)
.toString();
}
Expand All @@ -67,29 +76,34 @@ public boolean equals(Object other) {
if (other != null && other instanceof PushBlockStream) {
PushBlockStream o = (PushBlockStream) other;
return Objects.equal(appId, o.appId)
&& Objects.equal(blockId, o.blockId)
&& shuffleId == o.shuffleId
&& mapIndex == o.mapIndex
&& reduceId == o.reduceId
&& index == o.index;
}
return false;
}

@Override
public int encodedLength() {
  // appId string plus four 4-byte ints: shuffleId, mapIndex, reduceId, index.
  return Encoders.Strings.encodedLength(appId) + 16;
}

@Override
public void encode(ByteBuf buf) {
  // Wire order must be mirrored exactly by decode().
  Encoders.Strings.encode(buf, appId);
  buf.writeInt(shuffleId);
  buf.writeInt(mapIndex);
  buf.writeInt(reduceId);
  buf.writeInt(index);
}

/** Decodes a PushBlockStream; read order mirrors encode(). */
public static PushBlockStream decode(ByteBuf buf) {
  String appId = Encoders.Strings.decode(buf);
  // Renamed from the misleading "streamId"/"mapIdx" locals to match the field names.
  int shuffleId = buf.readInt();
  int mapIndex = buf.readInt();
  int reduceId = buf.readInt();
  int index = buf.readInt();
  return new PushBlockStream(appId, shuffleId, mapIndex, reduceId, index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,77 +45,77 @@ public class OneForOneBlockPusherSuite {
@Test
public void testPushOne() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[1])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);

BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "shuffle_0_0_0", 0)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0)));

verify(listener).onBlockFetchSuccess(eq("shuffle_0_0_0"), any());
verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
}

@Test
public void testPushThree() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
blocks.put("b2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("shufflePush_0_1_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
blocks.put("shufflePush_0_2_0", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);

BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "b0", 0),
new PushBlockStream("app-id", "b1", 1),
new PushBlockStream("app-id", "b2", 2)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));

for (int i = 0; i < 3; i ++) {
verify(listener, times(1)).onBlockFetchSuccess(eq("b" + i), any());
}
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_2_0"), any());
}

@Test
public void testServerFailures() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("shufflePush_0_1_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);

BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "b0", 0),
new PushBlockStream("app-id", "b1", 1),
new PushBlockStream("app-id", "b2", 2)));
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));

verify(listener, times(1)).onBlockFetchSuccess(eq("b0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("b2"), any());
verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any());
}

@Test
public void testHandlingRetriableFailures() {
LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("b1", null);
blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
blocks.put("shufflePush_0_1_0", null);
blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);

BlockFetchingListener listener = pushBlocks(
blocks,
blockIds,
Arrays.asList(new PushBlockStream("app-id", "b0", 0),
new PushBlockStream("app-id", "b1", 1),
new PushBlockStream("app-id", "b2", 2)));

verify(listener, times(1)).onBlockFetchSuccess(eq("b0"), any());
verify(listener, times(0)).onBlockFetchSuccess(not(eq("b0")), any());
verify(listener, times(0)).onBlockFetchFailure(eq("b0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
verify(listener, times(2)).onBlockFetchFailure(eq("b2"), any());
Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
new PushBlockStream("app-id", 0, 1, 0, 1),
new PushBlockStream("app-id", 0, 2, 0, 2)));

verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
verify(listener, times(0)).onBlockFetchFailure(eq("shufflePush_0_0_0"), any());
verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
verify(listener, times(2)).onBlockFetchFailure(eq("shufflePush_0_2_0"), any());
}

/**
Expand Down
Loading