68 changes: 61 additions & 7 deletions python/docs/source/user_guide/sql/python_data_source.rst
@@ -73,6 +73,9 @@ Method that needs to be implemented for a capability:
    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

    def writer(self, schema: StructType, overwrite: bool):
        return FakeDataSourceWriter(self.options)

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

@@ -83,8 +86,8 @@ Method that needs to be implemented for a capability:
    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)
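
For orientation, the `DataSource` subclass that these methods belong to sits above the fold of this diff. Below is a minimal sketch of such a class; the short name `fake` and the default schema string are assumptions inferred from the read examples further down, and `FakeDataSourceReader` / `FakeDataSourceWriter` are defined in the sections that follow:

.. code-block:: python

    from pyspark.sql.datasource import DataSource
    from pyspark.sql.types import StructType

    class FakeDataSource(DataSource):
        """A data source that generates synthetic rows with the `faker` library."""

        @classmethod
        def name(cls):
            # Short name used in spark.read.format("fake") and df.write.format("fake").
            return "fake"

        def schema(self):
            # Assumed default schema, matching the sample output shown later in this page.
            return "name string, date string, zipcode string, state string"

        def reader(self, schema: StructType):
            return FakeDataSourceReader(schema, self.options)

        def writer(self, schema: StructType, overwrite: bool):
            return FakeDataSourceWriter(self.options)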

Implementing Reader for Python Data Source
------------------------------------------
Implementing Batch Reader and Writer for Python Data Source
-----------------------------------------------------------
**Implement the Reader**

Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema.
@@ -109,6 +112,43 @@ Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema.
                    row.append(value)
                yield tuple(row)
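
The lines above are only the tail of the reader example; the rest is folded out of this diff. For orientation, here is a minimal sketch of a complete reader in this style. The `numRows` option name and the per-field Faker calls are assumptions, not necessarily what the final documentation uses:

.. code-block:: python

    from typing import Iterator, Tuple

    from pyspark.sql.datasource import DataSourceReader
    from pyspark.sql.types import StructType

    class FakeDataSourceReader(DataSourceReader):

        def __init__(self, schema: StructType, options: dict):
            self.schema = schema
            self.options = options

        def read(self, partition) -> Iterator[Tuple]:
            # Imported inside read() so the reader object itself stays picklable.
            from faker import Faker

            fake = Faker()
            # "numRows" is an assumed option name; default to three rows.
            num_rows = int(self.options.get("numRows", 3))
            for _ in range(num_rows):
                row = []
                for field in self.schema.fields:
                    # Call the Faker provider matching the field name,
                    # e.g. fake.name(), fake.date(), fake.zipcode(), fake.state().
                    row.append(getattr(fake, field.name)())
                yield tuple(row)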

**Implement the Writer**

Create a fake data source writer that processes each partition of data, counts the rows, and either
prints the total count of rows after a successful write or the number of failed tasks if the writing process fails.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Iterator, List
Member

Suggested change
from typing import Iterator, List
from typing import Iterator, List

per PEP 8

    from pyspark.sql.types import Row
    from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

    @dataclass
    class SimpleCommitMessage(WriterCommitMessage):
        partition_id: int
        count: int

    class FakeDataSourceWriter(DataSourceWriter):

        def write(self, rows: Iterator[Row]) -> SimpleCommitMessage:
            # Imported inside write() to avoid a serialization error (see the discussion below).
            from pyspark import TaskContext
Member
I think we can import this on the top together.

Contributor Author
This import actually needs to be inside the write method otherwise it will throw a serialization error.

            # Identify the partition being written by this task.
            context = TaskContext.get()
            partition_id = context.partitionId()
            # Count the rows in this partition and report the count back to the driver.
            cnt = sum(1 for _ in rows)
            return SimpleCommitMessage(partition_id=partition_id, count=cnt)

        def commit(self, messages: List[SimpleCommitMessage]) -> None:
            total_count = sum(message.count for message in messages)
            print(f"Total number of rows: {total_count}")

        def abort(self, messages: List[SimpleCommitMessage]) -> None:
            # Messages from failed tasks arrive as None.
            failed_count = sum(message is None for message in messages)
            print(f"Number of failed tasks: {failed_count}")


Implementing Streaming Reader and Writer for Python Data Source
---------------------------------------------------------------
**Implement the Stream Reader**
@@ -267,7 +307,9 @@ After defining your data source, it must be registered before usage.

    spark.dataSource.register(FakeDataSource)

Use the fake datasource with the default schema and options:
**Read From a Python Data Source**

Read from the fake datasource with the default schema and options:

.. code-block:: python

@@ -281,7 +323,7 @@ Use the fake datasource with the default schema and options:
    # | Amy Martin|1988-10-28| 68076| Oregon|
    # +-----------+----------+-------+-------+

Use the fake datasource with a custom schema:
Read from the fake datasource with a custom schema:

.. code-block:: python

@@ -295,7 +337,7 @@ Use the fake datasource with a custom schema:
    # |Mrs. Jacqueline Brown|Maynard Inc |
    # +---------------------+--------------+

Use the fake datasource with a different number of rows:
Read from the fake datasource with a different number of rows:

.. code-block:: python

@@ -311,6 +353,18 @@ Use the fake datasource with a different number of rows:
    # | Douglas James|2007-01-18| 46226| Alabama|
    # +--------------+----------+-------+------------+

**Write To a Python Data Source**

To write data to a Python data source, make sure to specify the save mode with `mode()`. Supported modes are `append` and `overwrite`.

.. code-block:: python

    df = spark.range(0, 10, 1, 5)
    df.write.format("fake").mode("append").save()

    # You can check the Spark log (standard error) to see the output of the write operation.
    # Total number of rows: 10
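
An `overwrite` write looks the same apart from the mode. A minimal sketch; with this writer the two modes behave identically, because the `overwrite` flag passed to `writer()` is not used:

.. code-block:: python

    df = spark.range(0, 10, 1, 5)
    df.write.format("fake").mode("overwrite").save()

In either mode, `commit()` runs on the driver once all write tasks have succeeded, and `abort()` runs if any of them fail.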

**Use a Python Data Source in a Streaming Query**

Once we register the Python data source, we can also use it in streaming queries as the source of readStream() or the sink of writeStream() by passing its short name or full name to format().
@@ -319,7 +373,7 @@ Start a query that reads from the fake Python data source and writes to the console

.. code-block:: python

query = spark.readStream.format("fake").load().writeStream().format("console").start()
query = spark.readStream.format("fake").load().writeStream.format("console").start()

# +---+
# | id|
@@ -338,4 +392,4 @@ We can also use the same data source as both the streaming reader and writer

.. code-block:: python

query = spark.readStream.format("fake").load().writeStream().format("fake").start("/output_path")
query = spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")