Skip to content

Commit 0225d4c

Browse files
ericm-db authored and dongjoon-hyun committed
[SPARK-53001][CORE][SQL][FOLLOW-UP] Disable spark.memory.unmanagedMemoryPollingInterval by default
### What changes were proposed in this pull request?

Follow-up of [https://github.com/apache/spark/pull/51708](https://github.com/apache/spark/pull/51708), addressing nits and test feedback.

### Why are the changes needed?

To conform with Spark style standards and make tests less flaky.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51778 from ericm-db/rocksdb-mm-followup.

Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 0cd2f90 commit 0225d4c

File tree

6 files changed

+95
-91
lines changed

6 files changed

+95
-91
lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ package object config {
508508
"Setting this to 0 disables unmanaged memory polling.")
509509
.version("4.1.0")
510510
.timeConf(TimeUnit.MILLISECONDS)
511-
.createWithDefaultString("1s")
511+
.createWithDefaultString("0s")
512512

513513
private[spark] val STORAGE_UNROLL_MEMORY_THRESHOLD =
514514
ConfigBuilder("spark.storage.unrollMemoryThreshold")

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ object UnifiedMemoryManager extends Logging {
283283
* @param unmanagedMemoryConsumer The consumer to register for memory tracking
284284
*/
285285
def registerUnmanagedMemoryConsumer(
286-
unmanagedMemoryConsumer: UnmanagedMemoryConsumer): Unit = {
286+
unmanagedMemoryConsumer: UnmanagedMemoryConsumer): Unit = {
287287
val id = unmanagedMemoryConsumer.unmanagedMemoryConsumerId
288288
unmanagedMemoryConsumers.put(id, unmanagedMemoryConsumer)
289289
}
@@ -481,57 +481,3 @@ object UnifiedMemoryManager extends Logging {
481481
(usableMemory * memoryFraction).toLong
482482
}
483483
}
484-
485-
/**
486-
* Identifier for an unmanaged memory consumer.
487-
*
488-
* @param componentType The type of component (e.g., "RocksDB", "NativeLibrary")
489-
* @param instanceKey A unique key to identify this specific instance of the component.
490-
* For shared memory consumers, this should be a common key across
491-
* all instances to avoid double counting.
492-
*/
493-
case class UnmanagedMemoryConsumerId(
494-
componentType: String,
495-
instanceKey: String
496-
)
497-
498-
/**
499-
* Interface for components that consume memory outside of Spark's unified memory management.
500-
*
501-
* Components implementing this trait can register themselves with the memory manager
502-
* to have their memory usage tracked and factored into memory allocation decisions.
503-
* This helps prevent OOM errors when unmanaged components use significant memory.
504-
*
505-
* Examples of unmanaged memory consumers:
506-
* - RocksDB state stores in structured streaming
507-
* - Native libraries with custom memory allocation
508-
* - Off-heap caches managed outside of Spark
509-
*/
510-
trait UnmanagedMemoryConsumer {
511-
/**
512-
* Returns the unique identifier for this memory consumer.
513-
* The identifier is used to track and manage the consumer in the memory tracking system.
514-
*/
515-
def unmanagedMemoryConsumerId: UnmanagedMemoryConsumerId
516-
517-
/**
518-
* Returns the memory mode (ON_HEAP or OFF_HEAP) that this consumer uses.
519-
* This is used to ensure unmanaged memory usage only affects the correct memory pool.
520-
*/
521-
def memoryMode: MemoryMode
522-
523-
/**
524-
* Returns the current memory usage in bytes.
525-
*
526-
* This method is called periodically by the memory polling mechanism to track
527-
* memory usage over time. Implementations should return the current total memory
528-
* consumed by this component.
529-
*
530-
* @return Current memory usage in bytes. Should return 0 if no memory is currently used.
531-
* Return -1L to indicate this consumer is no longer active and should be
532-
* automatically removed from tracking.
533-
* @throws Exception if memory usage cannot be determined. The polling mechanism
534-
* will handle exceptions gracefully and log warnings.
535-
*/
536-
def getMemBytesUsed: Long
537-
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.memory
19+
20+
/**
21+
* Identifier for an unmanaged memory consumer.
22+
*
23+
* @param componentType The type of component (e.g., "RocksDB", "NativeLibrary")
24+
* @param instanceKey A unique key to identify this specific instance of the component.
25+
* For shared memory consumers, this should be a common key across
26+
* all instances to avoid double counting.
27+
*/
28+
case class UnmanagedMemoryConsumerId(
29+
componentType: String,
30+
instanceKey: String
31+
)
32+
33+
/**
34+
* Interface for components that consume memory outside of Spark's unified memory management.
35+
*
36+
* Components implementing this trait can register themselves with the memory manager
37+
* to have their memory usage tracked and factored into memory allocation decisions.
38+
* This helps prevent OOM errors when unmanaged components use significant memory.
39+
*
40+
* Examples of unmanaged memory consumers:
41+
* - RocksDB state stores in structured streaming
42+
* - Native libraries with custom memory allocation
43+
* - Off-heap caches managed outside of Spark
44+
*/
45+
trait UnmanagedMemoryConsumer {
46+
/**
47+
* Returns the unique identifier for this memory consumer.
48+
* The identifier is used to track and manage the consumer in the memory tracking system.
49+
*/
50+
def unmanagedMemoryConsumerId: UnmanagedMemoryConsumerId
51+
52+
/**
53+
* Returns the memory mode (ON_HEAP or OFF_HEAP) that this consumer uses.
54+
* This is used to ensure unmanaged memory usage only affects the correct memory pool.
55+
*/
56+
def memoryMode: MemoryMode
57+
58+
/**
59+
* Returns the current memory usage in bytes.
60+
*
61+
* This method is called periodically by the memory polling mechanism to track
62+
* memory usage over time. Implementations should return the current total memory
63+
* consumed by this component.
64+
*
65+
* @return Current memory usage in bytes. Should return 0 if no memory is currently used.
66+
* Return -1L to indicate this consumer is no longer active and should be
67+
* automatically removed from tracking.
68+
* @throws Exception if memory usage cannot be determined. The polling mechanism
69+
* will handle exceptions gracefully and log warnings.
70+
*/
71+
def getMemBytesUsed: Long
72+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,11 +1454,8 @@ class RocksDB(
14541454
* @return Total memory usage in bytes across all tracked components
14551455
*/
14561456
def getMemoryUsage: Long = {
1457-
14581457
require(db != null && !db.isClosed, "RocksDB must be open to get memory usage")
1459-
RocksDB.mainMemorySources.map { memorySource =>
1460-
getDBProperty(memorySource)
1461-
}.sum
1458+
RocksDB.mainMemorySources.map(getDBProperty).sum
14621459
}
14631460

14641461
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager, UnmanagedMemor
3636
* UnifiedMemoryManager, allowing Spark to account for RocksDB memory when making
3737
* memory allocation decisions.
3838
*/
39-
object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer{
39+
object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
4040
private var writeBufferManager: WriteBufferManager = null
4141
private var cache: Cache = null
4242

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import java.io.File
2121

2222
import scala.jdk.CollectionConverters.SetHasAsScala
2323

24-
import org.scalatest.time.{Minute, Span}
24+
import org.scalatest.time.{Millis, Minute, Seconds, Span}
2525

26+
import org.apache.spark.memory.UnifiedMemoryManager
2627
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
2728
import org.apache.spark.sql.functions.{count, max}
2829
import org.apache.spark.sql.internal.SQLConf
@@ -332,9 +333,6 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
332333
("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage" ->
333334
boundedMemoryEnabled.toString)) {
334335

335-
import org.apache.spark.memory.UnifiedMemoryManager
336-
import org.apache.spark.sql.streaming.Trigger
337-
338336
// Use rate stream to ensure continuous state operations that trigger memory updates
339337
val query = spark.readStream
340338
.format("rate")
@@ -350,38 +348,29 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
350348
.start()
351349

352350
try {
353-
// Let the stream run to establish RocksDB instances and generate state operations
354-
Thread.sleep(2000) // 2 seconds should be enough for several processing cycles
355-
356-
// Now check for memory tracking - the continuous stream should trigger memory updates
357-
var rocksDBMemory = 0L
358-
var attempts = 0
359-
val maxAttempts = 15 // 15 attempts with 1-second intervals = 15 seconds max
360-
361-
while (rocksDBMemory <= 0L && attempts < maxAttempts) {
362-
Thread.sleep(1000) // Wait between checks to allow memory updates
363-
rocksDBMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
364-
attempts += 1
365-
366-
if (rocksDBMemory > 0L) {
367-
logInfo(s"RocksDB memory detected: $rocksDBMemory bytes " +
368-
s"after $attempts attempts with boundedMemory=$boundedMemoryEnabled")
369-
}
351+
// Check for memory tracking - the continuous stream should trigger memory updates
352+
var initialRocksDBMemory = 0L
353+
eventually(timeout(Span(20, Seconds)), interval(Span(500, Millis))) {
354+
initialRocksDBMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
355+
assert(initialRocksDBMemory > 0L,
356+
s"RocksDB memory should be tracked with boundedMemory=$boundedMemoryEnabled")
370357
}
371358

359+
logInfo(s"RocksDB memory detected: $initialRocksDBMemory bytes " +
360+
s"with boundedMemory=$boundedMemoryEnabled")
361+
372362
// Verify memory tracking remains stable during continued operation
373-
Thread.sleep(2000) // Let stream continue running
363+
eventually(timeout(Span(5, Seconds)), interval(Span(500, Millis))) {
364+
val currentMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
365+
assert(currentMemory > 0L,
366+
s"RocksDB memory tracking should remain active during stream processing: " +
367+
s"got $currentMemory bytes (initial: $initialRocksDBMemory) " +
368+
s"with boundedMemory=$boundedMemoryEnabled")
369+
}
374370

375371
val finalMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
376-
377-
// Memory should still be tracked (allow for some fluctuation)
378-
assert(finalMemory > 0L,
379-
s"RocksDB memory tracking should remain active during stream processing: " +
380-
s"got $finalMemory bytes (initial: $rocksDBMemory) " +
381-
s"with boundedMemory=$boundedMemoryEnabled")
382-
383372
logInfo(s"RocksDB memory tracking test completed successfully: " +
384-
s"initial=$rocksDBMemory bytes, final=$finalMemory bytes " +
373+
s"initial=$initialRocksDBMemory bytes, final=$finalMemory bytes " +
385374
s"with boundedMemory=$boundedMemoryEnabled")
386375

387376
} finally {

0 commit comments

Comments
 (0)