[SPARK-47565][PYTHON] PySpark worker pool crash resilience #45635
```diff
@@ -77,7 +77,7 @@ private[spark] class PythonWorkerFactory(
   @GuardedBy("self")
   private var daemonPort: Int = 0
   @GuardedBy("self")
-  private val daemonWorkers = new mutable.WeakHashMap[PythonWorker, Long]()
+  private val daemonWorkers = new mutable.WeakHashMap[PythonWorker, ProcessHandle]()
   @GuardedBy("self")
   private val idleWorkers = new mutable.Queue[PythonWorker]()
   @GuardedBy("self")
```
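The core of the fix: `daemonWorkers` now maps each `PythonWorker` to a `java.lang.ProcessHandle` rather than a bare pid, so the factory can ask the OS whether a pooled worker's process is still alive before handing it out. For readers unfamiliar with the API, a minimal standalone sketch (not part of this PR) of the `ProcessHandle` calls the patch relies on:

```scala
// Sketch only: ProcessHandle (Java 9+) lets the JVM query a native process
// by pid without owning a Process object.
object ProcessHandleDemo {
  def main(args: Array[String]): Unit = {
    val self = ProcessHandle.current() // handle for this JVM
    println(s"pid=${self.pid()}, alive=${self.isAlive()}")

    // Looking up an arbitrary pid yields Optional.empty if no such process.
    val maybe = ProcessHandle.of(self.pid())
    maybe.ifPresent(h => println(s"found pid ${h.pid()}"))
  }
}
```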
```diff
@@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[Long]) = {
     if (useDaemon) {
       self.synchronized {
-        if (idleWorkers.nonEmpty) {
+        // Pull from idle workers until we find one that is alive, otherwise create a new one.
+        while (idleWorkers.nonEmpty) {
           val worker = idleWorkers.dequeue()
-          worker.selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE)
-          return (worker, daemonWorkers.get(worker))
+          val workerHandle = daemonWorkers(worker)
+          if (workerHandle.isAlive()) {
+            try {
+              worker.selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE)
+              return (worker, Some(workerHandle.pid()))
+            } catch {
+              case c: CancelledKeyException => /* pass */
+            }
+          }
+          logWarning(s"Worker ${worker} process from idle queue is dead, discarding.")
+          stopWorker(worker)
         }
       }
       createThroughDaemon()
```
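`create()` thus drains the idle queue until it finds a live worker, discarding dead ones instead of returning them. The same reuse-or-discard pattern in isolation, as a hypothetical sketch (`takeLiveWorker` and `Worker` are illustrative names, not from the PR):

```scala
import scala.collection.mutable

object IdlePoolSketch {
  final case class Worker(id: Int)

  // Dequeue idle workers until one's process reports isAlive; dead workers
  // are simply dropped. Returning None tells the caller to create a fresh one.
  def takeLiveWorker(
      idle: mutable.Queue[Worker],
      handles: collection.Map[Worker, ProcessHandle]): Option[Worker] = {
    while (idle.nonEmpty) {
      val w = idle.dequeue()
      if (handles.get(w).exists(_.isAlive())) {
        return Some(w) // reuse a live idle worker
      }
      // dead worker: fall through and try the next one
    }
    None
  }
}
```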
```diff
@@ -121,15 +131,16 @@ private[spark] class PythonWorkerFactory(
     if (pid < 0) {
       throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
     }

+    val processHandle = ProcessHandle.of(pid).orElseThrow(
+      () => new IllegalStateException("Python daemon failed to launch worker.")
+    )
     authHelper.authToServer(socketChannel.socket())
     socketChannel.configureBlocking(false)
     val selector = Selector.open()
     val selectionKey = socketChannel.register(selector,
       SelectionKey.OP_READ | SelectionKey.OP_WRITE)
     val worker = PythonWorker(socketChannel, selector, selectionKey)

-    daemonWorkers.put(worker, pid)
+    daemonWorkers.put(worker, processHandle)
     (worker, Some(pid))
   }
```
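`ProcessHandle.of(pid)` can legitimately come back empty: the worker may have died between the daemon reporting its pid and the lookup. The patch maps that empty `Optional` to an `IllegalStateException`; a condensed sketch of the pattern (the helper name is hypothetical):

```scala
object HandleLookup {
  // A pid handed back by the daemon may already be dead; ProcessHandle.of
  // returns Optional.empty in that case, which is surfaced as an error here.
  def handleFor(pid: Long): ProcessHandle =
    ProcessHandle.of(pid).orElseThrow(() =>
      new IllegalStateException(s"No live process with pid $pid"))
}
```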
```diff
@@ -391,10 +402,10 @@ private[spark] class PythonWorkerFactory(
     self.synchronized {
       if (useDaemon) {
         if (daemon != null) {
-          daemonWorkers.get(worker).foreach { pid =>
+          daemonWorkers.get(worker).foreach { processHandle =>
             // tell daemon to kill worker by pid
             val output = new DataOutputStream(daemon.getOutputStream)
-            output.writeLong(pid)
+            output.writeLong(processHandle.pid())
             output.flush()
             daemon.getOutputStream.flush()
           }
```
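Finally, `stopWorker` now unwraps the pid from the stored handle when asking the daemon to kill a worker. Reading the diff, the daemon-side protocol appears to be a stream of 8-byte pids written to the daemon's stdin; a hypothetical sketch of just that write (names are illustrative, not from the PR):

```scala
import java.io.{DataOutputStream, OutputStream}

object DaemonKillSketch {
  // Mirror of the write in stopWorker above: DataOutputStream#writeLong
  // emits the pid as 8 big-endian bytes, which the daemon reads and uses
  // to signal the corresponding worker process.
  def requestKill(daemonStdin: OutputStream, pid: Long): Unit = {
    val out = new DataOutputStream(daemonStdin)
    out.writeLong(pid)
    out.flush()
  }
}
```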