Changes from 1 commit
85 commits
45c7bc5
Adding the proto and the dependencies
grundprinzip Jun 16, 2022
c2c6de0
Trying to fix aarch64
grundprinzip Jul 5, 2022
034a44d
Moving connect to its own module
grundprinzip Jul 7, 2022
0cedf00
[SparkConnect] Spark Connect Planner
grundprinzip Jul 4, 2022
5d4b8dd
adding the planner
grundprinzip Jul 25, 2022
506fc44
adding the python code
grundprinzip Jul 26, 2022
0f6a687
fixing tests
grundprinzip Jul 26, 2022
9adf4bd
Adding some very basic unit tests
grundprinzip Jul 29, 2022
ed2fe24
adding explain to the service
grundprinzip Jul 29, 2022
786efbb
adding apache header and small col function
grundprinzip Jul 31, 2022
8693efb
more apache headers
grundprinzip Jul 31, 2022
d336b2a
Adding some more tests
grundprinzip Jul 31, 2022
dbe3d1f
adding some more doc
grundprinzip Jul 31, 2022
3ea55ca
some basic tests for literals
grundprinzip Jul 31, 2022
127a492
Fixing the pom files
grundprinzip Aug 29, 2022
4c63000
Moving to grpc-java
grundprinzip Aug 29, 2022
b713969
Fixing some python tests
grundprinzip Aug 29, 2022
0547e07
Fixing more python tests
grundprinzip Aug 29, 2022
074e1f7
making connect build by default
grundprinzip Aug 29, 2022
e77a018
fixing sbt build
grundprinzip Aug 29, 2022
21224ec
More sbt stuff
grundprinzip Aug 30, 2022
e22975f
More sbt stuff
grundprinzip Aug 30, 2022
9601449
More sbt stuff
grundprinzip Aug 31, 2022
b3ab663
properly shaded spark connect build
grundprinzip Sep 2, 2022
8968f1e
SBT build and testing works
grundprinzip Sep 4, 2022
87a50d4
Python linting
grundprinzip Sep 4, 2022
fefc84a
Restricting enaling Spark Connect to sepcific tests only and fixing t…
grundprinzip Sep 4, 2022
193e6b0
scala 2.13 fix
grundprinzip Sep 4, 2022
e1f862e
Fixin python lint issues
grundprinzip Sep 4, 2022
d5b6002
Fixin python lint issues
grundprinzip Sep 4, 2022
8bb03d2
Marking all classes in Spark Connect as experimental
grundprinzip Sep 4, 2022
265aca3
Fixing style
grundprinzip Sep 4, 2022
d6f64e8
test infra
grundprinzip Sep 5, 2022
3d00bb0
Missing file for disabling mypy checks
grundprinzip Sep 5, 2022
3525eab
Trying to add a python package the right way
grundprinzip Sep 5, 2022
7e1dd58
Removing grpc/protobuf from pypy
grundprinzip Sep 5, 2022
fa9e85e
Disabling pypy for spark connect
grundprinzip Sep 5, 2022
cf6b19a
Adding licence files
grundprinzip Sep 5, 2022
16bf911
Fixing a bug in UDF handling for Python version selection
grundprinzip Sep 5, 2022
762104a
Trailing newlines
grundprinzip Sep 5, 2022
ca4cfbb
Adding dependency manaifest
grundprinzip Sep 5, 2022
79bbfd6
Moving to classs
grundprinzip Sep 6, 2022
ec1221b
Merge branch 'master' into spark-connect-grpc-shaded
grundprinzip Sep 6, 2022
63d8ebc
dependencies
grundprinzip Sep 6, 2022
70ad818
M2 cache bomb
grundprinzip Sep 6, 2022
ae75ca4
More m2 removal
grundprinzip Sep 6, 2022
7e615f7
Revert "More m2 removal"
HyukjinKwon Sep 7, 2022
8effefb
Revert "M2 cache bomb"
HyukjinKwon Sep 7, 2022
aa68119
Add Python 3.7
HyukjinKwon Sep 7, 2022
fcefe9e
Revert "Add Python 3.7"
HyukjinKwon Sep 7, 2022
ff8e4ad
Merge remote-tracking branch 'upstream/master' into HEAD
HyukjinKwon Sep 7, 2022
b2f6548
Python 3.7
HyukjinKwon Sep 7, 2022
feb0233
Revert "Python 3.7"
HyukjinKwon Sep 7, 2022
4b02eb3
Tweaking memory consumption of SBT
grundprinzip Sep 7, 2022
aec7e3d
Disabling paralell execution for SBT in doc build
grundprinzip Sep 7, 2022
e4485c3
Avoiding copying the shaded jar in doc build
grundprinzip Sep 8, 2022
4ef5a35
Sbt doc build still
grundprinzip Sep 8, 2022
e13b955
test package build
grundprinzip Sep 8, 2022
6c96205
only test package build
grundprinzip Sep 9, 2022
d1e3c13
using exact pyspark build options
grundprinzip Sep 9, 2022
9d43765
Moving things around
grundprinzip Sep 9, 2022
448fcc8
Desperate attempts
grundprinzip Sep 9, 2022
b3d2d8c
Java 11 anyone?
grundprinzip Sep 9, 2022
b2c6bfd
Revert "Java 11 anyone?"
grundprinzip Sep 13, 2022
11ad9b6
disable tests on assembly run
grundprinzip Sep 13, 2022
7617b82
Adding additional tests
grundprinzip Sep 14, 2022
ce4900b
format + slight python api change
grundprinzip Sep 14, 2022
28e7741
update on readme and import
grundprinzip Sep 14, 2022
8223615
Properly catching exceptions and removing stray debug user
grundprinzip Sep 18, 2022
8bf14f2
scalastyle
grundprinzip Sep 18, 2022
87ba6f2
doc fix
grundprinzip Sep 18, 2022
45f23e0
addrsessing review comments
grundprinzip Sep 20, 2022
4971980
removing embedded protos for 3p google
grundprinzip Sep 20, 2022
4aafab8
renaming
grundprinzip Sep 20, 2022
38b69ce
Adding @Since annotation
grundprinzip Sep 20, 2022
07b0ec8
Fixing python test with the right package
grundprinzip Sep 20, 2022
f47b8e9
fixing build error due to package refactoring
grundprinzip Sep 20, 2022
b57cbd2
Sql -> SQL
grundprinzip Sep 21, 2022
77470cd
Scala review comments
grundprinzip Sep 22, 2022
ee13ae2
Build file review
grundprinzip Sep 22, 2022
51be506
Python review comments
grundprinzip Sep 22, 2022
549a10e
Python review comments
grundprinzip Sep 22, 2022
b0608f3
black fmt
grundprinzip Sep 22, 2022
279faf4
Addressing review comments
grundprinzip Sep 23, 2022
e5e2347
Addressing review comments
grundprinzip Sep 23, 2022
Fixin python lint issues
grundprinzip committed Sep 4, 2022
commit e1f862eeba1468a36e3b88585e6c4507f55d0c8a
1 change: 1 addition & 0 deletions dev/lint-python
@@ -129,6 +129,7 @@ function mypy_examples_test {
MYPY_REPORT=$( (MYPYPATH=python $MYPY_BUILD \
--config-file python/mypy.ini \
--exclude "mllib/*" \
--exclude "sql/connect/proto/*" \
examples/src/main/python/) 2>&1)

MYPY_STATUS=$?
2 changes: 2 additions & 0 deletions dev/tox.ini
@@ -51,4 +51,6 @@ exclude =
python/pyspark/worker.pyi,
python/pyspark/java_gateway.pyi,
dev/ansible-for-test-node/*,
python/pyspark/sql/connect/proto/*,
python/venv/*,
max-line-length = 100
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/__init__.py
@@ -15,4 +15,4 @@
# limitations under the License.
#

from pyspark.sql.connect.data_frame import DataFrame
from pyspark.sql.connect.data_frame import DataFrame # noqa: F401
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/client.py
@@ -124,7 +124,7 @@ def register_udf(self, function, return_type) -> str:
req.user_context.user_id = self._user_id
req.plan.command.create_function.CopyFrom(fun)

result = self._execute_and_fetch(req)
self._execute_and_fetch(req)
return name

def _build_metrics(self, metrics: "pb2.Response.Metrics") -> typing.List[PlanMetrics]:
5 changes: 3 additions & 2 deletions python/pyspark/sql/connect/column.py
@@ -55,7 +55,8 @@ def __init__(self, value: PrimitiveType) -> None: # type: ignore[name-defined]
def to_plan(self, session: "RemoteSparkSession") -> proto.Expression:
"""Converts the literal expression to the literal in proto.

TODO This method always assumes the largest type and can thus create weird interpretations of the literal."""
TODO This method always assumes the largest type and can thus
create weird interpretations of the literal."""
value_type = type(self._value)
exp = proto.Expression()
if value_type is int:
@@ -97,7 +98,7 @@ def from_qualified_name(cls, name) -> "ColumnRef":

def __init__(self, *parts: str) -> None: # type: ignore[name-defined]
super().__init__()
self._parts: List[str] = list(filter(lambda x: not x is None, list(parts)))
self._parts: List[str] = list(filter(lambda x: x is not None, list(parts)))

def name(self) -> str:
"""Returns the qualified name of the column reference."""
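A quick standalone sketch (not part of the commit): the E714 fix in column.py swaps "not x is None" for the idiomatic "x is not None"; both filters behave identically. The values below are illustrative only.

parts = ("catalog", None, "schema", None, "table")
old_style = list(filter(lambda x: not x is None, parts))  # noqa: E714
new_style = list(filter(lambda x: x is not None, parts))
assert old_style == new_style == ["catalog", "schema", "table"]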
4 changes: 2 additions & 2 deletions python/pyspark/sql/connect/data_frame.py
@@ -133,7 +133,7 @@ def columns(self) -> List[str]:
"""Returns the list of columns of the current data frame."""
if self._plan is None:
return []
if not "columns" in self._cache and self._plan is not None:
if "columns" not in self._cache and self._plan is not None:
pdd = self.limit(0).collect()
# Translate to standard pytho array
self._cache["columns"] = pdd.columns.values
@@ -210,7 +210,7 @@ def where(self, condition):

def _get_alias(self):
p = self._plan
while not p is None:
while p is not None:
if isinstance(p, plan.Project) and p.alias:
return p.alias
p = p._child
1 change: 0 additions & 1 deletion python/pyspark/sql/connect/functions.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import TYPE_CHECKING
from pyspark.sql.connect.column import ColumnRef, LiteralExpression
from pyspark.sql.connect.column import PrimitiveType

21 changes: 13 additions & 8 deletions python/pyspark/sql/connect/plan.py
@@ -15,11 +15,7 @@
# limitations under the License.
#

import base64
from calendar import c
from typing import (
AnyStr,
Dict,
List,
Optional,
Sequence,
@@ -165,7 +161,7 @@ def plan(self, session: "RemoteSparkSession") -> proto.Relation:
for c in self._raw_columns
] # [self.unresolved_attr(*x) for x in self.columns]
common = proto.RelationCommon()
if not self.alias is None:
if self.alias is not None:
common.alias = self.alias

plan = proto.Relation()
@@ -348,7 +344,10 @@ def plan(self, session: "RemoteSparkSession") -> proto.Relation:

def print(self, indent=0) -> str:
c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
return f"{self._i(indent)}<Sort columns={self.grouping_cols} measures={self.measures}>\n{c_buf}"
return (
f"{self._i(indent)}<Sort columns={self.grouping_cols}"
f"measures={self.measures}>\n{c_buf}"
)

def _repr_html_(self):
return f"""
@@ -388,7 +387,10 @@ def print(self, indent=0) -> str:
i = self._i(indent)
o = self._i(indent + LogicalPlan.INDENT)
n = indent + LogicalPlan.INDENT * 2
return f"""{i}<Join on={self.on} how={self.how}>\n{o}left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}"""
return (
f"{i}<Join on={self.on} how={self.how}>\n{o}"
f"left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}"
)

def _repr_html_(self):
return f"""
@@ -420,7 +422,10 @@ def print(self, indent=0) -> str:
i = self._i(indent)
o = self._i(indent + LogicalPlan.INDENT)
n = indent + LogicalPlan.INDENT * 2
return f"""{i}UnionAll\n{o}child1=\n{self._child.print(n)}\n{o}child2=\n{self.other.print(n)}"""
return (
f"{i}UnionAll\n{o}child1=\n{self._child.print(n)}"
f"\n{o}child2=\n{self.other.print(n)}"
)

def _repr_html_(self) -> str:
assert self._child is not None
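The plan.py reformatting above relies on implicit concatenation of adjacent f-string literals to stay under the 100-character limit; a minimal standalone sketch with illustrative values:

on, how = "id", "inner"
single_line = f"<Join on={on} how={how}>"
split_lines = (
    f"<Join on={on} "
    f"how={how}>"
)
# Adjacent literals are joined at compile time, so the output is unchanged.
assert single_line == split_lines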
9 changes: 7 additions & 2 deletions python/pyspark/sql/tests/connect/test_plan_only.py
@@ -28,7 +28,9 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
generation but do not call Spark."""

def test_simple_project(self):
read_table = lambda x: DataFrame.withPlan(Read(x), self.connect)
def read_table(x):
return DataFrame.withPlan(Read(x), self.connect)

self.connect.set_hook("readTable", read_table)

plan = self.connect.readTable(self.tbl_name)._plan.collect(self.connect)
@@ -46,9 +48,12 @@ def udf_mock(*args, **kwargs):
expr = u("ThisCol", "ThatCol", "OtherCol")
self.assertTrue(isinstance(expr, UserDefinedFunction))
u_plan = expr.to_plan(self.connect)
assert u_plan is not None

def test_all_the_plans(self):
read_table = lambda x: DataFrame.withPlan(Read(x), self.connect)
def read_table(x):
return DataFrame.withPlan(Read(x), self.connect)

self.connect.set_hook("readTable", read_table)

df = self.connect.readTable(self.tbl_name)
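The test changes above replace lambdas bound to names with local def functions (flake8 E731) without changing behavior; a self-contained sketch with placeholder logic:

def make_reader(wrap):
    # lambda form flagged by E731:
    #   read_table = lambda x: wrap(x)
    # equivalent def form:
    def read_table(x):
        return wrap(x)
    return read_table

reader = make_reader(str.upper)
assert reader("people") == "PEOPLE"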
20 changes: 9 additions & 11 deletions python/pyspark/sql/tests/connect/test_spark_connect.py
@@ -14,26 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
import uuid
import unittest
import tempfile
import os
import shutil

from pyspark.sql import SparkSession, Row
from pyspark.sql.connect.client import RemoteSparkSession
from pyspark.sql.connect.function_builder import udf, UserDefinedFunction
from pyspark.sql.connect.function_builder import udf
from pyspark.testing.utils import ReusedPySparkTestCase

import py4j


class SparkConnectSQLTestCase(ReusedPySparkTestCase):
"""Parent test fixture class for all Spark Connect related
test cases."""

@classmethod
def setUpClass(cls):
def setUpClass(cls: Any) -> None:
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
cls.hive_available = True
@@ -46,7 +43,7 @@ def setUpClass(cls):
cls.spark_connect_test_data()

@classmethod
def spark_connect_test_data(cls):
def spark_connect_test_data(cls: Any) -> None:
# Setup Remote Spark Session
cls.tbl_name = f"tbl{uuid.uuid4()}".replace("-", "")
cls.connect = RemoteSparkSession(port=15002)
@@ -57,22 +54,23 @@ def spark_connect_test_data(cls):


class SparkConnectTests(SparkConnectSQLTestCase):
def test_simple_read(self):
def test_simple_read(self) -> None:
"""Tests that we can access the Spark Connect GRPC service locally."""
df = self.connect.readTable(self.tbl_name)
data = df.limit(10).collect()
# Check that the limit is applied
assert len(data.index) == 10

def test_simple_udf(self):
def conv_udf(x):
def test_simple_udf(self) -> None:
def conv_udf(x) -> str:
return "Martin"

u = udf(conv_udf)
df = self.connect.readTable(self.tbl_name)
result = df.select(u(df.id)).collect()
assert result is not None

def test_simple_explain_string(self):
def test_simple_explain_string(self) -> None:
df = self.connect.readTable(self.tbl_name).limit(10)
result = df.explain()
assert len(result) > 0
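A condensed usage sketch based on the tests above; it assumes a Spark Connect server listening on port 15002 (as in the fixture) and an existing table, so it is not runnable on its own:

from pyspark.sql.connect.client import RemoteSparkSession

connect = RemoteSparkSession(port=15002)
df = connect.readTable("tbl_example")  # table name is a placeholder
data = df.limit(10).collect()          # the tests treat the result as pandas-like
print(len(data.index))                 # 10 once the limit is applied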
5 changes: 4 additions & 1 deletion python/pyspark/sql/tests/connect/utils/__init__.py
@@ -14,4 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.tests.connect.utils.spark_connect_test_utils import PlanOnlyTestFixture

from pyspark.sql.tests.connect.utils.spark_connect_test_utils import ( # noqa: F401
PlanOnlyTestFixture, # noqa: F401
) # noqa: F401
python/pyspark/sql/tests/connect/utils/spark_connect_test_utils.py
@@ -14,26 +14,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Dict
import functools
import unittest
import uuid


class MockRemoteSession:
def __init__(self):
self.hooks = {}
def __init__(self) -> None:
self.hooks: Dict[str, Any] = {}

def set_hook(self, name, hook):
def set_hook(self, name: str, hook: Any) -> None:
self.hooks[name] = hook

def __getattr__(self, item):
if not item in self.hooks:
def __getattr__(self, item: str) -> Any:
if item not in self.hooks:
raise LookupError(f"{item} is not defined as a method hook in MockRemoteSession")
return functools.partial(self.hooks[item])


class PlanOnlyTestFixture(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
def setUpClass(cls: Any) -> None:
cls.connect = MockRemoteSession()
cls.tbl_name = f"tbl{uuid.uuid4()}".replace("-", "")
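MockRemoteSession above routes unknown attribute access through registered hooks; a self-contained sketch of that pattern (class and hook names here are illustrative, not the real fixture):

import functools
from typing import Any, Dict


class HookedSession:
    def __init__(self) -> None:
        self.hooks: Dict[str, Any] = {}

    def set_hook(self, name: str, hook: Any) -> None:
        self.hooks[name] = hook

    def __getattr__(self, item: str) -> Any:
        # __getattr__ only fires when normal lookup fails, so registered
        # hooks behave like ordinary methods on the session object.
        if item not in self.hooks:
            raise LookupError(f"{item} is not defined as a method hook")
        return functools.partial(self.hooks[item])


session = HookedSession()
session.set_hook("readTable", lambda name: f"plan for {name}")
assert session.readTable("people") == "plan for people"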
13 changes: 7 additions & 6 deletions python/run-tests.py
@@ -114,9 +114,10 @@ def run_individual_python_test(target_dir, test_name, pyspark_python, keep_test_
additional_config = []
if test_name.startswith("pyspark.sql.tests.connect"):
# Adding Spark Connect JAR and Config
additional_config += ["--conf",
"spark.plugins=org.apache.spark.sql.sparkconnect.service.SparkConnectPlugin"]

additional_config += [
"--conf",
"spark.plugins=org.apache.spark.sql.sparkconnect.service.SparkConnectPlugin"
]

# Also override the JVM's temp directory by setting driver and executor options.
java_options = "-Djava.io.tmpdir={0}".format(tmp_dir)
@@ -125,11 +126,11 @@
"--conf", "spark.driver.extraJavaOptions='{0}'".format(java_options),
"--conf", "spark.executor.extraJavaOptions='{0}'".format(java_options),
"--conf", "spark.sql.warehouse.dir='{0}'".format(metastore_dir),
] + additional_config + [
"pyspark-shell"
]
env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
spark_args += additional_config
spark_args += ["pyspark-shell"]

env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)

output_prefix = get_valid_filename(pyspark_python + "__" + test_name + "__").lstrip("_")
# Delete is always set to False since the cleanup will be either done by removing the
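After this commit the runner builds the submit arguments in three steps: the base --conf options, then the Spark Connect plugin config, then the trailing pyspark-shell token. A simplified sketch of that assembly (directories and values below are placeholders, not the real config):

java_options = "-Djava.io.tmpdir=/tmp/spark-tests"
spark_args = [
    "--conf", "spark.driver.extraJavaOptions='{0}'".format(java_options),
    "--conf", "spark.executor.extraJavaOptions='{0}'".format(java_options),
    "--conf", "spark.sql.warehouse.dir='/tmp/spark-warehouse'",
]
additional_config = [
    "--conf",
    "spark.plugins=org.apache.spark.sql.sparkconnect.service.SparkConnectPlugin",
]
spark_args += additional_config
spark_args += ["pyspark-shell"]

env = {"PYSPARK_SUBMIT_ARGS": " ".join(spark_args)}
print(env["PYSPARK_SUBMIT_ARGS"])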