[SPARK-12078][Core]Fix ByteBuffer.limit misuse #10076
**TaskResultGetter.scala**

```diff
@@ -51,14 +51,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
       try {
         val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
           case directResult: DirectTaskResult[_] =>
-            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+            if (!taskSetManager.canFetchMoreResults(serializedData.remaining())) {
```
> **Member:** This looks like it makes a unit test fail. I think you may have to check the size before the deserializer consumes the byte buffer? This is overall looking good, but we probably have to comb through these a little more to think through the implications.
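A minimal sketch of the hazard the reviewer describes, using plain NIO rather than the Spark code path: reading from a `ByteBuffer` advances its position, so `remaining()` shrinks as bytes are consumed while `limit()` stays fixed. A size check based on `remaining()` therefore has to run before deserialization.

```scala
import java.nio.ByteBuffer

// Illustration only (assumed values, not Spark code).
val buf = ByteBuffer.wrap(Array[Byte](0, 0, 0, 42))
println(buf.remaining()) // 4 -- the payload size, if checked before reading
val value = buf.getInt() // stands in for a deserializer consuming the buffer
println(buf.limit())     // 4 -- still reports the full size
println(buf.remaining()) // 0 -- a size check here would be meaningless
```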
```diff
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value()
-           (directResult, serializedData.limit())
+           (directResult, serializedData.remaining())
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
```

```diff
@@ -105,7 +105,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     override def run(): Unit = Utils.logUncaughtExceptions {
       val loader = Utils.getContextOrSparkClassLoader
       try {
-        if (serializedData != null && serializedData.limit() > 0) {
+        if (serializedData != null && serializedData.remaining() > 0) {
           reason = serializer.get().deserialize[TaskEndReason](
             serializedData, loader)
         }
```
**BlockManager.scala**

```diff
@@ -522,12 +522,12 @@ private[spark] class BlockManager(
       /* We'll store the bytes in memory if the block's storage level includes
        * "memory serialized", or if it should be cached as objects in memory
        * but we only requested its serialized bytes. */
-      memoryStore.putBytes(blockId, bytes.limit, () => {
+      memoryStore.putBytes(blockId, bytes.remaining(), () => {
        // https://issues.apache.org/jira/browse/SPARK-6076
        // If the file size is bigger than the free memory, OOM will happen. So if we cannot
        // put it into MemoryStore, copyForMemory should not be created. That's why this
        // action is put into a `() => ByteBuffer` and created lazily.
-       val copyForMemory = ByteBuffer.allocate(bytes.limit)
+       val copyForMemory = ByteBuffer.allocate(bytes.remaining())
        copyForMemory.put(bytes)
      })
      bytes.rewind()
```

```diff
@@ -607,7 +607,7 @@ private[spark] class BlockManager(
         return Some(new BlockResult(
           dataDeserialize(blockId, data),
           DataReadMethod.Network,
-          data.limit()))
+          data.remaining()))
```
> **Member:** Same here -- does …
```diff
       } else {
         return Some(data)
       }
```
```diff
@@ -951,10 +951,10 @@ private[spark] class BlockManager(
       try {
         val onePeerStartTime = System.currentTimeMillis
         data.rewind()
-        logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+        logTrace(s"Trying to replicate $blockId of ${data.remaining()} bytes to $peer")
         blockTransferService.uploadBlockSync(
           peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
-        logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
+        logTrace(s"Replicated $blockId of ${data.remaining()} bytes to $peer in %s ms"
           .format(System.currentTimeMillis - onePeerStartTime))
         peersReplicatedTo += peer
         peersForReplication -= peer
```

```diff
@@ -977,7 +977,7 @@ private[spark] class BlockManager(
       }
     }
     val timeTakeMs = (System.currentTimeMillis - startTime)
-    logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+    logDebug(s"Replicating $blockId of ${data.remaining()} bytes to " +
       s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
     if (peersReplicatedTo.size < numPeersToReplicateTo) {
       logWarning(s"Block $blockId replicated to only " +
```
**ExternalBlockStore.scala**

```diff
@@ -110,7 +110,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
     val byteBuffer = bytes.duplicate()
     byteBuffer.rewind()
     externalBlockManager.get.putBytes(blockId, byteBuffer)
-    val size = bytes.limit()
+    val size = bytes.remaining()
```
> **Member:** This is an interesting one. The original …
```diff
     val data = if (returnValues) {
       Right(bytes)
     } else {
```

**MemoryStore.scala**
```diff
@@ -96,8 +96,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       putIterator(blockId, values, level, returnValues = true)
     } else {
       val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
-      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+      tryToPut(blockId, bytes, bytes.remaining(), deserialized = false, droppedBlocks)
```
> **Member:** Same, …
```diff
+      PutResult(bytes.remaining(), Right(bytes.duplicate()), droppedBlocks)
     }
   }
```
```diff
@@ -114,7 +114,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks)
     val data =
       if (putSuccess) {
-        assert(bytes.limit == size)
+        assert(bytes.remaining() == size)
         Right(bytes.duplicate())
       } else {
         null
```

```diff
@@ -134,8 +134,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
-      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
+      tryToPut(blockId, bytes, bytes.remaining(), deserialized = false, droppedBlocks)
+      PutResult(bytes.remaining(), Right(bytes.duplicate()), droppedBlocks)
     }
   }
```
> **Member:** You're right that there's an implicit assumption in some of this code that the buffer's position is 0 on returning, and that the entire buffer is filled with valid data. Do we have a situation where the position is not 0, but is correctly at the start of the data? At least, this looks like it handles that situation, but it sounds unusual. Equally, if that's an issue, are we sure the entire buffer has valid data through the end? That assumption is still present here: that the end of the data is the end of the buffer.
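For illustration, a minimal sketch of the case being debated (assumed offsets, plain NIO): when valid data does not start at position 0, `limit()` overstates the payload size, while `remaining()` (i.e. `limit - position`) reports only the readable part.

```scala
import java.nio.ByteBuffer

val buf = ByteBuffer.allocate(10) // 10-byte backing buffer
buf.position(4)                   // pretend bytes 0..3 are header/framing
println(buf.limit())              // 10 -- size of the whole buffer
println(buf.remaining())          // 6  -- only the payload between position and limit
```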
> If a `ByteBuffer` is from Netty, the position could be a non-zero value. The `ByteBuffer` may contain more data internally, but the user should only read the part between `position` and `limit`. I think that's defined in the `ByteBuffer`/`Buffer` javadoc.
> I found I was wrong about the position of `ByteBuffer` from Netty. Netty will call `ByteBuffer.slice` to reset the position to 0 before returning it: https://github.com/netty/netty/blob/0f9492c9affc528c766f9677952412564d4a3f6d/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java#L269
>
> I think we don't need this patch.
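A small sketch of the `slice()` behavior the linked Netty code relies on, in plain NIO: `slice()` creates a view starting at the current position, so the buffer handed back always has position 0 and `limit() == remaining()`, which makes `limit()` safe to use as the data size again.

```scala
import java.nio.ByteBuffer

val buf = ByteBuffer.allocate(10)
buf.position(4)           // valid data starts at offset 4 in the original
val sliced = buf.slice()  // view over the bytes between position and limit
println(sliced.position())                   // 0
println(sliced.limit())                      // 6
assert(sliced.limit() == sliced.remaining()) // the two calls now agree
```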