diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py
index c4b5478a7e89..d4d9679649ee 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -17,12 +17,16 @@
 import unittest

+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.sql.utils import ParseException
+from pyspark.rdd import PythonEvalType
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest

+from py4j.protocol import Py4JJavaError
+

 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
@@ -30,9 +34,6 @@ class PandasUDFTests(ReusedSQLTestCase):

     def test_pandas_udf_basic(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         udf = pandas_udf(lambda x: x, DoubleType())
         self.assertEqual(udf.returnType, DoubleType())
         self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
@@ -65,10 +66,6 @@ def test_pandas_udf_basic(self):
         self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)

     def test_pandas_udf_decorator(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-        from pyspark.sql.types import StructType, StructField, DoubleType
-
         @pandas_udf(DoubleType())
         def foo(x):
             return x
@@ -114,8 +111,6 @@ def foo(x):
         self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)

     def test_udf_wrong_arg(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaises(ParseException):
                 @pandas_udf('blah')
@@ -151,9 +146,6 @@ def foo(k, v, w):
                     return k

     def test_stopiteration_in_udf(self):
-        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
-        from py4j.protocol import Py4JJavaError
-
         def foo(x):
             raise StopIteration()
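Note for reviewers: the pattern the relocated imports serve is visible in the first test. A minimal sketch of what test_pandas_udf_basic asserts, assuming a Spark 2.4-era install with pandas and PyArrow available (the variable name is illustrative, not part of the patch):

from pyspark.rdd import PythonEvalType
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# Declaring a scalar pandas UDF records both its return type and its eval
# type on the resulting object; this mirrors what the test asserts.
identity_udf = pandas_udf(lambda x: x, DoubleType())
assert identity_udf.returnType == DoubleType()
assert identity_udf.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF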
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 5383704434c8..18264ead2fd0 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -17,6 +17,9 @@
 import unittest

+from pyspark.rdd import PythonEvalType
+from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
+    udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.sql.utils import AnalysisException
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
@@ -31,7 +34,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):

     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))) \
@@ -40,8 +42,6 @@ def data(self):

     @property
     def python_plus_one(self):
-        from pyspark.sql.functions import udf
-
         @udf('double')
         def plus_one(v):
             assert isinstance(v, (int, float))
@@ -51,7 +51,6 @@ def plus_one(v):
     @property
     def pandas_scalar_plus_two(self):
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType

         @pandas_udf('double', PandasUDFType.SCALAR)
         def plus_two(v):
@@ -61,8 +60,6 @@ def plus_two(v):

     @property
     def pandas_agg_mean_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def avg(v):
             return v.mean()
@@ -70,8 +67,6 @@ def avg(v):

     @property
     def pandas_agg_sum_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def sum(v):
             return v.sum()
@@ -80,7 +75,6 @@ def sum(v):
     @property
     def pandas_agg_weighted_mean_udf(self):
         import numpy as np
-        from pyspark.sql.functions import pandas_udf, PandasUDFType

         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def weighted_mean(v, w):
@@ -88,8 +82,6 @@ def weighted_mean(v, w):
         return weighted_mean

     def test_manual(self):
-        from pyspark.sql.functions import pandas_udf, array
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
         mean_udf = self.pandas_agg_mean_udf
@@ -118,8 +110,6 @@ def test_manual(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

     def test_basic(self):
-        from pyspark.sql.functions import col, lit, mean
-
         df = self.data
         weighted_mean_udf = self.pandas_agg_weighted_mean_udf
@@ -150,9 +140,6 @@ def test_basic(self):
         self.assertPandasEqual(expected4.toPandas(), result4.toPandas())

     def test_unsupported_types(self):
-        from pyspark.sql.types import DoubleType, MapType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
                 pandas_udf(
@@ -173,8 +160,6 @@ def mean_and_std_udf(v):
                 return {v.mean(): v.std()}

     def test_alias(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +172,6 @@ def test_mixed_sql(self):
         """
         Test mixing group aggregate pandas UDF with sql expression.
         """
-        from pyspark.sql.functions import sum
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
@@ -225,8 +208,6 @@ def test_mixed_udfs(self):
         """
         Test mixing group aggregate pandas UDF with python UDF and scalar pandas UDF.
         """
-        from pyspark.sql.functions import sum
-
         df = self.data
         plus_one = self.python_plus_one
         plus_two = self.pandas_scalar_plus_two
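For readers unfamiliar with these fixtures: test_mixed_sql and test_mixed_udfs feed other UDFs into a group aggregate pandas UDF. A rough sketch of that shape, assuming an active SparkSession bound to `spark` (not part of this patch):

from pyspark.sql.functions import pandas_udf, udf, PandasUDFType

plus_one = udf(lambda v: v + 1, 'double')          # row-at-a-time Python UDF

@pandas_udf('double', PandasUDFType.SCALAR)
def plus_two(v):
    return v + 2                                   # vectorized over a pandas Series

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def sum_udf(v):
    return v.sum()                                 # one scalar per group

df = spark.range(10).selectExpr('id % 2 as id', 'cast(id as double) as v')
df.groupby('id').agg(sum_udf(plus_one(df.v))).show()   # Python UDF feeding the aggregate
df.groupby('id').agg(sum_udf(plus_two(df.v))).show()   # scalar pandas UDF feeding it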
""" - from pyspark.sql.functions import sum, mean - df = self.data mean_udf = self.pandas_agg_mean_udf sum_udf = self.pandas_agg_sum_udf @@ -315,8 +294,6 @@ def test_multiple_udfs(self): self.assertPandasEqual(expected1, result1) def test_complex_groupby(self): - from pyspark.sql.functions import sum - df = self.data sum_udf = self.pandas_agg_sum_udf plus_one = self.python_plus_one @@ -359,8 +336,6 @@ def test_complex_groupby(self): self.assertPandasEqual(expected7.toPandas(), result7.toPandas()) def test_complex_expressions(self): - from pyspark.sql.functions import col, sum - df = self.data plus_one = self.python_plus_one plus_two = self.pandas_scalar_plus_two @@ -434,7 +409,6 @@ def test_complex_expressions(self): self.assertPandasEqual(expected3, result3) def test_retain_group_columns(self): - from pyspark.sql.functions import sum with self.sql_conf({"spark.sql.retainGroupColumns": False}): df = self.data sum_udf = self.pandas_agg_sum_udf @@ -444,8 +418,6 @@ def test_retain_group_columns(self): self.assertPandasEqual(expected1.toPandas(), result1.toPandas()) def test_array_type(self): - from pyspark.sql.functions import pandas_udf, PandasUDFType - df = self.data array_udf = pandas_udf(lambda x: [1.0, 2.0], 'array', PandasUDFType.GROUPED_AGG) @@ -453,8 +425,6 @@ def test_array_type(self): self.assertEquals(result1.first()['v2'], [1.0, 2.0]) def test_invalid_args(self): - from pyspark.sql.functions import mean - df = self.data plus_one = self.python_plus_one mean_udf = self.pandas_agg_mean_udf @@ -478,9 +448,6 @@ def test_invalid_args(self): df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect() def test_register_vectorized_udf_basic(self): - from pyspark.sql.functions import pandas_udf - from pyspark.rdd import PythonEvalType - sum_pandas_udf = pandas_udf( lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF) diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py index a12c608dff9d..80e70349b78d 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py @@ -18,7 +18,12 @@ import datetime import unittest +from collections import OrderedDict +from decimal import Decimal +from distutils.version import LooseVersion + from pyspark.sql import Row +from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType from pyspark.sql.types import * from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \ pandas_requirement_message, pyarrow_requirement_message @@ -32,16 +37,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase): @property def data(self): - from pyspark.sql.functions import array, explode, col, lit return self.spark.range(10).toDF('id') \ .withColumn("vs", array([lit(i) for i in range(20, 30)])) \ .withColumn("v", explode(col('vs'))).drop('vs') def test_supported_types(self): - from decimal import Decimal - from distutils.version import LooseVersion import pyarrow as pa - from pyspark.sql.functions import pandas_udf, PandasUDFType values = [ 1, 2, 3, @@ -131,8 +132,6 @@ def test_supported_types(self): self.assertPandasEqual(expected3, result3) def test_array_type_correct(self): - from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col - df = self.data.withColumn("arr", array(col("id"))).repartition(1, "id") output_schema = StructType( @@ -151,8 +150,6 @@ def test_array_type_correct(self): self.assertPandasEqual(expected, result) def 
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index a12c608dff9d..80e70349b78d 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -18,7 +18,12 @@
 import datetime
 import unittest

+from collections import OrderedDict
+from decimal import Decimal
+from distutils.version import LooseVersion
+
 from pyspark.sql import Row
+from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
@@ -32,16 +37,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):

     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i) for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))).drop('vs')

     def test_supported_types(self):
-        from decimal import Decimal
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, PandasUDFType

         values = [
             1, 2, 3,
@@ -131,8 +132,6 @@ def test_supported_types(self):
         self.assertPandasEqual(expected3, result3)

     def test_array_type_correct(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
-
         df = self.data.withColumn("arr", array(col("id"))).repartition(1, "id")

         output_schema = StructType(
@@ -151,8 +150,6 @@ def test_array_type_correct(self):
         self.assertPandasEqual(expected, result)

     def test_register_grouped_map_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
@@ -161,7 +158,6 @@ def test_register_grouped_map_udf(self):
                 self.spark.catalog.registerFunction("foo_udf", foo_udf)

     def test_decorator(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data

         @pandas_udf(
@@ -176,7 +172,6 @@ def foo(pdf):
         self.assertPandasEqual(expected, result)

     def test_coerce(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data

         foo = pandas_udf(
@@ -191,7 +186,6 @@ def test_coerce(self):
         self.assertPandasEqual(expected, result)

     def test_complex_groupby(self):
-        from pyspark.sql.functions import pandas_udf, col, PandasUDFType
         df = self.data

         @pandas_udf(
@@ -210,7 +204,6 @@ def normalize(pdf):
         self.assertPandasEqual(expected, result)

     def test_empty_groupby(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data

         @pandas_udf(
@@ -229,7 +222,6 @@ def normalize(pdf):
         self.assertPandasEqual(expected, result)

     def test_datatype_string(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data

         foo_udf = pandas_udf(
@@ -243,8 +235,6 @@ def test_datatype_string(self):
         self.assertPandasEqual(expected, result)

     def test_wrong_return_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                 NotImplementedError,
@@ -255,7 +245,6 @@ def test_wrong_return_type(self):
                 PandasUDFType.GROUPED_MAP)

     def test_wrong_args(self):
-        from pyspark.sql.functions import udf, pandas_udf, sum, PandasUDFType
         df = self.data

         with QuietTest(self.sc):
@@ -277,9 +266,7 @@ def test_wrong_args(self):
                 pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))

     def test_unsupported_types(self):
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, PandasUDFType

         common_err_msg = 'Invalid returnType.*grouped map Pandas UDF.*'
         unsupported_types = [
@@ -300,7 +287,6 @@

     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
         dt = [datetime.datetime(2015, 11, 1, 0, 30),
               datetime.datetime(2015, 11, 1, 1, 30),
@@ -311,12 +297,12 @@
         self.assertPandasEqual(df.toPandas(), result.toPandas())

     def test_udf_with_key(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
+        import numpy as np
+
         df = self.data
         pdf = df.toPandas()

         def foo1(key, pdf):
-            import numpy as np
             assert type(key) == tuple
             assert type(key[0]) == np.int64
@@ -326,7 +312,6 @@ def foo1(key, pdf):
                               v4=pdf.v * pdf.id.mean())

         def foo2(key, pdf):
-            import numpy as np
             assert type(key) == tuple
             assert type(key[0]) == np.int64
             assert type(key[1]) == np.int32
@@ -385,9 +370,7 @@ def foo3(key, pdf):
         self.assertPandasEqual(expected4, result4)

     def test_column_order(self):
-        from collections import OrderedDict
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType

         # Helper function to set column names from a list
         def rename_pdf(pdf, names):
@@ -468,7 +451,6 @@ def invalid_positional_types(pdf):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                 grouped_df.apply(column_name_typo).collect()
-        from distutils.version import LooseVersion
         import pyarrow as pa
         if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
             # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
@@ -480,7 +462,6 @@ def invalid_positional_types(pdf):

     def test_positional_assignment_conf(self):
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType

         with self.sql_conf({
                 "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
@@ -496,9 +477,7 @@ def foo(_):
             self.assertEqual(r.b, 1)

     def test_self_join_with_pandas(self):
-        import pyspark.sql.functions as F
-
-        @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
+        @pandas_udf('key long, col string', PandasUDFType.GROUPED_MAP)
         def dummy_pandas_udf(df):
             return df[['key', 'col']]
@@ -508,12 +487,11 @@ def dummy_pandas_udf(df):
         # this was throwing an AnalysisException before SPARK-24208
         res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'),
-                                                 F.col('temp0.key') == F.col('temp1.key'))
+                                                 col('temp0.key') == col('temp1.key'))
         self.assertEquals(res.count(), 5)

     def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
         import pandas as pd
-        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

         df = self.spark.range(0, 10).toDF('v1')
         df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
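The GROUPED_MAP flavor tested above maps one pandas DataFrame per group to another DataFrame matching the declared schema. A minimal sketch of the usage these tests exercise, assuming an active SparkSession named `spark` (this mirrors the upstream documentation example rather than any single test here):

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ('id', 'v'))

@pandas_udf('id long, v double', PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame holding all rows of one group.
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(subtract_mean).show()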
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 2f585a372598..6a6865a9fb16 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -16,12 +16,20 @@
 #
 import datetime
 import os
+import random
 import shutil
 import sys
 import tempfile
 import time
 import unittest

+from datetime import date, datetime
+from decimal import Decimal
+from distutils.version import LooseVersion
+
+from pyspark.rdd import PythonEvalType
+from pyspark.sql import Column
+from pyspark.sql.functions import array, col, expr, lit, sum, udf, pandas_udf
 from pyspark.sql.types import Row
 from pyspark.sql.types import *
 from pyspark.sql.utils import AnalysisException
@@ -59,18 +67,16 @@ def tearDownClass(cls):

     @property
     def nondeterministic_vectorized_udf(self):
-        from pyspark.sql.functions import pandas_udf
+        import pandas as pd
+        import numpy as np

         @pandas_udf('double')
         def random_udf(v):
-            import pandas as pd
-            import numpy as np
             return pd.Series(np.random.random(len(v)))
         random_udf = random_udf.asNondeterministic()
         return random_udf

     def test_pandas_udf_tokenize(self):
-        from pyspark.sql.functions import pandas_udf
         tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')),
                               ArrayType(StringType()))
         self.assertEqual(tokenize.returnType, ArrayType(StringType()))
@@ -79,7 +85,6 @@ def test_pandas_udf_tokenize(self):
         self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])], result.collect())

     def test_pandas_udf_nested_arrays(self):
-        from pyspark.sql.functions import pandas_udf
         tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]),
                               ArrayType(ArrayType(StringType())))
         self.assertEqual(tokenize.returnType, ArrayType(ArrayType(StringType())))
@@ -88,7 +93,6 @@ def test_pandas_udf_nested_arrays(self):
         self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye', u'boo']])], result.collect())

     def test_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf, col, array
         df = self.spark.range(10).select(
             col('id').cast('string').alias('str'),
             col('id').cast('int').alias('int'),
@@ -114,9 +118,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_register_nondeterministic_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf
-        from pyspark.rdd import PythonEvalType
-        import random
         random_pandas_udf = pandas_udf(
             lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
         self.assertEqual(random_pandas_udf.deterministic, False)
@@ -129,7 +130,6 @@
         self.assertEqual(row[0], 7)

     def test_vectorized_udf_null_boolean(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(True,), (True,), (None,), (False,)]
         schema = StructType().add("bool", BooleanType())
         df = self.spark.createDataFrame(data, schema)
@@ -138,7 +138,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_byte(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("byte", ByteType())
         df = self.spark.createDataFrame(data, schema)
@@ -147,7 +146,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_short(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("short", ShortType())
         df = self.spark.createDataFrame(data, schema)
@@ -156,7 +154,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_int(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("int", IntegerType())
         df = self.spark.createDataFrame(data, schema)
@@ -165,7 +162,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_long(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("long", LongType())
         df = self.spark.createDataFrame(data, schema)
@@ -174,7 +170,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_float(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("float", FloatType())
         df = self.spark.createDataFrame(data, schema)
@@ -183,7 +178,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_double(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("double", DoubleType())
         df = self.spark.createDataFrame(data, schema)
@@ -192,8 +186,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_decimal(self):
-        from decimal import Decimal
-        from pyspark.sql.functions import pandas_udf, col
         data = [(Decimal(3.0),), (Decimal(5.0),), (Decimal(-1.0),), (None,)]
         schema = StructType().add("decimal", DecimalType(38, 18))
         df = self.spark.createDataFrame(data, schema)
@@ -202,7 +194,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_string(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [("foo",), (None,), ("bar",), ("bar",)]
         schema = StructType().add("str", StringType())
         df = self.spark.createDataFrame(data, schema)
@@ -211,7 +202,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_string_in_udf(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
         str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType())
@@ -220,7 +210,6 @@
         self.assertEquals(expected.collect(), actual.collect())

     def test_vectorized_udf_datatype_string(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10).select(
             col('id').cast('string').alias('str'),
             col('id').cast('int').alias('int'),
@@ -244,9 +233,8 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_binary(self):
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, col
+
         if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
             with QuietTest(self.sc):
                 with self.assertRaisesRegexp(
@@ -262,7 +250,6 @@
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_array_type(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [([1, 2],), ([3, 4],)]
         array_schema = StructType([StructField("array", ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
@@ -271,7 +258,6 @@
         self.assertEquals(df.collect(), result.collect())

     def test_vectorized_udf_null_array(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)]
         array_schema = StructType([StructField("array", ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
@@ -280,7 +266,6 @@
         self.assertEquals(df.collect(), result.collect())

     def test_vectorized_udf_complex(self):
-        from pyspark.sql.functions import pandas_udf, col, expr
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'),
@@ -293,7 +278,6 @@
         self.assertEquals(expected.collect(), res.collect())

     def test_vectorized_udf_exception(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
         with QuietTest(self.sc):
@@ -301,8 +285,8 @@
             df.select(raise_exception(col('id'))).collect()

     def test_vectorized_udf_invalid_length(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         df = self.spark.range(10)
         raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
         with QuietTest(self.sc):
@@ -312,7 +296,6 @@
             df.select(raise_exception(col('id'))).collect()

     def test_vectorized_udf_chained(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         f = pandas_udf(lambda x: x + 1, LongType())
         g = pandas_udf(lambda x: x - 1, LongType())
@@ -320,7 +303,6 @@ def test_vectorized_udf_chained(self):
         res = df.select(g(f(col('id'))))
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_wrong_return_type(self):
-        from pyspark.sql.functions import pandas_udf
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                 NotImplementedError,
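Aside on the chained case just above: scalar pandas UDFs compose like ordinary column expressions. A sketch, assuming a live `spark` session:

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

f = pandas_udf(lambda x: x + 1, LongType())
g = pandas_udf(lambda x: x - 1, LongType())

df = spark.range(10)
# g(f(id)) runs each Arrow batch through two vectorized UDFs in sequence.
df.select(g(f(col('id')))).show()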
@@ -328,7 +310,6 @@ def test_vectorized_udf_wrong_return_type(self):
                 pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))

     def test_vectorized_udf_return_scalar(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         f = pandas_udf(lambda x: 1.0, DoubleType())
         with QuietTest(self.sc):
@@ -336,7 +317,6 @@
             df.select(f(col('id'))).collect()

     def test_vectorized_udf_decorator(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)

         @pandas_udf(returnType=LongType())
@@ -346,21 +326,18 @@ def identity(x):
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_empty_partition(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
         f = pandas_udf(lambda x: x, LongType())
         res = df.select(f(col('id')))
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_varargs(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
         f = pandas_udf(lambda *v: v[0], LongType())
         res = df.select(f(col('id')))
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_unsupported_types(self):
-        from pyspark.sql.functions import pandas_udf
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                 NotImplementedError,
@@ -368,8 +345,6 @@ def test_vectorized_udf_unsupported_types(self):
                 pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))

     def test_vectorized_udf_dates(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import date
         schema = StructType().add("idx", LongType()).add("date", DateType())
         data = [(0, date(1969, 1, 1),),
                 (1, date(2012, 2, 2),),
@@ -405,8 +380,6 @@ def check_data(idx, date, date_copy):
         self.assertIsNone(result[i][3])  # "check_data" col

     def test_vectorized_udf_timestamps(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import datetime
         schema = StructType([
             StructField("idx", LongType(), True),
             StructField("timestamp", TimestampType(), True)])
@@ -447,8 +420,8 @@ def check_data(idx, timestamp, timestamp_copy):
         self.assertIsNone(result[i][3])  # "check_data" col

     def test_vectorized_udf_return_timestamp_tz(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         df = self.spark.range(10)

         @pandas_udf(returnType=TimestampType())
@@ -465,8 +438,8 @@ def gen_timestamps(id):
         self.assertEquals(expected, ts)

     def test_vectorized_udf_check_config(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}):
             df = self.spark.range(10, numPartitions=1)
@@ -479,9 +452,8 @@ def check_records_per_batch(x):
                 self.assertTrue(r <= 3)

     def test_vectorized_udf_timestamps_respect_session_timezone(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import datetime
         import pandas as pd
+
         schema = StructType([
             StructField("idx", LongType(), True),
             StructField("timestamp", TimestampType(), True)])
@@ -519,8 +491,6 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self):

     def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
-        from pyspark.sql.functions import pandas_udf, col
-
         @pandas_udf('double')
         def plus_ten(v):
             return v + 10
@@ -533,8 +503,6 @@ def plus_ten(v):
         self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))

     def test_nondeterministic_vectorized_udf_in_aggregate(self):
-        from pyspark.sql.functions import sum
-
         df = self.spark.range(10)
         random_udf = self.nondeterministic_vectorized_udf
@@ -545,8 +513,6 @@ def test_nondeterministic_vectorized_udf_in_aggregate(self):
             df.agg(sum(random_udf(df.id))).collect()

     def test_register_vectorized_udf_basic(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, col, expr
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'))
@@ -563,11 +529,10 @@ def test_register_vectorized_udf_basic(self):

     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
-        from pyspark.sql.functions import pandas_udf
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
-        dt = [datetime.datetime(2015, 11, 1, 0, 30),
-              datetime.datetime(2015, 11, 1, 1, 30),
-              datetime.datetime(2015, 11, 1, 2, 30)]
+        dt = [datetime(2015, 11, 1, 0, 30),
+              datetime(2015, 11, 1, 1, 30),
+              datetime(2015, 11, 1, 2, 30)]
         df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
         foo_udf = pandas_udf(lambda x: x, 'timestamp')
         result = df.withColumn('time', foo_udf(df.time))
@@ -593,7 +558,6 @@ def test_type_annotation(self):

     def test_mixed_udf(self):
         import pandas as pd
-        from pyspark.sql.functions import col, udf, pandas_udf

         df = self.spark.range(0, 1).toDF('v')
@@ -696,8 +660,6 @@ def f4(x):

     def test_mixed_udf_and_sql(self):
         import pandas as pd
-        from pyspark.sql import Column
-        from pyspark.sql.functions import udf, pandas_udf

         df = self.spark.range(0, 1).toDF('v')
@@ -758,7 +720,6 @@ def test_datasource_with_udf(self):
         # This needs to a separate test because Arrow dependency is optional
         import pandas as pd
         import numpy as np
-        from pyspark.sql.functions import pandas_udf, lit, col

         path = tempfile.mkdtemp()
         shutil.rmtree(path)
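The mixed-UDF tests at the end of this file interleave row-at-a-time and vectorized UDFs in a single projection. Roughly, the shape is the following (a sketch with illustrative names, assuming a live `spark` session):

import pandas as pd
from pyspark.sql.functions import col, pandas_udf, udf

f1 = udf(lambda x: x + 1, 'int')        # plain Python UDF, one value at a time

@pandas_udf('int')
def f2(x):
    assert type(x) == pd.Series         # vectorized: whole batch as a Series
    return x + 10

df = spark.range(0, 3).toDF('v')
df.select(f2(f1(col('v'))).alias('result')).show()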
diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py b/python/pyspark/sql/tests/test_pandas_udf_window.py
index f0e6d2696df6..0a7a19c1c081 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -18,6 +18,8 @@
 import unittest

 from pyspark.sql.utils import AnalysisException
+from pyspark.sql.functions import array, explode, col, lit, mean, min, max, rank, \
+    udf, pandas_udf, PandasUDFType
 from pyspark.sql.window import Window
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
@@ -30,7 +32,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):

     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))) \
@@ -39,18 +40,14 @@ def data(self):

     @property
     def python_plus_one(self):
-        from pyspark.sql.functions import udf
         return udf(lambda v: v + 1, 'double')

     @property
     def pandas_scalar_time_two(self):
-        from pyspark.sql.functions import pandas_udf
         return pandas_udf(lambda v: v * 2, 'double')

     @property
     def pandas_agg_mean_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def avg(v):
             return v.mean()
@@ -58,8 +55,6 @@ def avg(v):

     @property
     def pandas_agg_max_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def max(v):
             return v.max()
@@ -67,8 +62,6 @@ def max(v):

     @property
     def pandas_agg_min_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def min(v):
             return v.min()
@@ -88,8 +81,6 @@ def unpartitioned_window(self):
         return Window.partitionBy()

     def test_simple(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
@@ -105,8 +96,6 @@ def test_simple(self):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())

     def test_multiple_udfs(self):
-        from pyspark.sql.functions import max, min, mean
-
         df = self.data
         w = self.unbounded_window
@@ -121,8 +110,6 @@ def test_multiple_udfs(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

     def test_replace_existing(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
@@ -132,8 +119,6 @@ def test_replace_existing(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

     def test_mixed_sql(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
         mean_udf = self.pandas_agg_mean_udf
@@ -144,8 +129,6 @@ def test_mixed_sql(self):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

     def test_mixed_udf(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
@@ -171,8 +154,6 @@ def test_mixed_udf(self):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())

     def test_without_partitionBy(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unpartitioned_window
         mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +168,6 @@ def test_without_partitionBy(self):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())

     def test_mixed_sql_and_udf(self):
-        from pyspark.sql.functions import max, min, rank, col
-
         df = self.data
         w = self.unbounded_window
         ow = self.ordered_window
@@ -221,8 +200,6 @@ def test_mixed_sql_and_udf(self):
         self.assertPandasEqual(expected4.toPandas(), result4.toPandas())

     def test_array_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
         w = self.unbounded_window
@@ -231,8 +208,6 @@ def test_array_type(self):
         self.assertEquals(result1.first()['v2'], [1.0, 2.0])

     def test_invalid_args(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
         w = self.unbounded_window
         ow = self.ordered_window
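Finally, the window file applies GROUPED_AGG pandas UDFs through over(). The core usage its fixtures support looks like this (a sketch assuming a live `spark` session; column and variable names are illustrative):

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ('id', 'v'))
w = Window.partitionBy('id')   # unbounded window, as in unpartitioned/unbounded fixtures
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()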