Skip to content

Conversation

@eejbyfeldt
Copy link
Contributor

What changes were proposed in this pull request?

Fix race condition in the class CachedRDDBuilder.

Why are the changes needed?

The previous code had a race condition that mean that cachedColumnBuffers could return null if another thread was concurrently was calling clearCache.

The bug is caused by us checking _cachedColumnBuffers and return it as two separate operations outside a synchronized block. So it possible for another thread to set it to null after the check but before the return.

java.lang.NullPointerException: null
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:156)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:98)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:84)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3459)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4208)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	... 23 more

Does this PR introduce any user-facing change?

Yes, fixes a race condition that can cause crashes.

How was this patch tested?

Adds a test that shows that reliably fails on the old code. Not sure if want to merge that style of test as it very specific for this bug and takes 3 seconds to run.

Was this patch authored or co-authored using generative AI tooling?

No.

The previous code had a race condition that mean that
`cachedColumnBuffers` could return `null` if another thread was
concurrently was calling `clearCache`.

The bug is caused by us checking _cachedColumnBuffers and return it as
two separate operations outside a synchronized block. So it possible for
another thread to set it to `null` after the check but before the
return.

```
java.lang.NullPointerException: null
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:156)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:98)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:84)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4218)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3459)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4208)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	... 23 more
```
@github-actions github-actions bot added the SQL label Dec 13, 2024
@eejbyfeldt
Copy link
Contributor Author

@cloud-fan since you were involved in #21018 where the problematic code was introduced.

@eejbyfeldt
Copy link
Contributor Author

An alternative fix could be to mark the entire method as synchronized if we do not think these are on a critical path.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 24, 2025
@SauronShepherd
Copy link

I'd like to have a look at it, when I have time.
@eejbyfeldt , is there any reason why you didn't continue working on this PR?

@github-actions github-actions bot closed this Mar 25, 2025
@eejbyfeldt
Copy link
Contributor Author

I'd like to have a look at it, when I have time. @eejbyfeldt , is there any reason why you didn't continue working on this PR?

It was just waiting for someone to review it. As far as I am aware it good to go as it is and it fixes a real race condition.

@SauronShepherd
Copy link

SauronShepherd commented Mar 25, 2025

I find this PR quite interesting indeed. It's a pity this hasn't gone ahead. Did you send a message to the dev mail list?

@eejbyfeldt
Copy link
Contributor Author

Sorry about the late response, but no I don't think I did.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants