Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
compilation errors
  • Loading branch information
ericm-db committed Jul 29, 2025
commit 730c8548285ca71c84b19757ff3ee738d006a18e
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.spark.memory

import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import org.apache.spark.{SparkConf, SparkEnv, SparkIllegalArgumentException}
import org.apache.spark.{SparkConf, SparkIllegalArgumentException}
import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.Tests._
Expand Down Expand Up @@ -298,7 +296,7 @@ object UnifiedMemoryManager extends Logging {
* @param unmanagedMemoryConsumer The consumer to unregister. Only used in tests
*/
private[spark] def unregisterUnmanagedMemoryConsumer(
unmanagedMemoryConsumer: UnmanagedMemoryConsumer): Unit = {
unmanagedMemoryConsumer: UnmanagedMemoryConsumer): Unit = {
val id = unmanagedMemoryConsumer.unmanagedMemoryConsumerId
unmanagedMemoryConsumers.remove(id)
}
Expand Down Expand Up @@ -377,7 +375,7 @@ object UnifiedMemoryManager extends Logging {
log"is no longer active, marking for removal")
(userId, memoryUser, None) // Mark for removal
} else if (memoryUsed < 0L) {
logWarning(log"Invalid memory usage value ${MDC(LogKeys.BYTES, memoryUsed)} " +
logWarning(log"Invalid memory usage value ${MDC(LogKeys.NUM_BYTES, memoryUsed)} " +
log"from unmanaged memory user ${MDC(LogKeys.OBJECT_ID, userId.toString)}")
(userId, memoryUser, Some(0L)) // Treat as 0
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,8 @@ class RocksDB(
* @return Total memory usage in bytes across all tracked components
*/
def getMemoryUsage: Long = {
require(db != null && db.isOpen, "RocksDB must be open to get memory usage")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. Let's remove this redundant empty line.

require(db != null && !db.isClosed, "RocksDB must be open to get memory usage")
RocksDB.mainMemorySources.map { memorySource =>
getDBProperty(memorySource)
}.sum
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a one-liner? Maybe, the following style?

- RocksDB.mainMemorySources.map { memorySource =>
-   getDBProperty(memorySource)
- }.sum
+ RocksDB.mainMemorySources.map(getDBProperty).sum

Expand All @@ -1412,7 +1413,7 @@ class RocksDB(
val currentTime = System.currentTimeMillis()
val timeSinceLastUpdate = currentTime - lastMemoryUpdateTime.get()

if (timeSinceLastUpdate >= memoryUpdateIntervalMs && db != null && db.isOpen) {
if (timeSinceLastUpdate >= memoryUpdateIntervalMs && db != null && !db.isClosed) {
try {
val usage = getMemoryUsage
lastMemoryUpdateTime.set(currentTime)
Expand Down