Commit 20eea20
[SPARK-22982] Remove unsafe asynchronous close() call from FileDownloadChannel
## What changes were proposed in this pull request?
This patch fixes a severe asynchronous IO bug in Spark's Netty-based file transfer code. At a high-level, the problem is that an unsafe asynchronous `close()` of a pipe's source channel creates a race condition where file transfer code closes a file descriptor then attempts to read from it. If the closed file descriptor's number has been reused by an `open()` call then this invalid read may cause unrelated file operations to return incorrect results. **One manifestation of this problem is incorrect query results.**
For a high-level overview of how file download works, take a look at the control flow in `NettyRpcEnv.openChannel()`: this code creates a pipe to buffer results, then submits an asynchronous stream request to a lower-level TransportClient. The callback passes received data to the sink end of the pipe. The source end of the pipe is passed back to the caller of `openChannel()`. Thus `openChannel()` returns immediately and callers interact with the returned pipe source channel.
Because the underlying stream request is asynchronous, errors may occur after `openChannel()` has returned and after that method's caller has started to `read()` from the returned channel. For example, if a client requests an invalid stream from a remote server then the "stream does not exist" error may not be received from the remote server until after `openChannel()` has returned. In order to be able to propagate the "stream does not exist" error to the file-fetching application thread, this code wraps the pipe's source channel in a special `FileDownloadChannel` which adds an `setError(t: Throwable)` method, then calls this `setError()` method in the FileDownloadCallback's `onFailure` method.
It is possible for `FileDownloadChannel`'s `read()` and `setError()` methods to be called concurrently from different threads: the `setError()` method is called from within the Netty RPC system's stream callback handlers, while the `read()` methods are called from higher-level application code performing remote stream reads.
The problem lies in `setError()`: the existing code closed the wrapped pipe source channel. Because `read()` and `setError()` occur in different threads, this means it is possible for one thread to be calling `source.read()` while another asynchronously calls `source.close()`. Java's IO libraries do not guarantee that this will be safe and, in fact, it's possible for these operations to interleave in such a way that a lower-level `read()` system call occurs right after a `close()` call. In the best-case, this fails as a read of a closed file descriptor; in the worst-case, the file descriptor number has been re-used by an intervening `open()` operation and the read corrupts the result of an unrelated file IO operation being performed by a different thread.
The solution here is to remove the `stream.close()` call in `onError()`: the thread that is performing the `read()` calls is responsible for closing the stream in a `finally` block, so there's no need to close it here. If that thread is blocked in a `read()` then it will become unblocked when the sink end of the pipe is closed in `FileDownloadCallback.onFailure()`.
After making this change, we also need to refine the `read()` method to always check for a `setError()` result, even if the underlying channel `read()` call has succeeded.
This patch also makes a slight cleanup to a dodgy-looking `catch e: Exception` block to use a safer `try-finally` error handling idiom.
This bug was introduced in SPARK-11956 / #9941 and is present in Spark 1.6.0+.
## How was this patch tested?
This fix was tested manually against a workload which non-deterministically hit this bug.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #20179 from JoshRosen/SPARK-22982-fix-unsafe-async-io-in-file-download-channel.
(cherry picked from commit edf0a48)
Signed-off-by: Sean Owen <sowen@cloudera.com>1 parent acab4e7 commit 20eea20
File tree
2 files changed
+39
-19
lines changed- core/src/main/scala/org/apache/spark
- rpc/netty
- shuffle
2 files changed
+39
-19
lines changedLines changed: 22 additions & 15 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
331 | 331 | | |
332 | 332 | | |
333 | 333 | | |
334 | | - | |
| 334 | + | |
335 | 335 | | |
336 | 336 | | |
337 | 337 | | |
338 | | - | |
339 | | - | |
340 | | - | |
341 | | - | |
342 | | - | |
343 | | - | |
| 338 | + | |
| 339 | + | |
| 340 | + | |
| 341 | + | |
344 | 342 | | |
345 | 343 | | |
346 | 344 | | |
| |||
369 | 367 | | |
370 | 368 | | |
371 | 369 | | |
372 | | - | |
| 370 | + | |
373 | 371 | | |
374 | 372 | | |
375 | 373 | | |
376 | 374 | | |
| 375 | + | |
| 376 | + | |
| 377 | + | |
| 378 | + | |
| 379 | + | |
| 380 | + | |
| 381 | + | |
| 382 | + | |
| 383 | + | |
377 | 384 | | |
378 | | - | |
379 | 385 | | |
380 | 386 | | |
381 | 387 | | |
382 | 388 | | |
| 389 | + | |
| 390 | + | |
| 391 | + | |
| 392 | + | |
| 393 | + | |
| 394 | + | |
383 | 395 | | |
384 | | - | |
385 | | - | |
386 | | - | |
387 | | - | |
388 | | - | |
389 | | - | |
| 396 | + | |
390 | 397 | | |
391 | 398 | | |
392 | 399 | | |
| |||
Lines changed: 17 additions & 4 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
18 | 18 | | |
19 | 19 | | |
20 | 20 | | |
21 | | - | |
22 | | - | |
| 21 | + | |
| 22 | + | |
23 | 23 | | |
24 | 24 | | |
25 | 25 | | |
| |||
196 | 196 | | |
197 | 197 | | |
198 | 198 | | |
199 | | - | |
| 199 | + | |
| 200 | + | |
| 201 | + | |
| 202 | + | |
| 203 | + | |
| 204 | + | |
| 205 | + | |
| 206 | + | |
| 207 | + | |
200 | 208 | | |
201 | | - | |
202 | 209 | | |
203 | 210 | | |
| 211 | + | |
| 212 | + | |
| 213 | + | |
| 214 | + | |
| 215 | + | |
| 216 | + | |
204 | 217 | | |
205 | 218 | | |
206 | 219 | | |
| |||
0 commit comments