@pan3793 pan3793 commented Jun 17, 2024

What changes were proposed in this pull request?

Throw RuntimeException instead of SparkOutOfMemoryError when underlying calls throw InterruptedIOException in TaskMemoryManager#trySpillAndAcquire
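
As a rough illustration of the idea (not the actual patch), the sketch below shows the exception translation described above. `SpillErrorSketch`, `Spiller`, `SketchOutOfMemoryError`, `trySpill`, and `outcome` are hypothetical names made up for this example; only `InterruptedIOException`, `IOException`, and `RuntimeException` come from the JDK, and the real `TaskMemoryManager#trySpillAndAcquire` does considerably more work than this.

```java
import java.io.IOException;
import java.io.InterruptedIOException;

public class SpillErrorSketch {

  /** Hypothetical stand-in for SparkOutOfMemoryError (assumption, not Spark's class). */
  static class SketchOutOfMemoryError extends OutOfMemoryError {
    SketchOutOfMemoryError(String message) {
      super(message);
    }
  }

  /** Hypothetical stand-in for a memory consumer that can spill to disk. */
  interface Spiller {
    long spill(long required) throws IOException;
  }

  /**
   * Calls spill() and translates failures: an interrupt-driven I/O failure
   * becomes a plain RuntimeException, any other I/O failure becomes the
   * out-of-memory error.
   */
  static long trySpill(Spiller consumer, long required) {
    try {
      return consumer.spill(required);
    } catch (InterruptedIOException e) {
      // The thread was interrupted while spilling, e.g. because the task was
      // killed. This is not a memory problem, so do not report it as one.
      throw new RuntimeException(e.getMessage());
    } catch (IOException e) {
      // Any other I/O failure during spill is still reported as out of memory.
      throw new SketchOutOfMemoryError("error while calling spill() : " + e.getMessage());
    }
  }

  /** Returns the simple class name of whatever trySpill throws, or "none". */
  static String outcome(Spiller consumer) {
    try {
      trySpill(consumer, 1024L);
      return "none";
    } catch (Throwable t) {
      return t.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    // Interrupted spill: prints "RuntimeException"
    System.out.println(outcome(required -> { throw new InterruptedIOException(); }));
    // Ordinary I/O failure: prints "SketchOutOfMemoryError"
    System.out.println(outcome(required -> { throw new IOException("disk full"); }));
    // Successful spill: prints "none"
    System.out.println(outcome(required -> required));
  }
}
```

The order of the `catch` clauses matters in this sketch: `InterruptedIOException` is a subclass of `IOException`, so it has to be handled first or it would fall into the generic branch and be reported as an out-of-memory error again.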

Why are the changes needed?

A false SparkOutOfMemoryError was identified in our production Spark jobs; it is similar to SPARK-20250. The executor log shows the task being killed while it was spilling:

2024-06-17 06:03:20 CST Executor INFO - Executor is trying to kill task 1580.1 in stage 48.0 (TID 59486), reason: another attempt succeeded
2024-06-17 06:03:20 CST TaskMemoryManager ERROR - error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7cfefcb7
java.io.InterruptedIOException: null
	at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:234) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:272) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:251) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at java.io.DataInputStream.readInt(DataInputStream.java:393) ~[?:?]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:80) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:626) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:204) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:227) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
	...
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638) ~[?:?]
	at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:231) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	... 111 more
2024-06-17 06:03:21 CST Executor ERROR - Exception in task 1580.1 in stage 48.0 (TID 59486)
org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7cfefcb7 : null
	at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:253) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
	...
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

Does this PR introduce any user-facing change?

Yes. Killing a task while it is spilling no longer reports a false SparkOutOfMemoryError, so the killed task's status is KILLED instead of FAILED.

How was this patch tested?

Relied on existing tests to ensure the change breaks nothing.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Jun 17, 2024

pan3793 commented Jun 17, 2024

cc @cloud-fan because this is similar to SPARK-20250

@cloud-fan

cc @Ngone51 @jiangxb1987

@Ngone51 Ngone51 left a comment

LGTM

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

dongjoon-hyun pushed a commit that referenced this pull request Jun 17, 2024
…k on spilling

Closes #47000 from pan3793/SPARK-48642.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 00a96bb)
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Jun 17, 2024
…k on spilling

Closes #47000 from pan3793/SPARK-48642.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 00a96bb)
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun

Merged to master/3.5/3.4. Thank you, @pan3793 and all.

turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…k on spilling

Closes apache#47000 from pan3793/SPARK-48642.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 00a96bb)
Signed-off-by: Dongjoon Hyun <[email protected]>