review feedback
squito committed May 29, 2018
commit a9cfe294b15b2c9675d074645865fa403285d1d2
@@ -661,7 +661,7 @@ private[spark] class BlockManager(
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
// could just use the inputStream on the temp file, rather than memory-mapping the file.
-// Until then, replication can go cause the process to use too much memory and get killed
+// Until then, replication can cause the process to use too much memory and get killed
// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
// we've read the data to disk.
Contributor Author:
btw this fix is such low-hanging fruit that I would try to do it immediately afterwards. (I haven't filed a JIRA yet just because there are already so many defunct JIRAs related to this; I was going to wait until my changes got some traction.)

I think it's OK to get it in like this first, as this makes the behavior for 2.01 GB basically the same as it was for 1.99 GB.

Contributor Author:

Assuming this goes in shortly -- anybody interested in picking up this TODO? Maybe @Ngone51 or @NiharS?
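
(For readers following along, a rough sketch of what that TODO could look like. This is hypothetical, not the PR's code: consumeRemoteBlock is an invented name, and real code would deserialize the stream rather than count bytes.)

import java.io.InputStream

import org.apache.spark.network.buffer.ManagedBuffer

// Hypothetical sketch of the TODO: if getRemoteBytes handed back the
// ManagedBuffer, a caller could stream the block off the temp file via
// createInputStream() instead of memory-mapping it, so the block's bytes
// never need to be resident in this process's address space all at once.
def consumeRemoteBlock(buf: ManagedBuffer): Long = {
  val in: InputStream = buf.createInputStream()
  try {
    // Stand-in for real deserialization: just count bytes while streaming.
    val chunk = new Array[Byte](64 * 1024)
    var total = 0L
    var read = in.read(chunk)
    while (read != -1) {
      total += read
      read = in.read(chunk)
    }
    total
  } finally {
    in.close()
  }
}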

Contributor
@jerryshao Jul 19, 2018

"not a java OOM, since its a memory-mapped file"

I'm not sure why a memory-mapped file would use too much memory. AFAIK memory mapping is a lazy, page-wise loading mechanism: the system only loads the file segments that are actually accessed into memory pages, not the whole file. So from my understanding, even very little physical memory could map a super large file; memory mapping should not occupy too much memory.

Contributor Author:

To be honest I don't have a perfect understanding of this, but my impression is that it is not exactly lazy loading. The OS has a lot of leeway in deciding how much to keep in memory, though it should always release the memory under pressure. This is problematic under YARN, where the container's memory use is monitored independently of the OS: the OS thinks it's fine to keep large amounts of data in physical memory, but then the YARN NM looks at the memory use of that specific process tree, decides it's over its configured limits, and kills it.

At least, I've seen cases of YARN killing things for exceeding memory limits where I thought that was the cause, though I did not directly confirm it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I agree with you that YARN could have some issues in calculating the exact memory usage.
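
(A minimal standalone sketch of the failure mode under discussion, not Spark code: map() reserves address space lazily, but touching the pages makes them resident, and that resident set is what a container monitor like the YARN NM sees.)

import java.io.RandomAccessFile
import java.nio.channels.FileChannel

// Sketch: map a file and touch every byte. No data is read at map() time,
// but each page faults into physical memory on first access, and the OS may
// keep those pages resident until there is memory pressure -- inflating the
// process's RSS even though the JVM heap stays small.
def sumMappedBytes(path: String): Long = {
  val raf = new RandomAccessFile(path, "r")
  try {
    val ch = raf.getChannel
    val buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size())
    var sum = 0L
    while (buf.hasRemaining) {
      sum += (buf.get() & 0xff) // touching the byte faults its page in
    }
    sum
  } finally {
    raf.close()
  }
}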

logDebug(s"Getting remote block $blockId")
@@ -38,11 +38,10 @@ private[io] class ChunkedByteBufferFileRegion(
private val chunks = chunkedByteBuffer.getChunks()
private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()}
Contributor:

Use foldLeft(0) { blah } + avoid the intermediate val?
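
(A tiny standalone illustration of the suggestion, not the file's code: scanLeft materializes every running total, while foldLeft keeps only the final one, so if only the total size is needed the intermediate val can go away.)

val chunkSizes = Seq(10L, 20L, 30L)
// scanLeft keeps all running totals: Seq(0, 10, 30, 60)
val cumLength = chunkSizes.scanLeft(0L)(_ + _)
val sizeViaScan = cumLength.last // 60
// foldLeft keeps only the final total: 60
val sizeViaFold = chunkSizes.foldLeft(0L)(_ + _)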

private val size = cumLength.last
// Chunk size in bytes

protected def deallocate: Unit = {}

-override def count(): Long = chunkedByteBuffer.size
+override def count(): Long = size

// this is the "start position" of the overall Data in the backing file, not our current position
override def position(): Long = 0
@@ -73,7 +72,6 @@
var keepGoing = true
var written = 0L
var currentChunk = chunks(currentChunkIdx)
-var originalLimit = currentChunk.limit()
while (keepGoing) {
while (currentChunk.hasRemaining && keepGoing) {
val ioSize = Math.min(currentChunk.remaining(), ioChunkSize)
@@ -44,7 +44,7 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
SparkEnv.set(null)
}

-private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = {
+private def generateChunkedByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = {
val bytes = (0 until nChunks).map { chunkIdx =>
val bb = ByteBuffer.allocate(perChunk)
(0 until perChunk).foreach { idx =>
@@ -58,7 +58,7 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar

test("transferTo can stop and resume correctly") {
SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
-val cbb = generateChunkByteBuffer(4, 10)
+val cbb = generateChunkedByteBuffer(4, 10)
val fileRegion = cbb.toNetty

val targetChannel = new LimitedWritableByteChannel(40)
@@ -111,7 +111,7 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
val chunkSize = 1e4.toInt
SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong)

-val cbb = generateChunkByteBuffer(50, chunkSize)
+val cbb = generateChunkedByteBuffer(50, chunkSize)
val fileRegion = cbb.toNetty
val transferLimit = 1e5.toInt
val targetChannel = new LimitedWritableByteChannel(transferLimit)
@@ -134,7 +134,6 @@ class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar
var pos = 0

override def write(src: ByteBuffer): Int = {
-val origSrcPos = src.position()
val length = math.min(acceptNBytes, src.remaining())
src.get(bytes, 0, length)
Member
@Ngone51 May 28, 2018

Do we overwrite the bytes array's previously written data?

Contributor Author:

Yes, this is just test code; we're only checking that the data that gets written is what we expect (which we know based on the absolute position). Really, I could read just one byte at a time and check that it is the right data, but it seemed a little easier this way.
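
(A hedged sketch of the pattern described above; the name is hypothetical and it assumes the generator wrote byte p as (p % 256).toByte, which may differ from the suite's actual data.)

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Hypothetical channel that accepts a bounded number of bytes per write and
// checks each byte against its absolute position in the stream. Because the
// expected value is derived from the absolute position, reusing (and thus
// overwriting) the scratch array across writes loses no information.
class PositionCheckingChannel(acceptPerWrite: Int) extends WritableByteChannel {
  private var pos = 0L
  private val scratch = new Array[Byte](acceptPerWrite)

  override def write(src: ByteBuffer): Int = {
    val length = math.min(acceptPerWrite, src.remaining())
    src.get(scratch, 0, length)
    (0 until length).foreach { i =>
      assert(scratch(i) == ((pos + i) % 256).toByte, s"bad byte at ${pos + i}")
    }
    pos += length
    length
  }

  override def isOpen: Boolean = true
  override def close(): Unit = {}
}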

acceptNBytes -= length