fix bugs in daemon.py

1. do not use a signal handler for SIGCHLD; it easily causes deadlock (see the reaping sketch after this message)
2. handle EINTR during accept()
3. pass errno back to the JVM
4. handle EAGAIN during fork()

Now it passes a 50k-task test in 180 seconds.
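The first fix is the subtle one. A SIGCHLD handler runs asynchronously, so if it fires while the daemon is inside a non-reentrant call, reaping from the handler can deadlock the process. The patch reaps synchronously from the main loop with a non-blocking waitpid instead. A minimal standalone sketch of that pattern (the function name here is illustrative; the patch's version is cleanup_dead_children in the diff below):

import os

def reap_dead_children():
    # Reap any exited children without blocking. Called from the main
    # loop rather than from a SIGCHLD handler, so it can never interrupt
    # the daemon mid-operation.
    try:
        while True:
            pid, _ = os.waitpid(0, os.WNOHANG)  # 0 = any child in our process group
            if not pid:  # no more exited children right now
                break
    except OSError:
        pass  # ECHILD: no children left; nothing to reap

Calling this once per accepted connection keeps the zombie count bounded without any async-signal-safety concerns.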
davies committed Aug 7, 2014
commit 0cd0817b317dfb88cb452f389e564f8e9daf040c
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         val socket = new Socket(daemonHost, daemonPort)
         val pid = new DataInputStream(socket.getInputStream).readInt()
         if (pid < 0) {
-          throw new IllegalStateException("Python daemon failed to launch worker")
+          throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
         }
         daemonWorkers.put(socket, pid)
         socket
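The JVM treats the first int the daemon writes on a new socket as the worker's pid, so a negative value doubles as an error channel; with this change the daemon reports an error code from the failed fork() rather than a bare -1, and the exception message now includes it. Both sides must agree on the wire format. A sketch of that framing, matching what pyspark's write_int and java.io.DataInputStream.readInt() use (a signed 32-bit big-endian integer):

import struct

def write_int(value, stream):
    # Big-endian signed 32-bit int: the format DataInputStream.readInt() expects.
    stream.write(struct.pack("!i", value))

def read_int(stream):
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError
    return struct.unpack("!i", data)[0]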
75 changes: 45 additions & 30 deletions python/pyspark/daemon.py
@@ -22,7 +22,8 @@
 import socket
 import sys
 import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
@@ -80,6 +81,16 @@ def waitSocketClose(sock):
     os._exit(compute_real_exit_code(exit_code))


+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -102,18 +113,6 @@ def handle_sigterm(*args):
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP

-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
     # Initialization complete
     sys.stdout.close()
     try:
@@ -125,6 +124,7 @@ def handle_sigchld(*args):
                     continue
                 else:
                     raise
+
             if 0 in ready_fds:
                 try:
                     worker_pid = read_int(sys.stdin)
@@ -137,29 +137,44 @@
                     pass  # process already died

             if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
+                # cleanup in signal handler will cause deadlock
+                cleanup_dead_children()
+
                 # Launch a worker process
                 try:
                     pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
-                    else:
-                        sock.close()
-
                 except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
+                    else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
+                        sock.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
+                    sock.close()

     finally:
         shutdown(1)
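The accept() and fork() fixes above share one retry discipline: an interrupted accept() is retried immediately, while a transient fork() failure gets a one-second backoff and a single retry. A condensed standalone sketch of both patterns (function names are illustrative; the errno logic mirrors the patch):

import errno
import os
import socket
import time

def accept_retrying(listen_sock):
    # Retry accept() when a signal interrupts it (EINTR) instead of
    # letting the exception kill the daemon's select loop.
    while True:
        try:
            return listen_sock.accept()
        except (OSError, socket.error) as e:
            if e.errno == errno.EINTR:
                continue
            raise

def fork_retrying():
    # EAGAIN (process or rlimit pressure) and EINTR are transient: back
    # off briefly and retry once; an error on the retry propagates.
    try:
        return os.fork()
    except OSError as e:
        if e.errno in (errno.EAGAIN, errno.EINTR):
            time.sleep(1)
            return os.fork()
        raise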