96 changes: 96 additions & 0 deletions .github/workflows/build_python_connect.yml
@@ -0,0 +1,96 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: Build / Spark Connect Python-only (master, Python 3.11)

on:
  schedule:
    - cron: '0 19 * * *'

jobs:
  # Build: build Spark and run the tests for specified modules using SBT
  build:
    name: "Build modules: pyspark-connect"
    runs-on: ubuntu-latest
    timeout-minutes: 300
    steps:
      - name: Checkout Spark repository
        uses: actions/checkout@v4
      - name: Cache Scala, SBT and Maven
        uses: actions/cache@v4
        with:
          path: |
            build/apache-maven-*
            build/scala-*
            build/*.jar
            ~/.sbt
          key: build-spark-connect-python-only-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
          restore-keys: |
            build-spark-connect-python-only-
      - name: Cache Coursier local repository
        uses: actions/cache@v4
        with:
          path: ~/.cache/coursier
          key: coursier-build-spark-connect-python-only-${{ hashFiles('**/pom.xml') }}
          restore-keys: |
            coursier-build-spark-connect-python-only-
      - name: Install Java 17
        uses: actions/setup-java@v4
        with:
          distribution: zulu
          java-version: 17
      - name: Install Python 3.11
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          architecture: x64
      - name: Build Spark
        run: |
          ./build/sbt -Phive test:package
      - name: Install pure Python package (pyspark-connect)
        env:
          SPARK_TESTING: 1
        run: |
          cd python
          python packaging/connect/setup.py sdist
          cd dist
          pip install pyspark-connect-*.tar.gz
      - name: Run tests
        env:
          SPARK_CONNECT_TESTING_REMOTE: sc://localhost
          SPARK_TESTING: 1
        run: |
          # Start a Spark Connect server
          ./sbin/start-connect-server.sh --jars `find connector/connect/server/target -name spark-connect*SNAPSHOT.jar`
          # Remove Py4J and PySpark zipped library to make sure there is no JVM connection
          rm python/lib/*
          rm -r python/pyspark
          ./python/run-tests --parallelism=1 --python-executables=python3 --modules pyspark-connect
      - name: Upload test results to report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-results-spark-connect-python-only
          path: "**/target/test-reports/*.xml"
      - name: Upload unit tests log files
        if: failure()
        uses: actions/upload-artifact@v4
        with:
          name: unit-tests-log-spark-connect-python-only
          path: "**/target/unit-tests.log"
16 changes: 15 additions & 1 deletion python/packaging/connect/setup.py
@@ -65,6 +65,20 @@
os.path.isfile("../RELEASE") and len(glob.glob("../jars/spark*core*.jar")) == 1
)

test_packages = []
if "SPARK_TESTING" in os.environ:
test_packages = [
"pyspark.tests", # for Memory profiler parity tests
"pyspark.testing",
"pyspark.sql.tests",
"pyspark.sql.tests.connect",
"pyspark.sql.tests.connect.streaming",
"pyspark.sql.tests.connect.client",
"pyspark.sql.tests.connect.shell",
"pyspark.sql.tests.pandas",
"pyspark.sql.tests.streaming",
]

try:
if in_spark:
copyfile("packaging/connect/setup.py", "setup.py")
@@ -136,7 +150,7 @@
author="Spark Developers",
author_email="[email protected]",
url="https://github.com/apache/spark/tree/master/python",
packages=connect_packages,
packages=connect_packages + test_packages,
license="http://www.apache.org/licenses/LICENSE-2.0",
# Don't forget to update python/docs/source/getting_started/install.rst
# if you're updating the versions or dependencies.
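The SPARK_TESTING guard above means the test packages are bundled only when the sdist is built in a testing context, as the nightly workflow does; a plain release build stays test-free. A small sketch of that invocation, assuming the Spark repository root as the working directory:

# Sketch: build the pyspark-connect sdist with the test packages included.
# SPARK_TESTING is the same toggle the workflow exports; omit it to get a
# distribution without the pyspark.*tests packages.
import os
import subprocess

env = dict(os.environ, SPARK_TESTING="1")
subprocess.run(
    ["python", "packaging/connect/setup.py", "sdist"],
    cwd="python",  # run relative to the Spark repository root
    env=env,
    check=True,
)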
3 changes: 2 additions & 1 deletion python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -23,7 +23,6 @@
import pandas as pd
from pandas.api.types import CategoricalDtype

from pyspark import SparkContext
from pyspark.sql import Column, functions as F
from pyspark.sql.types import (
    BooleanType,
@@ -151,6 +150,8 @@ class DatetimeNTZOps(DatetimeOps):
"""

def _cast_spark_column_timestamp_to_long(self, scol: Column) -> Column:
from pyspark import SparkContext

jvm = SparkContext._active_spark_context._jvm
return Column(jvm.PythonSQLUtils.castTimestampNTZToLong(scol._jc))

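Moving the SparkContext import into the method keeps the JVM-backed dependency out of module import time, so pandas-on-Spark's datetime ops can be imported with only pyspark-connect installed, and the JVM is touched only if this NTZ cast is actually executed. A generic sketch of the deferred-import pattern, with illustrative names:

# Sketch of the lazy-import pattern used above; the function name is made up.
def _jvm_only_cast(value):
    from pyspark import SparkContext  # resolved only when the function runs

    sc = SparkContext._active_spark_context
    if sc is None:
        raise RuntimeError("A JVM-backed SparkContext is required for this operation.")
    return sc._jvm.java.lang.String.valueOf(value)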
8 changes: 7 additions & 1 deletion python/pyspark/sql/connect/avro/functions.py
@@ -80,12 +80,18 @@ def _test() -> None:
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.avro.functions
    from pyspark.util import is_remote_only

    globs = pyspark.sql.connect.avro.functions.__dict__.copy()

    # TODO(SPARK-47760): Reenable Avro function doctests
    if is_remote_only():
        del pyspark.sql.connect.avro.functions.from_avro
        del pyspark.sql.connect.avro.functions.to_avro

    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.avro.functions tests")
        .remote("local[4]")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

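The is_remote_only() guard works because doctest.testmod only collects examples from objects still reachable on the module, so deleting from_avro and to_avro up front skips their docstring examples in a JVM-free run (SPARK-47760 tracks re-enabling them). A standalone sketch of the mechanism, using a made-up function:

# Sketch: gate doctests by deleting the attribute before collection.
import doctest


def f():
    """
    >>> f()
    1
    """
    return 1


remote_only = True  # stand-in for pyspark.util.is_remote_only()
if remote_only:
    del f  # its docstring example is no longer collected

print(doctest.testmod().attempted)  # prints 0: the example above was skipped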
5 changes: 4 additions & 1 deletion python/pyspark/sql/connect/catalog.py
@@ -316,14 +316,17 @@ def registerFunction(


def _test() -> None:
    import os
    import sys
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.catalog

    globs = pyspark.sql.connect.catalog.__dict__.copy()
    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.catalog tests").remote("local[4]").getOrCreate()
        PySparkSession.builder.appName("sql.connect.catalog tests")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

    (failure_count, test_count) = doctest.testmod(
5 changes: 4 additions & 1 deletion python/pyspark/sql/connect/column.py
@@ -503,14 +503,17 @@ def __nonzero__(self) -> None:


def _test() -> None:
    import os
    import sys
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.column

    globs = pyspark.sql.connect.column.__dict__.copy()
    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.column tests").remote("local[4]").getOrCreate()
        PySparkSession.builder.appName("sql.connect.column tests")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

    (failure_count, test_count) = doctest.testmod(
5 changes: 4 additions & 1 deletion python/pyspark/sql/connect/conf.py
@@ -122,14 +122,17 @@ def _checkType(self, obj: Any, identifier: str) -> None:


def _test() -> None:
    import os
    import sys
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.conf

    globs = pyspark.sql.connect.conf.__dict__.copy()
    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.conf tests").remote("local[4]").getOrCreate()
        PySparkSession.builder.appName("sql.connect.conf tests")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

    (failure_count, test_count) = doctest.testmod(
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/dataframe.py
@@ -2279,7 +2279,7 @@ def _test() -> None:

globs["spark"] = (
PySparkSession.builder.appName("sql.connect.dataframe tests")
.remote("local[4]")
.remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
.getOrCreate()
)

3 changes: 2 additions & 1 deletion python/pyspark/sql/connect/functions/builtin.py
@@ -4065,6 +4065,7 @@ def call_function(funcName: str, *cols: "ColumnOrName") -> Column:

def _test() -> None:
    import sys
    import os
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.functions.builtin
@@ -4073,7 +4074,7 @@ def _test() -> None:

globs["spark"] = (
PySparkSession.builder.appName("sql.connect.functions tests")
.remote("local[4]")
.remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
.getOrCreate()
)

3 changes: 2 additions & 1 deletion python/pyspark/sql/connect/functions/partitioning.py
@@ -81,6 +81,7 @@ def hours(col: "ColumnOrName") -> Column:

def _test() -> None:
    import sys
    import os
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.functions.partitioning
@@ -89,7 +90,7 @@ def _test() -> None:

globs["spark"] = (
PySparkSession.builder.appName("sql.connect.functions tests")
.remote("local[4]")
.remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
.getOrCreate()
)

5 changes: 4 additions & 1 deletion python/pyspark/sql/connect/group.py
@@ -448,6 +448,7 @@ def applyInArrow(


def _test() -> None:
    import os
    import sys
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
@@ -456,7 +457,9 @@ def _test() -> None:
    globs = pyspark.sql.connect.group.__dict__.copy()

    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.group tests").remote("local[4]").getOrCreate()
        PySparkSession.builder.appName("sql.connect.group tests")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

    (failure_count, test_count) = doctest.testmod(
3 changes: 2 additions & 1 deletion python/pyspark/sql/connect/observation.py
@@ -82,6 +82,7 @@ def get(self) -> Dict[str, Any]:


def _test() -> None:
    import os
    import sys
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
@@ -90,7 +91,7 @@ def _test() -> None:
    globs = pyspark.sql.connect.observation.__dict__.copy()
    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.observation tests")
        .remote("local[4]")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

8 changes: 7 additions & 1 deletion python/pyspark/sql/connect/protobuf/functions.py
@@ -120,6 +120,7 @@ def _read_descriptor_set_file(filePath: str) -> bytes:
def _test() -> None:
    import os
    import sys
    from pyspark.util import is_remote_only
    from pyspark.testing.utils import search_jar

    protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf")
@@ -142,9 +143,14 @@ def _test() -> None:

    globs = pyspark.sql.connect.protobuf.functions.__dict__.copy()

    # TODO(SPARK-47763): Reenable Protobuf function doctests
    if is_remote_only():
        del pyspark.sql.connect.protobuf.functions.from_protobuf
        del pyspark.sql.connect.protobuf.functions.to_protobuf

    globs["spark"] = (
        PySparkSession.builder.appName("sql.protobuf.functions tests")
        .remote("local[2]")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[2]"))
        .getOrCreate()
    )

3 changes: 2 additions & 1 deletion python/pyspark/sql/connect/readwriter.py
@@ -934,6 +934,7 @@ def overwritePartitions(self) -> None:

def _test() -> None:
    import sys
    import os
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.readwriter
@@ -942,7 +943,7 @@ def _test() -> None:

globs["spark"] = (
PySparkSession.builder.appName("sql.connect.readwriter tests")
.remote("local[4]")
.remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
.getOrCreate()
)

5 changes: 4 additions & 1 deletion python/pyspark/sql/connect/session.py
@@ -1023,14 +1023,17 @@ def profile(self) -> Profile:


def _test() -> None:
    import os
    import sys
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
    import pyspark.sql.connect.session

    globs = pyspark.sql.connect.session.__dict__.copy()
    globs["spark"] = (
        PySparkSession.builder.appName("sql.connect.session tests").remote("local[4]").getOrCreate()
        PySparkSession.builder.appName("sql.connect.session tests")
        .remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
        .getOrCreate()
    )

    # Uses PySpark session to test builder.
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/streaming/query.py
@@ -285,7 +285,7 @@ def _test() -> None:

globs["spark"] = (
PySparkSession.builder.appName("sql.connect.streaming.query tests")
.remote("local[4]")
.remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
.getOrCreate()
)

3 changes: 2 additions & 1 deletion python/pyspark/sql/connect/streaming/readwriter.py
@@ -650,6 +650,7 @@ def toTable(


def _test() -> None:
    import os
    import sys
    import doctest
    from pyspark.sql import SparkSession as PySparkSession
@@ -659,7 +660,7 @@ def _test() -> None:

globs["spark"] = (
PySparkSession.builder.appName("sql.connect.streaming.readwriter tests")
.remote("local[4]")
.remote(os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]"))
.getOrCreate()
)
