diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 41214bcfdbfc..23dfd4e7ec8c 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -41,7 +41,7 @@ from py4j.java_gateway import JavaObject -from pyspark import copy_func, since, _NoValue +from pyspark import copy_func, _NoValue from pyspark._globals import _NoValueType from pyspark.context import SparkContext from pyspark.rdd import ( @@ -57,9 +57,6 @@ from pyspark.sql.streaming import DataStreamWriter from pyspark.sql.types import ( StructType, - StructField, - StringType, - IntegerType, Row, _parse_datatype_json_string, ) @@ -82,31 +79,50 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): """A distributed collection of data grouped into named columns. + .. versionadded:: 1.3.0 + + Examples + -------- A :class:`DataFrame` is equivalent to a relational table in Spark SQL, - and can be created using various functions in :class:`SparkSession`:: + and can be created using various functions in :class:`SparkSession`: - people = spark.read.parquet("...") + >>> people = spark.createDataFrame([ + ... {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50}, + ... {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100}, + ... {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150}, + ... {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200} + ... ]) Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in: :class:`DataFrame`, :class:`Column`. - To select a column from the :class:`DataFrame`, use the apply method:: - - ageCol = people.age - - A more concrete example:: - - # To create DataFrame using SparkSession - people = spark.read.parquet("...") - department = spark.read.parquet("...") - - people.filter(people.age > 30).join(department, people.deptId == department.id) \\ - .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"}) - - .. versionadded:: 1.3.0 - - .. note: A DataFrame should only be created as described above. It should not be directly - created via using the constructor. + To select a column from the :class:`DataFrame`, use the apply method: + + >>> age_col = people.age + + A more concrete example: + + >>> # To create DataFrame using SparkSession + ... department = spark.createDataFrame([ + ... {"id": 1, "name": "PySpark"}, + ... {"id": 2, "name": "ML"}, + ... {"id": 3, "name": "Spark SQL"} + ... ]) + + >>> people.filter(people.age > 30).join( + ... department, people.deptId == department.id).groupBy( + ... department.name, "gender").agg({"salary": "avg", "age": "max"}).show() + +-------+------+-----------+--------+ + | name|gender|avg(salary)|max(age)| + +-------+------+-----------+--------+ + | ML| F| 150.0| 60| + |PySpark| M| 75.0| 50| + +-------+------+-----------+--------+ + + Notes + ----- + A DataFrame should only be created as described above. It should not be directly + created via using the constructor. """ def __init__( @@ -171,8 +187,7 @@ def sparkSession(self) -> "SparkSession": """ return self._session - @property # type: ignore[misc] - @since(1.3) + @property def rdd(self) -> "RDD[Row]": """Returns the content as an :class:`pyspark.RDD` of :class:`Row`. 
@@ -195,8 +210,7 @@ def rdd(self) -> "RDD[Row]": ) return self._lazy_rdd - @property # type: ignore[misc] - @since("1.3.1") + @property def na(self) -> "DataFrameNaFunctions": """Returns a :class:`DataFrameNaFunctions` for handling missing values. @@ -208,9 +222,12 @@ def na(self) -> "DataFrameNaFunctions": Examples -------- - >>> df = spark.sql("select 1 as c1, int(null) as c2") + >>> df = spark.sql("SELECT 1 AS c1, int(NULL) AS c2") >>> type(df.na) + + Replace the missing values as 2. + >>> df.na.fill(2).show() +---+---+ | c1| c2| @@ -220,8 +237,7 @@ def na(self) -> "DataFrameNaFunctions": """ return DataFrameNaFunctions(self) - @property # type: ignore[misc] - @since(1.4) + @property def stat(self) -> "DataFrameStatFunctions": """Returns a :class:`DataFrameStatFunctions` for statistic functions. @@ -234,7 +250,7 @@ def stat(self) -> "DataFrameStatFunctions": Examples -------- >>> import pyspark.sql.functions as f - >>> df = spark.range(3).withColumn("c", f.expr("id+1")) + >>> df = spark.range(3).withColumn("c", f.expr("id + 1")) >>> type(df.stat) >>> df.stat.corr("id", "c") @@ -251,7 +267,7 @@ def toJSON(self, use_unicode: bool = True) -> RDD[str]: Parameters ---------- - use_unicode : bool, optional (default: True) + use_unicode : bool, optional, default True Whether to convert to unicode or not. Returns @@ -260,6 +276,7 @@ def toJSON(self, use_unicode: bool = True) -> RDD[str]: Examples -------- + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.toJSON().first() '{"age":2,"name":"Alice"}' """ @@ -277,10 +294,16 @@ def registerTempTable(self, name: str) -> None: .. deprecated:: 2.0.0 Use :meth:`DataFrame.createOrReplaceTempView` instead. + Parameters + ---------- + name : str + Name of the temporary table to register. + Examples -------- + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.registerTempTable("people") - >>> df2 = spark.sql("select * from people") + >>> df2 = spark.sql("SELECT * FROM people") >>> sorted(df.collect()) == sorted(df2.collect()) True >>> spark.catalog.dropTempView("people") @@ -305,20 +328,22 @@ def createTempView(self, name: str) -> None: name : str Name of the view. - Returns - ------- - None - Examples -------- + Create a local temporary view. + + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.createTempView("people") - >>> df2 = spark.sql("select * from people") + >>> df2 = spark.sql("SELECT * FROM people") >>> sorted(df.collect()) == sorted(df2.collect()) True + + Throw an exception if the table already exists. + >>> df.createTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... - AnalysisException: u"Temporary table 'people' already exists;" + AnalysisException: "Temporary table 'people' already exists;" >>> spark.catalog.dropTempView("people") True @@ -344,10 +369,16 @@ def createOrReplaceTempView(self, name: str) -> None: Examples -------- + Create a local temporary view named 'people'. + + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.createOrReplaceTempView("people") + + Replace the local temporary view. 
+ >>> df2 = df.filter(df.age > 3) >>> df2.createOrReplaceTempView("people") - >>> df3 = spark.sql("select * from people") + >>> df3 = spark.sql("SELECT * FROM people") >>> sorted(df3.collect()) == sorted(df2.collect()) True >>> spark.catalog.dropTempView("people") @@ -376,14 +407,20 @@ def createGlobalTempView(self, name: str) -> None: Examples -------- + Create a global temporary view. + + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.createGlobalTempView("people") - >>> df2 = spark.sql("select * from global_temp.people") + >>> df2 = spark.sql("SELECT * FROM global_temp.people") >>> sorted(df.collect()) == sorted(df2.collect()) True + + Throws an exception if the global temporary view already exists. + >>> df.createGlobalTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... - AnalysisException: u"Temporary table 'people' already exists;" + AnalysisException: "Temporary table 'people' already exists;" >>> spark.catalog.dropGlobalTempView("people") True @@ -408,10 +445,16 @@ def createOrReplaceGlobalTempView(self, name: str) -> None: Examples -------- + Create a global temporary view. + + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.createOrReplaceGlobalTempView("people") + + Replace the global temporary view. + >>> df2 = df.filter(df.age > 3) >>> df2.createOrReplaceGlobalTempView("people") - >>> df3 = spark.sql("select * from global_temp.people") + >>> df3 = spark.sql("SELECT * FROM global_temp.people") >>> sorted(df3.collect()) == sorted(df2.collect()) True >>> spark.catalog.dropGlobalTempView("people") @@ -434,9 +477,15 @@ def write(self) -> DataFrameWriter: Examples -------- + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> type(df.write) + + Write the DataFrame as a table. + + >>> _ = spark.sql("DROP TABLE IF EXISTS tab2") >>> df.write.saveAsTable("tab2") + >>> _ = spark.sql("DROP TABLE tab2") """ return DataFrameWriter(self) @@ -458,9 +507,15 @@ def writeStream(self) -> DataStreamWriter: Examples -------- + >>> import tempfile >>> df = spark.readStream.format("rate").load() - >>> dsw = df.writeStream - >>> dsw.option("checkpointLocation", "/tmp/c").toTable("tab3") # doctest: +ELLIPSIS + >>> type(df.writeStream) + + + >>> with tempfile.TemporaryDirectory() as d: + ... # Create a table with Rate source. + ... df.writeStream.toTable( + ... "my_table", checkpointLocation=d) # doctest: +ELLIPSIS """ return DataStreamWriter(self) @@ -477,8 +532,13 @@ def schema(self) -> StructType: Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + + Retrieve the schema of the current DataFrame. + >>> df.schema - StructType([StructField('age', IntegerType(), True), + StructType([StructField('age', LongType(), True), StructField('name', StringType(), True)]) """ if self._schema is None: @@ -501,9 +561,11 @@ def printSchema(self) -> None: Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) >>> df.printSchema() root - |-- age: integer (nullable = true) + |-- age: long (nullable = true) |-- name: string (nullable = true) """ print(self._jdf.schema().treeString()) @@ -534,15 +596,18 @@ def explain( .. versionchanged:: 3.0.0 Added optional argument `mode` to specify the expected output format of plans. - Returns - ------- - None - Examples -------- + >>> df = spark.createDataFrame( + ... 
[(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + + Print out the physical plan only (default). + >>> df.explain() == Physical Plan == - *(1) Scan ExistingRDD[age#0,name#1] + *(1) Scan ExistingRDD[age...,name...] + + Print out all of parsed, analyzed, optimized and physical plans. >>> df.explain(True) == Parsed Logical Plan == @@ -554,13 +619,17 @@ def explain( == Physical Plan == ... + Print out the plans with two sections: a physical plan outline and node details + >>> df.explain(mode="formatted") == Physical Plan == - * Scan ExistingRDD (1) - (1) Scan ExistingRDD [codegen id : 1] - Output [2]: [age#0, name#1] + * Scan ExistingRDD (...) + (1) Scan ExistingRDD [codegen id : ...] + Output [2]: [age..., name...] ... + Print a logical plan and statistics if they are available. + >>> df.explain("cost") == Optimized Logical Plan == ...Statistics... @@ -628,7 +697,6 @@ def exceptAll(self, other: "DataFrame") -> "DataFrame": >>> df1 = spark.createDataFrame( ... [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b", 3), ("c", 4)], ["C1", "C2"]) >>> df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"]) - >>> df1.exceptAll(df2).show() +---+---+ | C1| C2| @@ -642,11 +710,12 @@ def exceptAll(self, other: "DataFrame") -> "DataFrame": """ return DataFrame(self._jdf.exceptAll(other._jdf), self.sparkSession) - @since(1.3) def isLocal(self) -> bool: """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally (without any Spark executors). + .. versionadded:: 1.3.0 + Returns ------- bool @@ -700,7 +769,7 @@ def isEmpty(self) -> bool: Examples -------- >>> df_empty = spark.createDataFrame([], 'a STRING') - >>> df_non_empty = spark.createDataFrame([("a")], 'STRING') + >>> df_non_empty = spark.createDataFrame(["a"], 'STRING') >>> df_empty.isEmpty() True >>> df_non_empty.isEmpty() @@ -731,29 +800,43 @@ def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = Examples -------- - >>> df - DataFrame[age: int, name: string] - >>> df.show() + >>> df = spark.createDataFrame([ + ... (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + + Show only top 2 rows. + + >>> df.show(2) +---+-----+ |age| name| +---+-----+ - | 2|Alice| - | 5| Bob| + | 14| Tom| + | 23|Alice| +---+-----+ + only showing top 2 rows + + Show :class:`DataFrame` where the maximum number of characters is 3. + >>> df.show(truncate=3) +---+----+ |age|name| +---+----+ - | 2| Ali| - | 5| Bob| + | 14| Tom| + | 23| Ali| + | 16| Bob| +---+----+ + + Show :class:`DataFrame` vertically. + >>> df.show(vertical=True) -RECORD 0----- - age | 2 - name | Alice + age | 14 + name | Tom -RECORD 1----- - age | 5 - name | Bob + age | 23 + name | Alice + -RECORD 2----- + age | 16 + name | Bob """ if not isinstance(n, int) or isinstance(n, bool): @@ -832,23 +915,27 @@ def checkpoint(self, eager: bool = True) -> "DataFrame": Parameters ---------- - eager : bool, optional - Whether to checkpoint this :class:`DataFrame` immediately (default True) + eager : bool, optional, default True + Whether to checkpoint this :class:`DataFrame` immediately. Returns ------- :class:`DataFrame` Checkpointed DataFrame. - Examples - -------- - >>> spark.sparkContext.setCheckpointDir("/tmp/bb") - >>> df.checkpoint(False) - DataFrame[age: int, name: string] - Notes ----- This API is experimental. + + Examples + -------- + >>> import tempfile + >>> df = spark.createDataFrame([ + ... (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> with tempfile.TemporaryDirectory() as d: + ... 
spark.sparkContext.setCheckpointDir("/tmp/bb") + ... df.checkpoint(False) + DataFrame[age: bigint, name: string] """ jdf = self._jdf.checkpoint(eager) return DataFrame(jdf, self.sparkSession) @@ -863,22 +950,24 @@ def localCheckpoint(self, eager: bool = True) -> "DataFrame": Parameters ---------- - eager : bool, optional - Whether to checkpoint this :class:`DataFrame` immediately (default True) + eager : bool, optional, default True + Whether to checkpoint this :class:`DataFrame` immediately. Returns ------- :class:`DataFrame` Checkpointed DataFrame. - Examples - -------- - >>> df.localCheckpoint(False) - DataFrame[age: int, name: string] - Notes ----- This API is experimental. + + Examples + -------- + >>> df = spark.createDataFrame([ + ... (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> df.localCheckpoint(False) + DataFrame[age: bigint, name: string] """ jdf = self._jdf.localCheckpoint(eager) return DataFrame(jdf, self.sparkSession) @@ -915,17 +1004,33 @@ def withWatermark(self, eventTime: str, delayThreshold: str) -> "DataFrame": :class:`DataFrame` Watermarked DataFrame - Examples - -------- - >>> from pyspark.sql.functions import timestamp_seconds - >>> sdf.select( - ... 'name', - ... timestamp_seconds(sdf.time).alias('time')).withWatermark('time', '10 minutes') - DataFrame[name: string, time: timestamp] - Notes ----- + This is a feature only for Structured Streaming. + This API is evolving. + + Examples + -------- + >>> from pyspark.sql import Row + >>> from pyspark.sql.functions import timestamp_seconds + >>> df = spark.readStream.format("rate").load().selectExpr( + ... "value % 5 AS value", "timestamp") + >>> df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes') + DataFrame[value: bigint, time: timestamp] + + Group the data by window and value (0 - 4), and compute the count of each group. + + >>> import time + >>> from pyspark.sql.functions import window + >>> query = (df + ... .withWatermark("timestamp", "10 minutes") + ... .groupBy( + ... window(df.timestamp, "10 minutes", "5 minutes"), + ... df.value) + ... ).count().writeStream.outputMode("complete").format("console").start() + >>> time.sleep(3) + >>> query.stop() """ if not eventTime or type(eventTime) is not str: raise TypeError("eventTime should be provided as a string") @@ -955,12 +1060,21 @@ def hint( Examples -------- - >>> df.join(df2.hint("broadcast"), "name").show() - +----+---+------+ - |name|age|height| - +----+---+------+ - | Bob| 5| 85| - +----+---+------+ + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) + >>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")]) + >>> df.join(df2, "name").explain() + == Physical Plan == + ... + ... +- SortMergeJoin ... + ... + + Explicitly trigger the broadcast hashjoin by providing the hint in ``df2``. + + >>> df.join(df2.hint("broadcast"), "name").explain() + == Physical Plan == + ... + ... +- BroadcastHashJoin ... + ... """ if len(parameters) == 1 and isinstance(parameters[0], list): parameters = parameters[0] # type: ignore[assignment] @@ -992,8 +1106,13 @@ def count(self) -> int: Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + + Return the number of rows in the :class:`DataFrame`. + >>> df.count() - 2 + 3 """ return int(self._jdf.count()) @@ -1009,8 +1128,10 @@ def collect(self) -> List[Row]: Examples -------- + >>> df = spark.createDataFrame( + ... 
[(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) >>> df.collect() - [Row(age=2, name='Alice'), Row(age=5, name='Bob')] + [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')] """ with SCCallSiteSync(self._sc): sock_info = self._jdf.collectToPython() @@ -1037,8 +1158,10 @@ def toLocalIterator(self, prefetchPartitions: bool = False) -> Iterator[Row]: Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) >>> list(df.toLocalIterator()) - [Row(age=2, name='Alice'), Row(age=5, name='Bob')] + [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')] """ with SCCallSiteSync(self._sc): sock_info = self._jdf.toPythonIterator(prefetchPartitions) @@ -1062,10 +1185,19 @@ def limit(self, num: int) -> "DataFrame": Examples -------- - >>> df.limit(1).collect() - [Row(age=2, name='Alice')] - >>> df.limit(0).collect() - [] + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> df.limit(1).show() + +---+----+ + |age|name| + +---+----+ + | 14| Tom| + +---+----+ + >>> df.limit(0).show() + +---+----+ + |age|name| + +---+----+ + +---+----+ """ jdf = self._jdf.limit(num) return DataFrame(jdf, self.sparkSession) @@ -1088,8 +1220,13 @@ def take(self, num: int) -> List[Row]: Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + + Return the first 2 rows of the :class:`DataFrame`. + >>> df.take(2) - [Row(age=2, name='Alice'), Row(age=5, name='Bob')] + [Row(age=14, name='Tom'), Row(age=23, name='Alice')] """ return self.limit(num).collect() @@ -1115,8 +1252,11 @@ def tail(self, num: int) -> List[Row]: Examples -------- - >>> df.tail(1) - [Row(age=5, name='Bob')] + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + + >>> df.tail(2) + [Row(age=23, name='Alice'), Row(age=16, name='Bob')] """ with SCCallSiteSync(self._sc): sock_info = self._jdf.tailToPython(num) @@ -1135,15 +1275,13 @@ def foreach(self, f: Callable[[Row], None]) -> None: A function that accepts one parameter which will receive each row to process. - Returns - ------- - None - Examples -------- - >>> def f(person): + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> def func(person): ... print(person.name) - >>> df.foreach(f) + >>> df.foreach(func) """ self.rdd.foreach(f) @@ -1160,16 +1298,14 @@ def foreachPartition(self, f: Callable[[Iterator[Row]], None]) -> None: A function that accepts one parameter which will receive each partition to process. - Returns - ------- - None - Examples -------- - >>> def f(people): - ... for person in people: + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> def func(itr): + ... for person in itr: ... print(person.name) - >>> df.foreachPartition(f) + >>> df.foreachPartition(func) """ self.rdd.foreachPartition(f) # type: ignore[arg-type] @@ -1227,6 +1363,9 @@ def persist( >>> df = spark.range(1) >>> df.persist() DataFrame[id: bigint] + + Persists the data in the disk by specifying the storage level. 
+        >>> from pyspark.storagelevel import StorageLevel
         >>> df.persist(StorageLevel.DISK_ONLY)
         DataFrame[id: bigint]
@@ -1249,10 +1388,13 @@ def storageLevel(self) -> StorageLevel:

         Examples
         --------
-        >>> df.storageLevel
+        >>> df1 = spark.range(10)
+        >>> df1.storageLevel
         StorageLevel(False, False, False, False, 1)
-        >>> df.cache().storageLevel
+        >>> df1.cache().storageLevel
         StorageLevel(True, True, False, True, 1)
+
+        >>> df2 = spark.range(5)
         >>> df2.persist(StorageLevel.DISK_ONLY_2).storageLevel
         StorageLevel(True, False, False, False, 2)
         """
@@ -1331,6 +1473,7 @@ def coalesce(self, numPartitions: int) -> "DataFrame":

         Examples
         --------
+        >>> df = spark.range(10)
         >>> df.coalesce(1).rdd.getNumPartitions()
         1
         """
@@ -1362,7 +1505,7 @@ def repartition(  # type: ignore[misc]
         cols : str or :class:`Column`
             partitioning columns.

-            .. versionchanged:: 1.6
+            .. versionchanged:: 1.6.0
               Added optional arguments to specify the partitioning columns. Also made
               numPartitions optional if partitioning columns are specified.

@@ -1373,40 +1516,23 @@

         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Repartition the data into 10 partitions.
+
         >>> df.repartition(10).rdd.getNumPartitions()
         10
-        >>> data = df.union(df).repartition("age")
-        >>> data.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
-        |  2|Alice|
-        |  5|  Bob|
-        +---+-----+
-        >>> data = data.repartition(7, "age")
-        >>> data.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  2|Alice|
-        |  5|  Bob|
-        |  2|Alice|
-        |  5|  Bob|
-        +---+-----+
-        >>> data.rdd.getNumPartitions()
+
+        Repartition the data into 7 partitions by 'age' column.
+
+        >>> df.repartition(7, "age").rdd.getNumPartitions()
         7
-        >>> data = data.repartition(3, "name", "age")
-        >>> data.show()
-        +---+-----+
-        |age| name|
-        +---+-----+
-        |  5|  Bob|
-        |  5|  Bob|
-        |  2|Alice|
-        |  2|Alice|
-        +---+-----+
+
+        Repartition the data into 3 partitions by 'name' and 'age' columns.
+
+        >>> df.repartition(3, "name", "age").rdd.getNumPartitions()
+        3
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:
@@ -1437,9 +1563,6 @@ def repartitionByRange(  # type: ignore[misc]
         Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
         resulting :class:`DataFrame` is range partitioned.

-        At least one partition-by expression must be specified.
-        When no explicit sort order is specified, "ascending nulls first" is assumed.
-
         .. versionadded:: 2.4.0

         Parameters
         ----------
@@ -1458,6 +1581,9 @@

         Notes
         -----
+        At least one partition-by expression must be specified.
+        When no explicit sort order is specified, "ascending nulls first" is assumed.
+
         Due to performance reasons this method uses sampling to estimate the ranges.
         Hence, the output may not be consistent, since sampling can return different
         values. The sample size can be controlled by the config
@@ -1465,25 +1591,15 @@

         Examples
         --------
+        >>> df = spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+
+        Repartition the data into 2 partitions by range in 'age' column.
+        For example, the first partition can have ``(14, "Tom")``, and the second
+        partition would have ``(16, "Bob")`` and ``(23, "Alice")``.
+
+ >>> df.repartitionByRange(2, "age").rdd.getNumPartitions() 2 - >>> df.show() - +---+-----+ - |age| name| - +---+-----+ - | 2|Alice| - | 5| Bob| - +---+-----+ - >>> df.repartitionByRange(1, "age").rdd.getNumPartitions() - 1 - >>> data = df.repartitionByRange("age") - >>> df.show() - +---+-----+ - |age| name| - +---+-----+ - | 2|Alice| - | 5| Bob| - +---+-----+ """ if isinstance(numPartitions, int): if len(cols) == 0: @@ -1511,6 +1627,11 @@ def distinct(self) -> "DataFrame": Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (23, "Alice")], ["age", "name"]) + + Return the number of distinct rows in the :class:`DataFrame` + >>> df.distinct().count() 2 """ @@ -1630,7 +1751,7 @@ def sampleBy( col : :class:`Column` or str column that defines strata - .. versionchanged:: 3.0 + .. versionchanged:: 3.0.0 Added sampling by a column of :class:`Column` fractions : dict sampling fraction for each stratum. If a stratum is not @@ -1645,7 +1766,7 @@ def sampleBy( Examples -------- >>> from pyspark.sql.functions import col - >>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key")) + >>> dataset = spark.range(0, 100).select((col("id") % 3).alias("key")) >>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0) >>> sampled.groupBy("key").count().orderBy("key").show() +---+-----+ @@ -1693,10 +1814,17 @@ def randomSplit(self, weights: List[float], seed: Optional[int] = None) -> List[ Examples -------- - >>> splits = df4.randomSplit([1.0, 2.0], 24) + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame([ + ... Row(age=10, height=80, name="Alice"), + ... Row(age=5, height=None, name="Bob"), + ... Row(age=None, height=None, name="Tom"), + ... Row(age=None, height=None, name=None), + ... ]) + + >>> splits = df.randomSplit([1.0, 2.0], 24) >>> splits[0].count() 2 - >>> splits[1].count() 2 """ @@ -1722,8 +1850,10 @@ def dtypes(self) -> List[Tuple[str, str]]: Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) >>> df.dtypes - [('age', 'int'), ('name', 'string')] + [('age', 'bigint'), ('name', 'string')] """ return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields] @@ -1740,6 +1870,8 @@ def columns(self) -> List[str]: Examples -------- + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) >>> df.columns ['age', 'name'] """ @@ -1783,24 +1915,6 @@ def to(self, schema: StructType) -> "DataFrame": Returns a new :class:`DataFrame` where each row is reconciled to match the specified schema. - Notes - ----- - 1, Reorder columns and/or inner fields by name to match the specified schema. - - 2, Project away columns and/or inner fields that are not needed by the specified schema. - Missing columns and/or inner fields (present in the specified schema but not input - DataFrame) lead to failures. - - 3, Cast the columns and/or inner fields to match the data types in the specified schema, - if the types are compatible, e.g., numeric to numeric (error if overflows), but not string - to int. - - 4, Carry over the metadata from the specified schema, while the columns and/or inner fields - still keep their own metadata if not overwritten by the specified schema. - - 5, Fail if the nullability is not compatible. For example, the column and/or inner field - is nullable but the specified schema requires them to be not nullable. - .. 
versionadded:: 3.4.0 Parameters @@ -1813,11 +1927,31 @@ def to(self, schema: StructType) -> "DataFrame": :class:`DataFrame` Reconciled DataFrame. + Notes + ----- + * Reorder columns and/or inner fields by name to match the specified schema. + + * Project away columns and/or inner fields that are not needed by the specified schema. + Missing columns and/or inner fields (present in the specified schema but not input + DataFrame) lead to failures. + + * Cast the columns and/or inner fields to match the data types in the specified schema, + if the types are compatible, e.g., numeric to numeric (error if overflows), but + not string to int. + + * Carry over the metadata from the specified schema, while the columns and/or inner fields + still keep their own metadata if not overwritten by the specified schema. + + * Fail if the nullability is not compatible. For example, the column and/or inner field + is nullable but the specified schema requires them to be not nullable. + Examples -------- + >>> from pyspark.sql.types import StructField, StringType >>> df = spark.createDataFrame([("a", 1)], ["i", "j"]) >>> df.schema StructType([StructField('i', StringType(), True), StructField('j', LongType(), True)]) + >>> schema = StructType([StructField("j", StringType()), StructField("i", StringType())]) >>> df2 = df.to(schema) >>> df2.schema @@ -1850,13 +1984,21 @@ def alias(self, alias: str) -> "DataFrame": Examples -------- - >>> from pyspark.sql.functions import * + >>> from pyspark.sql.functions import col, desc + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) >>> df_as1 = df.alias("df_as1") >>> df_as2 = df.alias("df_as2") >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') - >>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age") \ - .sort(desc("df_as1.name")).collect() - [Row(name='Bob', name='Bob', age=5), Row(name='Alice', name='Alice', age=2)] + >>> joined_df.select( + ... "df_as1.name", "df_as2.name", "df_as2.age").sort(desc("df_as1.name")).show() + +-----+-----+---+ + | name| name|age| + +-----+-----+---+ + | Tom| Tom| 14| + | Bob| Bob| 16| + |Alice|Alice| 23| + +-----+-----+---+ """ assert isinstance(alias, str), "alias should be a string" return DataFrame(getattr(self._jdf, "as")(alias), self.sparkSession) @@ -1878,13 +2020,22 @@ def crossJoin(self, other: "DataFrame") -> "DataFrame": Examples -------- - >>> df.select("age", "name").collect() - [Row(age=2, name='Alice'), Row(age=5, name='Bob')] - >>> df2.select("name", "height").collect() - [Row(name='Tom', height=80), Row(name='Bob', height=85)] - >>> df.crossJoin(df2.select("height")).select("age", "name", "height").collect() - [Row(age=2, name='Alice', height=80), Row(age=2, name='Alice', height=85), - Row(age=5, name='Bob', height=80), Row(age=5, name='Bob', height=85)] + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> df2 = spark.createDataFrame( + ... [Row(height=80, name="Tom"), Row(height=85, name="Bob")]) + >>> df.crossJoin(df2.select("height")).select("age", "name", "height").show() + +---+-----+------+ + |age| name|height| + +---+-----+------+ + | 14| Tom| 80| + | 14| Tom| 85| + | 23|Alice| 80| + | 23|Alice| 85| + | 16| Bob| 80| + | 16| Bob| 85| + +---+-----+------+ """ jdf = self._jdf.crossJoin(other._jdf) @@ -1924,23 +2075,66 @@ def join( -------- The following performs a full outer join between ``df1`` and ``df2``. 
+ >>> from pyspark.sql import Row >>> from pyspark.sql.functions import desc - >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height) \ - .sort(desc("name")).collect() - [Row(name='Bob', height=85), Row(name='Alice', height=None), Row(name=None, height=80)] - - >>> df.join(df2, 'name', 'outer').select('name', 'height').sort(desc("name")).collect() - [Row(name='Tom', height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)] - - >>> cond = [df.name == df3.name, df.age == df3.age] - >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() - [Row(name='Alice', age=2), Row(name='Bob', age=5)] - - >>> df.join(df2, 'name').select(df.name, df2.height).collect() - [Row(name='Bob', height=85)] - - >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect() - [Row(name='Bob', age=5)] + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")]).toDF("age", "name") + >>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")]) + >>> df3 = spark.createDataFrame([Row(age=2, name="Alice"), Row(age=5, name="Bob")]) + >>> df4 = spark.createDataFrame([ + ... Row(age=10, height=80, name="Alice"), + ... Row(age=5, height=None, name="Bob"), + ... Row(age=None, height=None, name="Tom"), + ... Row(age=None, height=None, name=None), + ... ]) + + Inner join on columns (default) + + >>> df.join(df2, 'name').select(df.name, df2.height).show() + +----+------+ + |name|height| + +----+------+ + | Bob| 85| + +----+------+ + >>> df.join(df4, ['name', 'age']).select(df.name, df.age).show() + +----+---+ + |name|age| + +----+---+ + | Bob| 5| + +----+---+ + + Outer join for both DataFrames on 'name' column. + + >>> df.join(df2, df.name == df2.name, 'outer').select( + ... df.name, df2.height).sort(desc("name")).show() + +-----+------+ + | name|height| + +-----+------+ + | Bob| 85| + |Alice| null| + | null| 80| + +-----+------+ + >>> df.join(df2, 'name', 'outer').select('name', 'height').sort(desc("name")).show() + +-----+------+ + | name|height| + +-----+------+ + | Tom| 80| + | Bob| 85| + |Alice| null| + +-----+------+ + + Outer join for both DataFrams with multiple columns. + + >>> df.join( + ... df3, + ... [df.name == df3.name, df.age == df3.age], + ... 'outer' + ... ).select(df.name, df3.age).show() + +-----+---+ + | name|age| + +-----+---+ + |Alice| 2| + | Bob| 5| + +-----+---+ """ if on is not None and not isinstance(on, list): @@ -1984,8 +2178,6 @@ def _joinAsOf( This is similar to a left-join except that we match on nearest key rather than equal keys. - .. versionadded:: 3.3.0 - Parameters ---------- other : :class:`DataFrame` @@ -2100,8 +2292,8 @@ def sortWithinPartitions( Other Parameters ---------------- - ascending : bool or list, optional - boolean or list of boolean (default ``True``). + ascending : bool or list, optional, default True + boolean or list of boolean. Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the `cols`. 
@@ -2112,13 +2304,9 @@ def sortWithinPartitions( Examples -------- - >>> df.sortWithinPartitions("age", ascending=False).show() - +---+-----+ - |age| name| - +---+-----+ - | 2|Alice| - | 5| Bob| - +---+-----+ + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) + >>> df.sortWithinPartitions("age", ascending=False) + DataFrame[age: bigint, name: string] """ jdf = self._jdf.sortWithinPartitions(self._sort_cols(cols, kwargs)) return DataFrame(jdf, self.sparkSession) @@ -2137,8 +2325,8 @@ def sort( Other Parameters ---------------- - ascending : bool or list, optional - boolean or list of boolean (default ``True``). + ascending : bool or list, optional, default True + boolean or list of boolean. Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the `cols`. @@ -2149,19 +2337,67 @@ def sort( Examples -------- - >>> df.sort(df.age.desc()).collect() - [Row(age=5, name='Bob'), Row(age=2, name='Alice')] - >>> df.sort("age", ascending=False).collect() - [Row(age=5, name='Bob'), Row(age=2, name='Alice')] - >>> df.orderBy(df.age.desc()).collect() - [Row(age=5, name='Bob'), Row(age=2, name='Alice')] - >>> from pyspark.sql.functions import * - >>> df.sort(asc("age")).collect() - [Row(age=2, name='Alice'), Row(age=5, name='Bob')] - >>> df.orderBy(desc("age"), "name").collect() - [Row(age=5, name='Bob'), Row(age=2, name='Alice')] - >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect() - [Row(age=5, name='Bob'), Row(age=2, name='Alice')] + >>> from pyspark.sql.functions import desc, asc + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (5, "Bob")], schema=["age", "name"]) + + Sort the DataFrame in ascending order. + + >>> df.sort(asc("age")).show() + +---+-----+ + |age| name| + +---+-----+ + | 2|Alice| + | 5| Bob| + +---+-----+ + + Sort the DataFrame in descending order. + + >>> df.sort(df.age.desc()).show() + +---+-----+ + |age| name| + +---+-----+ + | 5| Bob| + | 2|Alice| + +---+-----+ + >>> df.orderBy(df.age.desc()).show() + +---+-----+ + |age| name| + +---+-----+ + | 5| Bob| + | 2|Alice| + +---+-----+ + >>> df.sort("age", ascending=False).show() + +---+-----+ + |age| name| + +---+-----+ + | 5| Bob| + | 2|Alice| + +---+-----+ + + Specify miltiple columns + + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (2, "Bob"), (5, "Bob")], schema=["age", "name"]) + >>> df.orderBy(desc("age"), "name").show() + +---+-----+ + |age| name| + +---+-----+ + | 5| Bob| + | 2|Alice| + | 2| Bob| + +---+-----+ + + Specify miltiple columns for sorting order at `ascending`. + + >>> df.orderBy(["age", "name"], ascending=[False, False]).show() + +---+-----+ + |age| name| + +---+-----+ + | 5| Bob| + | 2| Bob| + | 2|Alice| + +---+-----+ """ jdf = self._jdf.sort(self._sort_cols(cols, kwargs)) return DataFrame(jdf, self.sparkSession) @@ -2285,12 +2521,6 @@ def summary(self, *statistics: str) -> "DataFrame": .. versionadded:: 2.3.0 - Notes - ----- - This function is meant for exploratory data analysis, as we make no - guarantee about the backward compatibility of the schema of the resulting - :class:`DataFrame`. - Parameters ---------- statistics : str, optional @@ -2301,6 +2531,12 @@ def summary(self, *statistics: str) -> "DataFrame": :class:`DataFrame` A new DataFrame that provides statistics for the given DataFrame. + Notes + ----- + This function is meant for exploratory data analysis, as we make no + guarantee about the backward compatibility of the schema of the resulting + :class:`DataFrame`. 
+ Examples -------- >>> df = spark.createDataFrame( @@ -2371,6 +2607,8 @@ def head(self, n: Optional[int] = None) -> Union[Optional[Row], List[Row]]: Examples -------- + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.head() Row(age=2, name='Alice') >>> df.head(1) @@ -2388,10 +2626,13 @@ def first(self) -> Optional[Row]: Returns ------- - First Row if DataFrame is not empty, otherwise None. + :class:`Row` + First row if :class:`DataFrame` is not empty, otherwise ``None``. Examples -------- + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.first() Row(age=2, name='Alice') """ @@ -2412,14 +2653,40 @@ def __getitem__(self, item: Union[int, str, Column, List, Tuple]) -> Union[Colum Examples -------- - >>> df.select(df['age']).collect() - [Row(age=2), Row(age=5)] - >>> df[ ["name", "age"]].collect() - [Row(name='Alice', age=2), Row(name='Bob', age=5)] - >>> df[ df.age > 3 ].collect() - [Row(age=5, name='Bob')] - >>> df[df[0] > 3].collect() - [Row(age=5, name='Bob')] + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (5, "Bob")], schema=["age", "name"]) + + Retrieve a column instance. + + >>> df.select(df['age']).show() + +---+ + |age| + +---+ + | 2| + | 5| + +---+ + + Select multiple string columns as index. + + >>> df[["name", "age"]].show() + +-----+---+ + | name|age| + +-----+---+ + |Alice| 2| + | Bob| 5| + +-----+---+ + >>> df[df.age > 3].show() + +---+----+ + |age|name| + +---+----+ + | 5| Bob| + +---+----+ + >>> df[df[0] > 3].show() + +---+----+ + |age|name| + +---+----+ + | 5| Bob| + +---+----+ """ if isinstance(item, str): jc = self._jdf.apply(item) @@ -2451,10 +2718,18 @@ def __getattr__(self, name: str) -> Column: Examples -------- - >>> df.select(df.age).collect() - [Row(age=2), Row(age=5)] - >>> df["age"] - Column<'age'> + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (5, "Bob")], schema=["age", "name"]) + + Retrieve a column instance. + + >>> df.select(df.age).show() + +---+ + |age| + +---+ + | 2| + | 5| + +---+ """ if name not in self.columns: raise AttributeError( @@ -2490,12 +2765,28 @@ def select(self, *cols: "ColumnOrName") -> "DataFrame": # type: ignore[misc] Examples -------- - >>> df.select('*').collect() - [Row(age=2, name='Alice'), Row(age=5, name='Bob')] - >>> df.select('name', 'age').collect() - [Row(name='Alice', age=2), Row(name='Bob', age=5)] - >>> df.select(df.name, (df.age + 10).alias('age')).collect() - [Row(name='Alice', age=12), Row(name='Bob', age=15)] + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (5, "Bob")], schema=["age", "name"]) + + Select all columns in the DataFrame. + + >>> df.select('*').show() + +---+-----+ + |age| name| + +---+-----+ + | 2|Alice| + | 5| Bob| + +---+-----+ + + Select a column with other expressions in the DataFrame. + + >>> df.select(df.name, (df.age + 10).alias('age')).show() + +-----+---+ + | name|age| + +-----+---+ + |Alice| 12| + | Bob| 15| + +-----+---+ """ jdf = self._jdf.select(self._jcols(*cols)) return DataFrame(jdf, self.sparkSession) @@ -2522,8 +2813,15 @@ def selectExpr(self, *expr: Union[str, List[str]]) -> "DataFrame": Examples -------- - >>> df.selectExpr("age * 2", "abs(age)").collect() - [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)] + >>> df = spark.createDataFrame([ + ... 
(2, "Alice"), (5, "Bob")], schema=["age", "name"]) + >>> df.selectExpr("age * 2", "abs(age)").show() + +---------+--------+ + |(age * 2)|abs(age)| + +---------+--------+ + | 4| 2| + | 10| 5| + +---------+--------+ """ if len(expr) == 1 and isinstance(expr[0], list): expr = expr[0] # type: ignore[assignment] @@ -2550,15 +2848,38 @@ def filter(self, condition: "ColumnOrName") -> "DataFrame": Examples -------- - >>> df.filter(df.age > 3).collect() - [Row(age=5, name='Bob')] - >>> df.where(df.age == 2).collect() - [Row(age=2, name='Alice')] + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (5, "Bob")], schema=["age", "name"]) - >>> df.filter("age > 3").collect() - [Row(age=5, name='Bob')] - >>> df.where("age = 2").collect() - [Row(age=2, name='Alice')] + Filter by :class:`Column` instances. + + >>> df.filter(df.age > 3).show() + +---+----+ + |age|name| + +---+----+ + | 5| Bob| + +---+----+ + >>> df.where(df.age == 2).show() + +---+-----+ + |age| name| + +---+-----+ + | 2|Alice| + +---+-----+ + + Filter by SQL expression in a string. + + >>> df.filter("age > 3").show() + +---+----+ + |age|name| + +---+----+ + | 5| Bob| + +---+----+ + >>> df.where("age = 2").show() + +---+-----+ + |age| name| + +---+-----+ + | 2|Alice| + +---+-----+ """ if isinstance(condition, str): jdf = self._jdf.filter(condition) @@ -2599,14 +2920,48 @@ def groupBy(self, *cols: "ColumnOrName") -> "GroupedData": # type: ignore[misc] Examples -------- - >>> df.groupBy().avg().collect() - [Row(avg(age)=3.5)] - >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect()) - [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)] - >>> sorted(df.groupBy(df.name).avg().collect()) - [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)] - >>> sorted(df.groupBy(['name', df.age]).count().collect()) - [Row(name='Alice', age=2, count=1), Row(name='Bob', age=5, count=1)] + >>> df = spark.createDataFrame([ + ... (2, "Alice"), (2, "Bob"), (2, "Bob"), (5, "Bob")], schema=["age", "name"]) + + Empty grouping columns triggers a global aggregation. + + >>> df.groupBy().avg().show() + +--------+ + |avg(age)| + +--------+ + | 2.75| + +--------+ + + Group-by 'name', and specify a dictionary to calculate the summation of 'age'. + + >>> df.groupBy("name").agg({"age": "sum"}).sort("name").show() + +-----+--------+ + | name|sum(age)| + +-----+--------+ + |Alice| 2| + | Bob| 9| + +-----+--------+ + + Group-by 'name', and calculate maximum values. + + >>> df.groupBy(df.name).max().sort("name").show() + +-----+--------+ + | name|max(age)| + +-----+--------+ + |Alice| 2| + | Bob| 5| + +-----+--------+ + + Group-by 'name' and 'age', and calculate the number of rows in each group. 
+ + >>> df.groupBy(["name", df.age]).count().sort("name", "age").show() + +-----+---+-----+ + | name|age|count| + +-----+---+-----+ + |Alice| 2| 1| + | Bob| 2| 2| + | Bob| 5| 1| + +-----+---+-----+ """ jgd = self._jdf.groupBy(self._jcols(*cols)) from pyspark.sql.group import GroupedData @@ -2642,6 +2997,7 @@ def rollup(self, *cols: "ColumnOrName") -> "GroupedData": # type: ignore[misc] Examples -------- + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.rollup("name", df.age).count().orderBy("name", "age").show() +-----+----+-----+ | name| age|count| @@ -2687,6 +3043,7 @@ def cube(self, *cols: "ColumnOrName") -> "GroupedData": # type: ignore[misc] Examples -------- + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df.cube("name", df.age).count().orderBy("name", "age").show() +-----+----+-----+ | name| age|count| @@ -2859,11 +3216,20 @@ def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame": Examples -------- - >>> df.agg({"age": "max"}).collect() - [Row(max(age)=5)] >>> from pyspark.sql import functions as F - >>> df.agg(F.min(df.age)).collect() - [Row(min(age)=2)] + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) + >>> df.agg({"age": "max"}).show() + +--------+ + |max(age)| + +--------+ + | 5| + +--------+ + >>> df.agg(F.min(df.age)).show() + +--------+ + |min(age)| + +--------+ + | 2| + +--------+ """ return self.groupBy().agg(*exprs) # type: ignore[arg-type] @@ -2922,6 +3288,7 @@ def observe( >>> from pyspark.sql.functions import col, count, lit, max >>> from pyspark.sql import Observation + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> observation = Observation("my metrics") >>> observed_df = df.observe(observation, count(lit(1)).alias("count"), max(col("age"))) >>> observed_df.count() @@ -2976,21 +3343,11 @@ def observe( else: raise ValueError("'observation' should be either `Observation` or `str`.") - @since(2.0) def union(self, other: "DataFrame") -> "DataFrame": """Return a new :class:`DataFrame` containing union of rows in this and another :class:`DataFrame`. - This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union - (that does deduplication of elements), use this function followed by :func:`distinct`. - - Also as standard in SQL, this function resolves columns by position (not by name). - - .. versionadded:: 2.0 - - See Also - -------- - DataFrame.unionAll + .. versionadded:: 2.0.0 Parameters ---------- @@ -3001,6 +3358,17 @@ def union(self, other: "DataFrame") -> "DataFrame": ------- :class:`DataFrame` + See Also + -------- + DataFrame.unionAll + + Notes + ----- + This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union + (that does deduplication of elements), use this function followed by :func:`distinct`. + + Also as standard in SQL, this function resolves columns by position (not by name). + Examples -------- >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"]) @@ -3019,28 +3387,15 @@ def union(self, other: "DataFrame") -> "DataFrame": | 1| 2| 3| | 1| 2| 3| +----+----+----+ - """ return DataFrame(self._jdf.union(other._jdf), self.sparkSession) - @since(1.3) def unionAll(self, other: "DataFrame") -> "DataFrame": """Return a new :class:`DataFrame` containing union of rows in this and another :class:`DataFrame`. - This is equivalent to `UNION ALL` in SQL. 
To do a SQL-style set union - (that does deduplication of elements), use this function followed by :func:`distinct`. - - Also as standard in SQL, this function resolves columns by position (not by name). - - :func:`unionAll` is an alias to :func:`union` - .. versionadded:: 1.3.0 - See Also - -------- - DataFrame.union - Parameters ---------- other : :class:`DataFrame` @@ -3051,6 +3406,18 @@ def unionAll(self, other: "DataFrame") -> "DataFrame": :class:`DataFrame` Combined DataFrame + Notes + ----- + This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union + (that does deduplication of elements), use this function followed by :func:`distinct`. + + Also as standard in SQL, this function resolves columns by position (not by name). + + :func:`unionAll` is an alias to :func:`union` + + See Also + -------- + DataFrame.union """ return self.union(other) @@ -3067,6 +3434,10 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> ---------- other : :class:`DataFrame` Another :class:`DataFrame` that needs to be combined. + allowMissingColumns : bool, optional, default False + Specify whether to allow missing columns. + + .. versionadded:: 3.1.0 Returns ------- @@ -3102,21 +3473,16 @@ def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> | 1| 2| 3|null| |null| 4| 5| 6| +----+----+----+----+ - - .. versionchanged:: 3.1.0 - Added optional argument `allowMissingColumns` to specify whether to allow - missing columns. """ return DataFrame(self._jdf.unionByName(other._jdf, allowMissingColumns), self.sparkSession) - @since(1.3) def intersect(self, other: "DataFrame") -> "DataFrame": """Return a new :class:`DataFrame` containing rows only in both this :class:`DataFrame` and another :class:`DataFrame`. Note that any duplicates are removed. To preserve duplicates use :func:`intersectAll`. - This is equivalent to `INTERSECT` in SQL. + .. versionadded:: 1.3.0 Parameters ---------- @@ -3128,6 +3494,10 @@ def intersect(self, other: "DataFrame") -> "DataFrame": :class:`DataFrame` Combined DataFrame. + Notes + ----- + This is equivalent to `INTERSECT` in SQL. + Examples -------- >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"]) @@ -3165,7 +3535,6 @@ def intersectAll(self, other: "DataFrame") -> "DataFrame": -------- >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"]) >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"]) - >>> df1.intersectAll(df2).sort("C1", "C2").show() +---+---+ | C1| C2| @@ -3177,12 +3546,11 @@ def intersectAll(self, other: "DataFrame") -> "DataFrame": """ return DataFrame(self._jdf.intersectAll(other._jdf), self.sparkSession) - @since(1.3) def subtract(self, other: "DataFrame") -> "DataFrame": """Return a new :class:`DataFrame` containing rows in this :class:`DataFrame` but not in another :class:`DataFrame`. - This is equivalent to `EXCEPT DISTINCT` in SQL. + .. versionadded:: 1.3.0 Parameters ---------- @@ -3194,11 +3562,14 @@ def subtract(self, other: "DataFrame") -> "DataFrame": :class:`DataFrame` Subtracted DataFrame. + Notes + ----- + This is equivalent to `EXCEPT DISTINCT` in SQL. 
+ Examples -------- >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"]) >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"]) - >>> df1.subtract(df2).show() +---+---+ | C1| C2| @@ -3235,10 +3606,14 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": Examples -------- >>> from pyspark.sql import Row - >>> df = sc.parallelize([ \\ - ... Row(name='Alice', age=5, height=80), \\ - ... Row(name='Alice', age=5, height=80), \\ - ... Row(name='Alice', age=10, height=80)]).toDF() + >>> df = spark.createDataFrame([ + ... Row(name='Alice', age=5, height=80), + ... Row(name='Alice', age=5, height=80), + ... Row(name='Alice', age=10, height=80) + ... ]) + + Deduplicate the same rows. + >>> df.dropDuplicates().show() +-----+---+------+ | name|age|height| @@ -3247,6 +3622,8 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": |Alice| 10| 80| +-----+---+------+ + Deduplicate values on 'name' and 'height' columns. + >>> df.dropDuplicates(['name', 'height']).show() +-----+---+------+ | name|age|height| @@ -3294,7 +3671,14 @@ def dropna( Examples -------- - >>> df4.na.drop().show() + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame([ + ... Row(age=10, height=80, name="Alice"), + ... Row(age=5, height=None, name="Bob"), + ... Row(age=None, height=None, name="Tom"), + ... Row(age=None, height=None, name=None), + ... ]) + >>> df.na.drop().show() +---+------+-----+ |age|height| name| +---+------+-----+ @@ -3358,34 +3742,48 @@ def fillna( Examples -------- - >>> df4.na.fill(50).show() - +---+------+-----+ - |age|height| name| - +---+------+-----+ - | 10| 80|Alice| - | 5| 50| Bob| - | 50| 50| Tom| - | 50| 50| null| - +---+------+-----+ - - >>> df5.na.fill(False).show() - +----+-------+-----+ - | age| name| spy| - +----+-------+-----+ - | 10| Alice|false| - | 5| Bob|false| - |null|Mallory| true| - +----+-------+-----+ - - >>> df4.na.fill({'age': 50, 'name': 'unknown'}).show() - +---+------+-------+ - |age|height| name| - +---+------+-------+ - | 10| 80| Alice| - | 5| null| Bob| - | 50| null| Tom| - | 50| null|unknown| - +---+------+-------+ + >>> df = spark.createDataFrame([ + ... (10, 80.5, "Alice", None), + ... (5, None, "Bob", None), + ... (None, None, "Tom", None), + ... (None, None, None, True)], + ... schema=["age", "height", "name", "bool"]) + + Fill all null values with 50 for numeric columns. + + >>> df.na.fill(50).show() + +---+------+-----+----+ + |age|height| name|bool| + +---+------+-----+----+ + | 10| 80.5|Alice|null| + | 5| 50.0| Bob|null| + | 50| 50.0| Tom|null| + | 50| 50.0| null|true| + +---+------+-----+----+ + + Fill all null values with ``False`` for boolean columns. + + >>> df.na.fill(False).show() + +----+------+-----+-----+ + | age|height| name| bool| + +----+------+-----+-----+ + | 10| 80.5|Alice|false| + | 5| null| Bob|false| + |null| null| Tom|false| + |null| null| null| true| + +----+------+-----+-----+ + + Fill all null values with to 50 and "unknown" for 'age' and 'name' column respectively. 
+ + >>> df.na.fill({'age': 50, 'name': 'unknown'}).show() + +---+------+-------+----+ + |age|height| name|bool| + +---+------+-------+----+ + | 10| 80.5| Alice|null| + | 5| null| Bob|null| + | 50| null| Tom|null| + | 50| null|unknown|true| + +---+------+-------+----+ """ if not isinstance(value, (float, int, str, bool, dict)): raise TypeError("value should be a float, int, string, bool or dict") @@ -3489,43 +3887,46 @@ def replace( # type: ignore[misc] Examples -------- - >>> df4.na.replace(10, 20).show() + >>> df = spark.createDataFrame([ + ... (10, 80, "Alice"), + ... (5, None, "Bob"), + ... (None, 10, "Tom"), + ... (None, None, None)], + ... schema=["age", "height", "name"]) + + Replace 10 to 20 in all columns. + + >>> df.na.replace(10, 20).show() +----+------+-----+ | age|height| name| +----+------+-----+ | 20| 80|Alice| | 5| null| Bob| - |null| null| Tom| + |null| 20| Tom| |null| null| null| +----+------+-----+ - >>> df4.na.replace('Alice', None).show() - +----+------+----+ - | age|height|name| - +----+------+----+ - | 10| 80|null| - | 5| null| Bob| - |null| null| Tom| - |null| null|null| - +----+------+----+ + Replace 'Alice' to null in all columns. - >>> df4.na.replace({'Alice': None}).show() + >>> df.na.replace('Alice', None).show() +----+------+----+ | age|height|name| +----+------+----+ | 10| 80|null| | 5| null| Bob| - |null| null| Tom| + |null| 10| Tom| |null| null|null| +----+------+----+ - >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show() + Replace 'Alice' to 'A', and 'Bob' to 'B' in the 'name' column. + + >>> df.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show() +----+------+----+ | age|height|name| +----+------+----+ | 10| 80| A| | 5| null| B| - |null| null| Tom| + |null| 10| Tom| |null| null|null| +----+------+----+ """ @@ -3661,9 +4062,6 @@ def approxQuantile( Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna. - Note that null values will be ignored in numerical columns before calculation. - For columns only containing null values, an empty list is returned. - .. versionadded:: 2.0.0 Parameters @@ -3671,7 +4069,7 @@ def approxQuantile( col: str, tuple or list Can be a single column name, or a list of names for multiple columns. - .. versionchanged:: 2.2 + .. versionchanged:: 2.2.0 Added support for multiple columns. probabilities : list or tuple a list of quantile probabilities @@ -3686,11 +4084,18 @@ def approxQuantile( Returns ------- list - the approximate quantiles at the given probabilities. If - the input `col` is a string, the output is a list of floats. If the - input `col` is a list or tuple of strings, the output is also a - list, but each element in it is a list of floats, i.e., the output - is a list of list of floats. + the approximate quantiles at the given probabilities. + + * If the input `col` is a string, the output is a list of floats. + + * If the input `col` is a list or tuple of strings, the output is also a + list, but each element in it is a list of floats, i.e., the output + is a list of list of floats. + + Notes + ----- + Null values will be ignored in numerical columns before calculation. + For columns only containing null values, an empty list is returned. """ if not isinstance(col, (str, list, tuple)): @@ -3877,6 +4282,12 @@ def freqItems( :class:`DataFrame` DataFrame with frequent items. + Notes + ----- + This function is meant for exploratory data analysis, as we make no + guarantee about the backward compatibility of the schema of the resulting + :class:`DataFrame`. 
+ Examples -------- >>> df = spark.createDataFrame([(1, 11), (1, 11), (3, 10), (4, 8), (4, 8)], ["c1", "c2"]) @@ -3886,12 +4297,6 @@ def freqItems( +------------+------------+ | [4, 1, 3]| [8, 11, 10]| +------------+------------+ - - Notes - ----- - This function is meant for exploratory data analysis, as we make no - guarantee about the backward compatibility of the schema of the resulting - :class:`DataFrame`. """ if isinstance(cols, tuple): cols = list(cols) @@ -3926,8 +4331,14 @@ def withColumns(self, *colsMap: Dict[str, Column]) -> "DataFrame": Examples -------- - >>> df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).collect() - [Row(age=2, name='Alice', age2=4, age3=5), Row(age=5, name='Bob', age2=7, age3=8)] + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) + >>> df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show() + +---+-----+----+----+ + |age| name|age2|age3| + +---+-----+----+----+ + | 2|Alice| 4| 5| + | 5| Bob| 7| 8| + +---+-----+----+----+ """ # Below code is to help enable kwargs in future. assert len(colsMap) == 1 @@ -3975,9 +4386,14 @@ def withColumn(self, colName: str, col: Column) -> "DataFrame": Examples -------- - >>> df.withColumn('age2', df.age + 2).collect() - [Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)] - + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) + >>> df.withColumn('age2', df.age + 2).show() + +---+-----+----+ + |age| name|age2| + +---+-----+----+ + | 2|Alice| 4| + | 5| Bob| 7| + +---+-----+----+ """ if not isinstance(col, Column): raise TypeError("col should be Column") @@ -4003,8 +4419,14 @@ def withColumnRenamed(self, existing: str, new: str) -> "DataFrame": Examples -------- - >>> df.withColumnRenamed('age', 'age2').collect() - [Row(age2=2, name='Alice'), Row(age2=5, name='Bob')] + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) + >>> df.withColumnRenamed('age', 'age2').show() + +----+-----+ + |age2| name| + +----+-----+ + | 2|Alice| + | 5| Bob| + +----+-----+ """ return DataFrame(self._jdf.withColumnRenamed(existing, new), self.sparkSession) @@ -4027,6 +4449,7 @@ def withMetadata(self, columnName: str, metadata: Dict[str, Any]) -> "DataFrame" Examples -------- + >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) >>> df_meta = df.withMetadata('age', {'foo': 'bar'}) >>> df_meta.schema['age'].metadata {'foo': 'bar'} @@ -4064,20 +4487,37 @@ def drop(self, *cols: "ColumnOrName") -> "DataFrame": # type: ignore[misc] Examples -------- - >>> df.drop('age').collect() - [Row(name='Alice'), Row(name='Bob')] - - >>> df.drop(df.age).collect() - [Row(name='Alice'), Row(name='Bob')] - - >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect() - [Row(age=5, height=85, name='Bob')] - - >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect() - [Row(age=5, name='Bob', height=85)] - - >>> df.join(df2, 'name', 'inner').drop('age', 'height').collect() - [Row(name='Bob')] + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame( + ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]) + >>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")]) + + >>> df.drop('age').show() + +-----+ + | name| + +-----+ + | Tom| + |Alice| + | Bob| + +-----+ + >>> df.drop(df.age).show() + +-----+ + | name| + +-----+ + | Tom| + |Alice| + | Bob| + +-----+ + + Drop the column that joined both DataFrames on. 
+
+ >>> df.join(df2, df.name == df2.name, 'inner').drop('name').show()
+ +---+------+
+ |age|height|
+ +---+------+
+ | 16| 85|
+ | 14| 80|
+ +---+------+
 """
 if len(cols) == 1:
 col = cols[0]
@@ -4099,8 +4539,10 @@ def toDF(self, *cols: "ColumnOrName") -> "DataFrame":

 Parameters
 ----------
- cols : str
- new column names
+ *cols : tuple
+ a tuple of new column names as strings or :class:`Column` objects. The
+ number of names needs to be the same as the number of columns in the
+ initial :class:`DataFrame`.

 Returns
 -------
@@ -4109,8 +4551,16 @@ def toDF(self, *cols: "ColumnOrName") -> "DataFrame":

 Examples
 --------
- >>> df.toDF('f1', 'f2').collect()
- [Row(f1=2, f2='Alice'), Row(f1=5, f2='Bob')]
+ >>> df = spark.createDataFrame([(14, "Tom"), (23, "Alice"),
+ ... (16, "Bob")], ["age", "name"])
+ >>> df.toDF('f1', 'f2').show()
+ +---+-----+
+ | f1| f2|
+ +---+-----+
+ | 14| Tom|
+ | 23|Alice|
+ | 16| Bob|
+ +---+-----+
 """
 jdf = self._jdf.toDF(self._jseq(cols))
 return DataFrame(jdf, self.sparkSession)
@@ -4153,6 +4603,7 @@ def transform(self, func: Callable[..., "DataFrame"], *args: Any, **kwargs: Any)
 | 1| 1|
 | 2| 2|
 +-----+---+
+
 >>> def add_n(input_df, n):
 ... return input_df.select([(col(col_name) + n).alias(col_name)
 ... for col_name in input_df.columns])
@@ -4256,8 +4707,18 @@ def inputFiles(self) -> List[str]:

 Examples
 --------
- >>> df = spark.read.load("examples/src/main/resources/people.json", format="json")
- >>> len(df.inputFiles())
+ >>> import tempfile
+ >>> with tempfile.TemporaryDirectory() as d:
+ ... # Write a single-row DataFrame into a JSON file.
+ ... spark.createDataFrame(
+ ... [{"age": 100, "name": "Hyukjin Kwon"}]
+ ... ).repartition(1).write.json(d, mode="overwrite")
+ ...
+ ... # Read the JSON file as a DataFrame.
+ ... df = spark.read.format("json").load(d)
+ ...
+ ... # Returns the number of input files.
+ ... len(df.inputFiles())
 1
 """
 return list(self._jdf.inputFiles())
@@ -4301,6 +4762,8 @@ def writeTo(self, table: str) -> DataFrameWriterV2:

 Examples
 --------
+ >>> df = spark.createDataFrame(
+ ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
 >>> df.writeTo("catalog.db.table").append() # doctest: +SKIP
 >>> df.writeTo( # doctest: +SKIP
 ... "catalog.db.table"
@@ -4345,29 +4808,23 @@ def pandas_api(

 Examples
 --------
- >>> df.show() # doctest: +SKIP
- +----+----+
- |Col1|Col2|
- +----+----+
- | a| 1|
- | b| 2|
- | c| 3|
- +----+----+
+ >>> df = spark.createDataFrame(
+ ... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])

 >>> df.pandas_api() # doctest: +SKIP
- Col1 Col2
- 0 a 1
- 1 b 2
- 2 c 3
+ age name
+ 0 14 Tom
+ 1 23 Alice
+ 2 16 Bob

 We can specify the index columns.

- >>> df.pandas_api(index_col="Col1"): # doctest: +SKIP - Col2 - Col1 - a 1 - b 2 - c 3 + >>> df.pandas_api(index_col="age") # doctest: +SKIP + name + age + 14 Tom + 23 Alice + 16 Bob """ from pyspark.pandas.namespace import _get_index_map from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame @@ -4538,45 +4995,18 @@ def sampleBy( def _test() -> None: import doctest - from pyspark.context import SparkContext - from pyspark.sql import Row, SQLContext, SparkSession + from pyspark.sql import SparkSession import pyspark.sql.dataframe globs = pyspark.sql.dataframe.__dict__.copy() - sc = SparkContext("local[4]", "PythonTest") - globs["sc"] = sc - globs["sqlContext"] = SQLContext(sc) - globs["spark"] = SparkSession(sc) - globs["df"] = sc.parallelize([(2, "Alice"), (5, "Bob")]).toDF( - StructType([StructField("age", IntegerType()), StructField("name", StringType())]) - ) - globs["df2"] = sc.parallelize([Row(height=80, name="Tom"), Row(height=85, name="Bob")]).toDF() - globs["df3"] = sc.parallelize([Row(age=2, name="Alice"), Row(age=5, name="Bob")]).toDF() - globs["df4"] = sc.parallelize( - [ - Row(age=10, height=80, name="Alice"), - Row(age=5, height=None, name="Bob"), - Row(age=None, height=None, name="Tom"), - Row(age=None, height=None, name=None), - ] - ).toDF() - globs["df5"] = sc.parallelize( - [ - Row(age=10, name="Alice", spy=False), - Row(age=5, name="Bob", spy=None), - Row(age=None, name="Mallory", spy=True), - ] - ).toDF() - globs["sdf"] = sc.parallelize( - [Row(name="Tom", time=1479441846), Row(name="Bob", time=1479442946)] - ).toDF() - + spark = SparkSession.builder.master("local[4]").appName("sql.dataframe tests").getOrCreate() + globs["spark"] = spark (failure_count, test_count) = doctest.testmod( pyspark.sql.dataframe, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF, ) - globs["sc"].stop() + spark.stop() if failure_count: sys.exit(-1)
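Because every example above now builds its own data, the doctests no longer rely on the shared `df`, `df2`, `df4`, `df5`, and `sdf` globals that the old `_test()` harness provided. The snippet below is a minimal sketch of how to exercise one of the self-contained examples locally; it is not part of the patch, and the master, app name, and toy data are arbitrary choices mirroring the rewritten harness.

    # Minimal smoke test for the self-contained doctest style used above.
    # Assumes a local pyspark installation; master/appName are arbitrary.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[4]").appName("dataframe-doctest-smoke").getOrCreate()

    # Same toy data as the new withColumn / withColumnRenamed examples.
    df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
    df.withColumn("age2", df.age + 2).show()
    # +---+-----+----+
    # |age| name|age2|
    # +---+-----+----+
    # |  2|Alice|   4|
    # |  5|  Bob|   7|
    # +---+-----+----+

    spark.stop()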