16 changes: 11 additions & 5 deletions python/pyspark/ml/tests.py
@@ -2136,17 +2136,23 @@ class ImageReaderTest2(PySparkTestCase):
@classmethod
def setUpClass(cls):
super(ImageReaderTest2, cls).setUpClass()
cls.hive_available = True
# Note that here we enable Hive's support.
cls.spark = None
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.hive_available = False
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.spark = HiveContext._createForTesting(cls.sc)
cls.hive_available = False
if cls.hive_available:
cls.spark = HiveContext._createForTesting(cls.sc)

def setUp(self):
if not self.hive_available:
self.skipTest("Hive is not available.")
Member Author:

Finished test(python3): pyspark.sql.tests (51s) ... 93 tests were skipped

...

Skipped tests in pyspark.sql.tests with python3:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
    ...
    test_collect_functions (pyspark.sql.tests.HiveContextSQLTests) ... skipped 'Hive is not available.'
    test_datetime_functions (pyspark.sql.tests.HiveContextSQLTests) ... skipped 'Hive is not available.'
    ...
    test_query_execution_listener_on_collect (pyspark.sql.tests.QueryExecutionListenerTests) ... skipped "'org.apache.spark.sql.TestQueryExecutionListener' is not available. Will skip the related tests."
    ...

@viirya, @bersprockets and @BryanCutler, this is the output from my partial testing on my local machine.

Member:

A little worried that this can be too verbose when too many tests are skipped. See #21107 (comment).

Contributor:

I assume there is no way to print out only the skipped test class names?

Member Author:

No clean way as far as I can tell. I'd have to do another regex pass, but I'd like to avoid that as much as I can.

Contributor:

Gotcha. Yeah the current implementation looks good to me.
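For reference, a rough sketch of the extra regex pass discussed above (hypothetical, not part of this change); it assumes skip lines in the verbose unittest format quoted elsewhere on this page:

import re

# Hypothetical helper, not part of this PR: pull only the test class name out of a
# verbose-mode skip line such as
#   test_collect_functions (pyspark.sql.tests.HiveContextSQLTests) ... skipped 'Hive is not available.'
SKIP_LINE = re.compile(r"test_\w+ \((pyspark\.[\w.]+)\) \.\.\. skipped ")

def skipped_class_name(line):
    match = SKIP_LINE.search(line)
    # Returns e.g. 'HiveContextSQLTests', or None if the line is not a skip line.
    return match.group(1).split(".")[-1] if match else None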


@classmethod
def tearDownClass(cls):
@@ -2662,6 +2668,6 @@ def testDefaultFitMultiple(self):
if __name__ == "__main__":
from pyspark.ml.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
unittest.main(verbosity=2)
4 changes: 2 additions & 2 deletions python/pyspark/mllib/tests.py
@@ -1767,9 +1767,9 @@ def test_pca(self):
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
unittest.main(verbosity=2)
if not _have_scipy:
print("NOTE: SciPy tests were skipped as it does not seem to be installed")
sc.stop()
51 changes: 30 additions & 21 deletions python/pyspark/sql/tests.py
@@ -3088,23 +3088,28 @@ def setUpClass(cls):
filename_pattern = (
"sql/core/target/scala-*/test-classes/org/apache/spark/sql/"
"TestQueryExecutionListener.class")
if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)):
raise unittest.SkipTest(
cls.has_listener = bool(glob.glob(os.path.join(SPARK_HOME, filename_pattern)))

if cls.has_listener:
# Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
cls.spark = SparkSession.builder \
.master("local[4]") \
.appName(cls.__name__) \
.config(
"spark.sql.queryExecutionListeners",
"org.apache.spark.sql.TestQueryExecutionListener") \
.getOrCreate()

def setUp(self):
if not self.has_listener:
raise self.skipTest(
"'org.apache.spark.sql.TestQueryExecutionListener' is not "
"available. Will skip the related tests.")

# Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
cls.spark = SparkSession.builder \
.master("local[4]") \
.appName(cls.__name__) \
.config(
"spark.sql.queryExecutionListeners",
"org.apache.spark.sql.TestQueryExecutionListener") \
.getOrCreate()

@classmethod
def tearDownClass(cls):
cls.spark.stop()
if hasattr(cls, "spark"):
cls.spark.stop()

def tearDown(self):
self.spark._jvm.OnSuccessCall.clear()
Member:

This is not called if the test is skipped during setUp, right?

Member Author:

Yup.
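
For anyone who wants to verify that unittest behavior, a minimal standalone example (class and method names are made up, unrelated to the PySpark test suite):

import unittest

class SkipInSetUpDemo(unittest.TestCase):
    def setUp(self):
        # Skipping here means tearDown below is never invoked for this test.
        self.skipTest("skipped in setUp")

    def tearDown(self):
        print("tearDown called")  # not printed for the skipped test

    def test_anything(self):
        pass

if __name__ == "__main__":
    unittest.main(verbosity=2)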

@@ -3188,18 +3193,22 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
cls.hive_available = True
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.hive_available = False
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.hive_available = False
os.unlink(cls.tempdir.name)
cls.spark = HiveContext._createForTesting(cls.sc)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()
if cls.hive_available:
cls.spark = HiveContext._createForTesting(cls.sc)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()

def setUp(self):
if not self.hive_available:
self.skipTest("Hive is not available.")

@classmethod
def tearDownClass(cls):
@@ -5308,6 +5317,6 @@ def test_invalid_args(self):
if __name__ == "__main__":
from pyspark.sql.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
unittest.main(verbosity=2)
4 changes: 2 additions & 2 deletions python/pyspark/streaming/tests.py
@@ -1590,11 +1590,11 @@ def search_kinesis_asl_assembly_jar():
sys.stderr.write("[Running %s]\n" % (testcase))
tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
if xmlrunner:
result = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=3).run(tests)
result = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2).run(tests)
if not result.wasSuccessful():
failed = True
else:
result = unittest.TextTestRunner(verbosity=3).run(tests)
result = unittest.TextTestRunner(verbosity=2).run(tests)
if not result.wasSuccessful():
failed = True
sys.exit(failed)
12 changes: 2 additions & 10 deletions python/pyspark/tests.py
@@ -2353,15 +2353,7 @@ def test_statcounter_array(self):

if __name__ == "__main__":
from pyspark.tests import *
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if not _have_numpy:
print("NOTE: Skipping NumPy tests as it does not seem to be installed")
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
if not _have_scipy:
print("NOTE: SciPy tests were skipped as it does not seem to be installed")
if not _have_numpy:
print("NOTE: NumPy tests were skipped as it does not seem to be installed")
unittest.main(verbosity=2)
115 changes: 51 additions & 64 deletions python/run-tests.py
@@ -32,6 +32,7 @@
else:
import queue as Queue
from distutils.version import LooseVersion
from multiprocessing import Manager


# Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module
@@ -50,6 +51,7 @@ def print_red(text):
print('\033[31m' + text + '\033[0m')


SKIPPED_TESTS = Manager().dict()
LOG_FILE = os.path.join(SPARK_HOME, "python/unit-tests.log")
FAILURE_REPORTING_LOCK = Lock()
LOGGER = logging.getLogger()
@@ -109,8 +111,34 @@ def run_individual_python_test(test_name, pyspark_python):
# this code is invoked from a thread other than the main thread.
os._exit(-1)
else:
per_test_output.close()
LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration)
skipped_counts = 0
try:
per_test_output.seek(0)
# Here expects skipped test output from unittest when verbosity level is
# 2 (or --verbose option is enabled).
decoded_lines = map(lambda line: line.decode(), iter(per_test_output))
skipped_tests = list(filter(
lambda line: re.search('test_.* \(pyspark\..*\) ... skipped ', line),
decoded_lines))
skipped_counts = len(skipped_tests)
if skipped_counts > 0:
key = (pyspark_python, test_name)
SKIPPED_TESTS[key] = skipped_tests
per_test_output.close()
except:
import traceback
print_red("\nGot an exception while trying to store "
"skipped test output:\n%s" % traceback.format_exc())
Member Author:

Just in case anyone is worried:

Got an exception while trying to store skipped test output:
Traceback (most recent call last):
  File "./python/run-tests.py", line 116, in run_individual_python_test
    per_test_output.seek()
TypeError: seek() takes at least 1 argument (0 given)

# Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
# this code is invoked from a thread other than the main thread.
os._exit(-1)
if skipped_counts != 0:
LOGGER.info(
"Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python, test_name,
Member Author:

Not sure if there's a better format. Let me know.

duration, skipped_counts)
else:
LOGGER.info(
"Finished test(%s): %s (%is)", pyspark_python, test_name, duration)


def get_default_python_executables():
@@ -152,65 +180,17 @@ def parse_opts():
return opts


def _check_dependencies(python_exec, modules_to_test):
if "COVERAGE_PROCESS_START" in os.environ:
# Make sure if coverage is installed.
try:
subprocess_check_output(
[python_exec, "-c", "import coverage"],
stderr=open(os.devnull, 'w'))
except:
print_red("Coverage is not installed in Python executable '%s' "
"but 'COVERAGE_PROCESS_START' environment variable is set, "
"exiting." % python_exec)
sys.exit(-1)

# If we should test 'pyspark-sql', it checks if PyArrow and Pandas are installed and
# explicitly prints out. See SPARK-23300.
if pyspark_sql in modules_to_test:
# TODO(HyukjinKwon): Relocate and deduplicate these version specifications.
minimum_pyarrow_version = '0.8.0'
Contributor:

How are we checking dependencies now?

Member Author:

We are now relying on the existing checks in the tests. For example:

_pyarrow_requirement_message = None
try:
from pyspark.sql.utils import require_minimum_pyarrow_version
require_minimum_pyarrow_version()
except ImportError as e:
# If Arrow version requirement is not satisfied, skip related tests.
_pyarrow_requirement_message = _exception_message(e)

@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)

which prints out a skip message like:

    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 
'PyArrow >= 0.8.0 must be installed; however, it was not found.'

which I am capturing here with a regex pattern.

Contributor:

Gotcha. Thanks for the explanation!
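
To make the capture concrete, a small sketch applying the same pattern used in run_individual_python_test above to the sample skip message quoted in this thread (the sample line is reconstructed here for illustration only):

import re

line = ("test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) "
        "... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'")

# Same pattern as in run_individual_python_test; it only relies on the
# "test_... (pyspark....) ... skipped " shape that unittest emits at verbosity 2.
if re.search(r'test_.* \(pyspark\..*\) ... skipped ', line):
    print("captured as a skipped test:", line)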

minimum_pandas_version = '0.19.2'

try:
pyarrow_version = subprocess_check_output(
[python_exec, "-c", "import pyarrow; print(pyarrow.__version__)"],
universal_newlines=True,
stderr=open(os.devnull, 'w')).strip()
if LooseVersion(pyarrow_version) >= LooseVersion(minimum_pyarrow_version):
LOGGER.info("Will test PyArrow related features against Python executable "
"'%s' in '%s' module." % (python_exec, pyspark_sql.name))
else:
LOGGER.warning(
"Will skip PyArrow related features against Python executable "
"'%s' in '%s' module. PyArrow >= %s is required; however, PyArrow "
"%s was found." % (
python_exec, pyspark_sql.name, minimum_pyarrow_version, pyarrow_version))
except:
LOGGER.warning(
"Will skip PyArrow related features against Python executable "
"'%s' in '%s' module. PyArrow >= %s is required; however, PyArrow "
"was not found." % (python_exec, pyspark_sql.name, minimum_pyarrow_version))

try:
pandas_version = subprocess_check_output(
[python_exec, "-c", "import pandas; print(pandas.__version__)"],
universal_newlines=True,
stderr=open(os.devnull, 'w')).strip()
if LooseVersion(pandas_version) >= LooseVersion(minimum_pandas_version):
LOGGER.info("Will test Pandas related features against Python executable "
"'%s' in '%s' module." % (python_exec, pyspark_sql.name))
else:
LOGGER.warning(
"Will skip Pandas related features against Python executable "
"'%s' in '%s' module. Pandas >= %s is required; however, Pandas "
"%s was found." % (
python_exec, pyspark_sql.name, minimum_pandas_version, pandas_version))
except:
LOGGER.warning(
"Will skip Pandas related features against Python executable "
"'%s' in '%s' module. Pandas >= %s is required; however, Pandas "
"was not found." % (python_exec, pyspark_sql.name, minimum_pandas_version))
def _check_coverage(python_exec):
# Make sure if coverage is installed.
try:
subprocess_check_output(
[python_exec, "-c", "import coverage"],
stderr=open(os.devnull, 'w'))
except:
print_red("Coverage is not installed in Python executable '%s' "
"but 'COVERAGE_PROCESS_START' environment variable is set, "
"exiting." % python_exec)
sys.exit(-1)


def main():
@@ -237,9 +217,10 @@

task_queue = Queue.PriorityQueue()
for python_exec in python_execs:
# Check if the python executable has proper dependencies installed to run tests
# for given modules properly.
_check_dependencies(python_exec, modules_to_test)
# Check if the python executable has coverage installed when 'COVERAGE_PROCESS_START'
# environmental variable is set.
if "COVERAGE_PROCESS_START" in os.environ:
_check_coverage(python_exec)

python_implementation = subprocess_check_output(
[python_exec, "-c", "import platform; print(platform.python_implementation())"],
@@ -281,6 +262,12 @@ def process_queue(task_queue):
total_duration = time.time() - start_time
LOGGER.info("Tests passed in %i seconds", total_duration)

for key, lines in sorted(SKIPPED_TESTS.items()):
pyspark_python, test_name = key
LOGGER.info("\nSkipped tests in %s with %s:" % (test_name, pyspark_python))
Member:

Will it be too verbose to print all skipped tests? An option is to record them into LOG_FILE only. No strong preference here.

Member Author (@HyukjinKwon, Apr 22, 2018):

Yea, I get it, but from previous discussions I'm fairly sure people felt it should be printed out in the console. I don't feel strongly about it either.

Anyway, we should eventually have only a few of them, since most are caused by missing Pandas and Arrow, so this is probably fine.

No strong preference here either. Let me know if others have a preference.

Member:

I think print is fine for now. We could also change it later.

for line in lines:
LOGGER.info(" %s" % line.rstrip())
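
For reference, a rough sketch of the LOG_FILE-only alternative raised in the thread above (not what this change does); it assumes the LOG_FILE, LOGGER and SKIPPED_TESTS names already defined in run-tests.py:

import logging

# Hypothetical variant: keep the console summary short and write the full
# per-test skip lines only to LOG_FILE.
file_only = logging.getLogger("pyspark.skipped-tests")
file_only.addHandler(logging.FileHandler(LOG_FILE))
file_only.setLevel(logging.INFO)
file_only.propagate = False  # keep these records off the console (root) logger

for (pyspark_python, test_name), lines in sorted(SKIPPED_TESTS.items()):
    LOGGER.info("Skipped %d tests in %s with %s (details in %s)",
                len(lines), test_name, pyspark_python, LOG_FILE)
    for line in lines:
        file_only.info("    %s", line.rstrip())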


if __name__ == "__main__":
main()