From 0cd0817b317dfb88cb452f389e564f8e9daf040c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 7 Aug 2014 14:51:48 -0700 Subject: [PATCH 1/4] fix bugs in deamon.py 1. do not use signal handler for SIGCHILD, it's easy to cause deadlock 2. handle EINTR during accept() 3. pass errno into JVM 4. handle EAGAIN during fork() Now, it can pass 50k tasks tests in 180 seconds. --- .../api/python/PythonWorkerFactory.scala | 2 +- python/pyspark/daemon.py | 75 +++++++++++-------- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 7af260d0b7f2..bf716a8ab025 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val socket = new Socket(daemonHost, daemonPort) val pid = new DataInputStream(socket.getInputStream).readInt() if (pid < 0) { - throw new IllegalStateException("Python daemon failed to launch worker") + throw new IllegalStateException("Python daemon failed to launch worker with code " + pid) } daemonWorkers.put(socket, pid) socket diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index e73538baf0b9..702e23228995 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -22,7 +22,8 @@ import socket import sys import traceback -from errno import EINTR, ECHILD +import time +from errno import EINTR, ECHILD, EAGAIN from socket import AF_INET, SOCK_STREAM, SOMAXCONN from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN from pyspark.worker import main as worker_main @@ -80,6 +81,16 @@ def waitSocketClose(sock): os._exit(compute_real_exit_code(exit_code)) +# Cleanup zombie children +def cleanup_dead_children(): + try: + while True: + pid, _ = os.waitpid(0, os.WNOHANG) + if not pid: + break + except: + pass + def manager(): # Create a new process group to corral our children os.setpgid(0, 0) @@ -102,18 +113,6 @@ def handle_sigterm(*args): signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP - # Cleanup zombie children - def handle_sigchld(*args): - try: - pid, status = os.waitpid(0, os.WNOHANG) - if status != 0: - msg = "worker %s crashed abruptly with exit status %s" % (pid, status) - print >> sys.stderr, msg - except EnvironmentError as err: - if err.errno not in (ECHILD, EINTR): - raise - signal.signal(SIGCHLD, handle_sigchld) - # Initialization complete sys.stdout.close() try: @@ -125,6 +124,7 @@ def handle_sigchld(*args): continue else: raise + if 0 in ready_fds: try: worker_pid = read_int(sys.stdin) @@ -137,29 +137,44 @@ def handle_sigchld(*args): pass # process already died if listen_sock in ready_fds: - sock, addr = listen_sock.accept() + try: + sock, _ = listen_sock.accept() + except OSError as e: + if e.errno == EINTR: + continue + raise + + # cleanup in signal handler will cause deadlock + cleanup_dead_children() + # Launch a worker process try: pid = os.fork() - if pid == 0: - listen_sock.close() - try: - worker(sock) - except: - traceback.print_exc() - os._exit(1) - else: - os._exit(0) + except OSError as e: + if e.errno in (EAGAIN, EINTR): + time.sleep(1) + pid = os.fork() # error here will shutdown daemon else: + outfile = sock.makefile('w') + write_int(e.errno, outfile) # Signal that the fork failed + outfile.flush() + outfile.close() sock.close() - - except OSError as e: - print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e - outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536) - write_int(-1, outfile) # Signal that the fork failed - outfile.flush() - outfile.close() + continue + + if pid == 0: + # in child process + listen_sock.close() + try: + worker(sock) + except: + traceback.print_exc() + os._exit(1) + else: + os._exit(0) + else: sock.close() + finally: shutdown(1) From 32cb8295b1365813627f6d2ef4b00349beadf599 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 7 Aug 2014 15:13:26 -0700 Subject: [PATCH 2/4] fix lint --- python/pyspark/daemon.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 702e23228995..fff6091d7c7d 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -91,6 +91,7 @@ def cleanup_dead_children(): except: pass + def manager(): # Create a new process group to corral our children os.setpgid(0, 0) @@ -153,7 +154,7 @@ def handle_sigterm(*args): except OSError as e: if e.errno in (EAGAIN, EINTR): time.sleep(1) - pid = os.fork() # error here will shutdown daemon + pid = os.fork() # error here will shutdown daemon else: outfile = sock.makefile('w') write_int(e.errno, outfile) # Signal that the fork failed From 03a2e8c9c31cc1dcf5b04612b85443c76953c9ce Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 7 Aug 2014 16:17:34 -0700 Subject: [PATCH 3/4] cleanup dead children every seconds --- python/pyspark/daemon.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index fff6091d7c7d..47004f8a3a33 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -119,13 +119,16 @@ def handle_sigterm(*args): try: while True: try: - ready_fds = select.select([0, listen_sock], [], [])[0] + ready_fds = select.select([0, listen_sock], [], [], 1)[0] except select.error as ex: if ex[0] == EINTR: continue else: raise + # cleanup in signal handler will cause deadlock + cleanup_dead_children() + if 0 in ready_fds: try: worker_pid = read_int(sys.stdin) @@ -145,8 +148,6 @@ def handle_sigterm(*args): continue raise - # cleanup in signal handler will cause deadlock - cleanup_dead_children() # Launch a worker process try: From f0ea45174b77516cc466ae469b2e73ef80435c06 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 7 Aug 2014 16:28:25 -0700 Subject: [PATCH 4/4] fix lint --- python/pyspark/daemon.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 47004f8a3a33..22ab8d30c0ae 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -148,7 +148,6 @@ def handle_sigterm(*args): continue raise - # Launch a worker process try: pid = os.fork()