SPARK-3151: follow @vanzin's style comments
Eyal Farago committed Aug 15, 2017
commit f6fb9e9fd08a676d0c634d407edfe5be289deaff
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -180,9 +180,9 @@ private class DiskBlockData(
}

override def toByteBuffer(): ByteBuffer = {
Contributor:
We will still hit the 2GB limitation here; I'm wondering which end-to-end use cases are affected by it.

Author (@eyalfa), Aug 7, 2017:
Indeed.
I chose to postpone the failure from DiskStore.getBytes to this place, as I believe it introduces no regression while still allowing the more common 'streaming'-like use case.

Furthermore, I think this plays well with the comment about the future deprecation of org.apache.spark.network.buffer.ManagedBuffer#nioByteBuffer, which seems to be the main reason for BlockData exposing the toByteBuffer method.

Author (@eyalfa):
@cloud-fan
It took me roughly 4 hours, but I looked at both the shuffle code path and BlockManager.getRemoteBytes:
the former seems robust to large blocks because it uses Netty's streaming capabilities;
the latter seems to be broken, as it doesn't use Netty's streaming capabilities and actually tries to copy the result buffer into a heap-based buffer. I think this deserves its own JIRA/PR.
I think these two places, plus the external shuffle server, cover most of the relevant use cases (aside from local caching, which I believe this PR makes 2GB-proof).

-require( blockSize < Int.MaxValue
-  , s"can't create a byte buffer of size $blockSize"
-  + s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
+require(blockSize < Int.MaxValue,
+  s"can't create a byte buffer of size $blockSize" +
+  s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
Contributor:
since it exceeds $maxMemoryMapBytes is more accurate.

Contributor:
or call Utils.bytesToString to make it more readable.
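A minimal sketch of the guard with both review suggestions folded in. It assumes a maxMemoryMapBytes value is in scope in DiskBlockData (as the previous comment implies) and uses Spark's existing org.apache.spark.util.Utils.bytesToString formatter; the exact wording is illustrative, not the final code:

```scala
import org.apache.spark.util.Utils

// Reference the actual limit being checked and render it human-readably
// (e.g. in MB/GB) instead of a raw byte count.
require(blockSize < maxMemoryMapBytes,
  s"can't create a byte buffer of size $blockSize" +
  s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
```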

Utils.tryWithResource(open()) { channel =>
if (blockSize < minMemoryMapBytes) {
// For small files, directly read rather than memory map.
@@ -1438,7 +1438,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
case (a, b) =>
Contributor:
just a === b?

Author (@eyalfa):
You can't compare Arrays with ==; you get identity (reference) equality, which is usually not what you want. Hence the .seq, which forces each array to be wrapped in a Seq so that == compares elements.
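For illustration, a standalone snippet (pre-2.13 Scala, matching the code under review) showing the behavior being described:

```scala
val a = Array[Byte](1, 2, 3)
val b = Array[Byte](1, 2, 3)

a == b            // false: == on arrays is reference (identity) equality
a.seq == b.seq    // true:  .seq wraps each array (a WrappedArray), so == compares elements
a.sameElements(b) // true:  another common way to compare array contents
```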

Contributor:
=== is a helper method in ScalaTest and should be able to compare arrays.
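A quick way to check that claim outside the suite — a sketch assuming Scalactic's TripleEquals with its default Equality, which compares arrays element-wise rather than by reference:

```scala
import org.scalactic.TripleEquals._

val x = Array[Byte](1, 2, 3)
val y = Array[Byte](1, 2, 3)

x == y  // false: plain == is still reference equality
x === y // true: Scalactic's default Equality special-cases arrays
```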

Contributor:
Even if === does not work, you have java.util.Arrays.equals, which is null-safe.
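A sketch of that alternative, with a hypothetical sameBytes helper standing in for the pattern-match body; java.util.Arrays.equals returns true when both arguments are null and false when exactly one is, so the explicit null guards become unnecessary:

```scala
import java.util.Arrays

// Null-safe content comparison: no a != null && b != null needed.
def sameBytes(a: Any, b: Any): Boolean =
  Arrays.equals(a.asInstanceOf[Array[Byte]], b.asInstanceOf[Array[Byte]])
```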

a != null &&
b != null &&
-a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
+a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq
})
}
val getResult = store.get(RDDBlockId(42, 0))
@@ -1448,7 +1448,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
case (a, b) =>
a != null &&
b != null &&
-a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
+a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq
})
}
val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
@@ -1464,7 +1464,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
case (a, b) =>
a != null &&
b != null &&
-a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
+a.asInstanceOf[Array[Byte]].seq === b.asInstanceOf[Array[Byte]].seq
})
}
} finally {