WIP
Fokko committed Jul 8, 2024
commit a0c0c573f777f55cb115b56db21e2a7885db8f82
14 changes: 10 additions & 4 deletions pyiceberg/io/pyarrow.py
@@ -1059,11 +1059,17 @@ def _task_to_table(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-) -> pa.Table:
-    batches = _task_to_record_batches(
-        fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+) -> Optional[pa.Table]:
+    batches = list(
+        _task_to_record_batches(
+            fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping
+        )
     )
-    return pa.Table.from_batches(batches)
+
+    if len(batches) > 0:
+        return pa.Table.from_batches(batches)
+    else:
+        return None
 
 
 def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
5 changes: 3 additions & 2 deletions pyiceberg/table/__init__.py
@@ -1884,8 +1884,9 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
 
         from pyiceberg.io.pyarrow import project_batches, schema_to_pyarrow
 
+        target_schema = schema_to_pyarrow(self.projection())
Collaborator:
Here, we are making an opinionated decision on whether we use large or small types as the PyArrow schema when reading the Iceberg table as a RecordBatchReader. Is there a reason why we don't want to do the same for the Table API? I've noticed that we've changed the return type of the Table API to Optional[pa.Table] in order to avoid having to use schema_to_pyarrow.

Similarly, other libraries like polars use the approach of choosing one type over the other (large types in the case of polars).

>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>> import polars as pl
>>> strings = pa.array(["a", "b"])
>>> pydict = {"strings": strings}
>>> pa.Table.from_pydict(pydict)
pyarrow.Table
strings: string
----
strings: [["a","b"]]
>>> pq.write_table(pa.Table.from_pydict(pydict), "strings.parquet")
>>> pldf = pl.read_parquet("strings.parquet", use_pyarrow=True)
>>> pldf.dtypes
[String]
>>> pldf.to_arrow()
pyarrow.Table
strings: large_string
----
strings: [["a","b"]]
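
For reference, the two candidate schemas look like this in plain PyArrow (a minimal sketch, nothing pyiceberg-specific):

import pyarrow as pa

small = pa.schema([pa.field("strings", pa.string())])        # 32-bit offsets
large = pa.schema([pa.field("strings", pa.large_string())])  # 64-bit offsets
# An opinionated reader picks one of these up front and passes it to
# pa.RecordBatchReader.from_batches() as the reader's declared schema.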

Contributor Author:

My preference would be to let Arrow decide. For Polars it is different because they are also the query engine. Casting the types will recompute the buffers, consuming additional memory/CPU, which I would rather avoid.

For the table, we first materialize all the batches in memory, so if one of them is large, it will automatically upcast; otherwise, it will keep the small types.
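
A minimal sketch of the buffer cost mentioned above (plain PyArrow, nothing pyiceberg-specific):

import pyarrow as pa

small = pa.array(["a", "b"], type=pa.string())  # 32-bit offsets into the data buffer
large = small.cast(pa.large_string())           # offsets are rebuilt as 64-bit in a new buffer
print(small.nbytes, large.nbytes)               # the large variant consumes more memory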

Collaborator:

I'm less well versed in how Parquet data is converted into Arrow buffers, so please do check me if I am not making much sense 🙂

But are we actually casting the types on read?

We decide whether to read with large or small types when we instantiate the fragment scanner, which is what loads the Parquet data into Arrow buffers. The schema_to_pyarrow() calls for pa.Table or pa.RecordBatchReader, and in to_requested_schema after that, all represent the table schema in the same (large or small) format, so they shouldn't result in any additional casting or reassigning of buffers.

I think the only time we are casting the types is on write, where we may want to downcast it for forward compatibility. It looks like we have to choose a schema to use on write anyway, because using a schema for the ParquetWriter that isn't consistent with the schema of the dataframe results in an exception.

Contributor:

> I think the only time we are casting the types is on write, where we may want to downcast it for forward compatibility.

+1. Currently, we use "large_*" types during write. I think it would be better if we could write the file based on the input PyArrow dataframe schema: if the dataframe uses string, we also write with string.
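
A minimal sketch of the write-side behaviour being discussed (hypothetical file name; assumes only pyarrow and pyarrow.parquet):

import pyarrow as pa
import pyarrow.parquet as pq

df = pa.table({"strings": pa.array(["a", "b"], type=pa.large_string())})
file_schema = pa.schema([pa.field("strings", pa.string())])  # the "small" schema chosen for the file

with pq.ParquetWriter("downcast.parquet", file_schema) as writer:
    # writer.write_table(df) would raise, because the table schema differs from the
    # schema used to create the file, so the table is downcast to the file schema first:
    writer.write_table(df.cast(file_schema))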

         return pa.RecordBatchReader.from_batches(
-            schema_to_pyarrow(self.projection()),
+            target_schema,
             project_batches(
                 self.plan_files(),
                 self.table_metadata,
@@ -1895,7 +1896,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
                 case_sensitive=self.case_sensitive,
                 limit=self.limit,
             ),
-        )
+        ).cast(target_schema=target_schema)
Collaborator:
When I originally worked on #786, I thought of this approach as well, but ran into issues like:

tests/integration/test_reads.py::test_pyarrow_batches_deletes[session_catalog_hive] - pyarrow.lib.ArrowTypeError: Field 0 cannot be cast from date32[day] to date32[day]

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/ipc.pxi", line 800, in pyarrow.lib.RecordBatchReader.cast
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Field 0 cannot be cast from date32[day] to date32[day]

As a workaround, I opted to cast each pa.Array individually within to_requested_schema, rather than using this API.

This issue is fixed in apache/arrow#41884, but until we use the new release, I don't think we will be able to use this approach.
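
A minimal sketch of that kind of workaround (hypothetical helper, not the actual to_requested_schema implementation): cast each column of each batch individually and rebuild the reader:

import pyarrow as pa

def cast_batches(reader: pa.RecordBatchReader, target_schema: pa.Schema) -> pa.RecordBatchReader:
    def batches():
        for batch in reader:
            arrays = [
                batch.column(i).cast(target_schema.field(i).type)  # per-array cast instead of reader.cast(...)
                for i in range(batch.num_columns)
            ]
            yield pa.RecordBatch.from_arrays(arrays, schema=target_schema)

    return pa.RecordBatchReader.from_batches(target_schema, batches())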

Contributor Author:

Nice, good to see that it has been fixed. I was also pushing a patch: apache/arrow#43183. RC0 of Arrow 17 has been cut, so the release should be out any time soon.


     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         return self.to_arrow().to_pandas(**kwargs)