1 change: 1 addition & 0 deletions python/pyspark/testing/utils.py
@@ -18,6 +18,7 @@
 import os
 import struct
 import sys
+import threading
 import unittest
 
 from pyspark import SparkContext, SparkConf
15 changes: 15 additions & 0 deletions python/pyspark/tests/test_worker.py
@@ -1,3 +1,4 @@
+# -*- encoding: utf-8 -*-
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements. See the NOTICE file distributed with
@@ -150,6 +151,20 @@ def test_with_different_versions_of_python(self):
         finally:
             self.sc.pythonVer = version
 
+    def test_python_exception_non_hanging(self):
+        # SPARK-21045: exceptions with non-ASCII characters shall not hang PySpark.
+        try:
+            def f():
+                raise Exception("exception with 中 and \xd6\xd0")
+
+            self.sc.parallelize([1]).map(lambda x: f()).count()
+        except Py4JJavaError as e:
+            if sys.version_info.major < 3:
+                # we have to use unicode here to avoid UnicodeDecodeError
+                self.assertRegexpMatches(unicode(e).encode("utf-8"), "exception with 中")
+            else:
+                self.assertRegexpMatches(str(e), "exception with 中")
+
 
 class WorkerReuseTest(PySparkTestCase):
 
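An aside (illustrative, not part of the PR): the Python 2 branch above exists because mixing byte strings and unicode strings triggers implicit ASCII codec conversions that fail on non-ASCII data, so the test coerces the error to unicode and encodes it to UTF-8 explicitly before matching:

# Python 2 only (illustrative sketch):
u = u"exception with 中"
str(u)                      # UnicodeEncodeError: 'ascii' codec can't encode character
u + "中"                    # UnicodeDecodeError: the byte string is decoded as ASCII first
unicode(u).encode("utf-8")  # explicit UTF-8 bytes -- safe to regex-match against bytes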
14 changes: 12 additions & 2 deletions python/pyspark/worker.py
@@ -44,7 +44,7 @@
 from pyspark.util import _get_argspec, fail_on_stopiteration
 from pyspark import shuffle
 
-if sys.version >= '3':
+if sys.version_info.major >= 3:
     basestring = str
 else:
     from itertools import imap as map  # use iterator map by default
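Why the comparison changed (my inference, not stated in the thread shown here): sys.version >= '3' compares version strings lexicographically, which would misclassify a hypothetical two-digit major version, whereas sys.version_info compares numerically:

# Illustrative: lexicographic vs. numeric version comparison.
"10.0.0" >= "3"   # False -- '1' sorts before '3', so Python 10 would look "old"
(10, 0) >= (3,)   # True  -- sys.version_info is a tuple and compares numerically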
@@ -598,8 +598,18 @@ def process():
         process()
     except Exception:
         try:
+            exc_info = traceback.format_exc()
+            if sys.version_info.major < 3:
+                if isinstance(exc_info, unicode):
+                    exc_info = exc_info.encode("utf-8")
+                else:
+                    # exc_info may contain bytes in other encodings; replace any
+                    # invalid bytes and convert the result back to UTF-8
+                    exc_info = exc_info.decode("utf-8", "replace").encode("utf-8")
+            else:
+                exc_info = exc_info.encode("utf-8")
             write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
-            write_with_length(traceback.format_exc().encode("utf-8"), outfile)
+            write_with_length(exc_info, outfile)
         except IOError:
             # the JVM closed the socket
             pass

Review comment (Member), on the if sys.version_info.major < 3: line: "Likewise, let's drop this right after we drop Python 2, which I will do right after Spark 3."
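End to end, the sanitizing logic amounts to the following standalone sketch (illustrative; sanitize_traceback is a hypothetical helper, not a function in the PR):

import sys
import traceback

def sanitize_traceback(exc_info):
    # Return UTF-8 bytes that are always safe to write to the JVM socket.
    if sys.version_info.major < 3:
        if isinstance(exc_info, unicode):  # noqa: F821 -- Python 2 only
            return exc_info.encode("utf-8")
        # A Python 2 byte string may carry bytes from another encoding
        # (e.g. "\xd6\xd0", GBK for 中). Calling .encode("utf-8") on it
        # would first decode it implicitly as ASCII and raise
        # UnicodeDecodeError; decoding with errors="replace" maps invalid
        # bytes to U+FFFD, so the round-trip always yields valid UTF-8.
        return exc_info.decode("utf-8", "replace").encode("utf-8")
    return exc_info.encode("utf-8")

try:
    raise Exception("exception with 中 and \xd6\xd0")
except Exception:
    payload = sanitize_traceback(traceback.format_exc())  # never raises on bad bytes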