3 changes: 3 additions & 0 deletions docs/install.rst
@@ -86,6 +86,9 @@ hdf5

Note that additional software also needs to be installed for these features.

parquet
    For reading and writing :ref:`Parquet and other Arrow formats <io_pyarrow>` via PyArrow.

remote
    For reading and writing from :ref:`Remote Sources <io_remotes>` with `fsspec`.
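
For example, the ``parquet`` extra could be installed with pip (an illustrative
command; quoting of the extra may vary by shell)::

    $ pip install petl[parquet]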

13 changes: 11 additions & 2 deletions docs/io.rst
@@ -390,8 +390,17 @@ Avro files (fastavro)
    :start-after: begin_complex_schema
    :end-before: end_complex_schema

.. module:: petl.io.arrow
.. _io_pyarrow:


Parquet files
^^^^^^^^^^^^^

These functions read and write Parquet (and other Arrow formats) via PyArrow:

.. autofunction:: petl.io.fromarrow
.. autofunction:: petl.io.toarrow
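
A minimal round-trip sketch (the file name is illustrative, and the functions
are assumed to be re-exported at package level like the other io helpers)::

    import petl as etl

    tbl = [('foo', 'bar'), ('a', 1), ('b', 2)]
    etl.toarrow(tbl, 'example.parquet')
    print(etl.fromarrow('example.parquet'))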

.. module:: petl.io.gsheet
.. _io_gsheet:

Google Sheets (gspread)
^^^^^^^^^^^^^^^^^^^^^^^
2 changes: 2 additions & 0 deletions petl/io/__init__.py
@@ -45,3 +45,5 @@
from petl.io.remotes import SMBSource

from petl.io.gsheet import fromgsheet, togsheet, appendgsheet

from petl.io.arrow import fromarrow, toarrow
62 changes: 62 additions & 0 deletions petl/io/arrow.py
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division

# internal dependencies
from petl.util.base import Table, header, data

__all__ = (
    'fromarrow', 'toarrow',
)


def fromarrow(source, **kwargs):
    """
    Extract data from an Arrow-compatible dataset into a petl table.

    :param source: file path, list of paths, or directory
    :param format: dataset format (e.g., 'parquet', 'orc', 'ipc');
        default 'parquet'
    :param columns: list of columns to load; default None (all columns)
    :param kwargs: other keyword arguments passed to pyarrow.dataset.dataset
    :returns: a petl table view over the dataset

    """
    return ArrowView(source, **kwargs)


class ArrowView(Table):

    def __init__(self, source, **kwargs):
        self.source = source
        self.kwargs = kwargs

    def __iter__(self):
        # lazy import so petl can be imported without pyarrow installed
        import pyarrow.dataset as ds
        kwargs = dict(self.kwargs)
        fmt = kwargs.pop('format', 'parquet')
        cols_opt = kwargs.pop('columns', None)
        dataset = ds.dataset(self.source, format=fmt, **kwargs)
        if cols_opt is not None:
            # header reflects the requested column subset
            column_names = list(cols_opt)
        else:
            column_names = [field.name for field in dataset.schema]
        # header row
        yield tuple(column_names)
        # data rows, streamed batch by batch
        for batch in dataset.to_batches(columns=cols_opt):
            for rec in batch.to_pylist():
                yield tuple(rec.get(c) for c in column_names)


def toarrow(table, target, **kwargs):
    """
    Write a petl table to an Arrow dataset or file.

    :param table: petl table (first row is the header)
    :param target: output file path or directory
    :param format: format name (e.g., 'parquet', 'ipc', 'orc');
        default 'parquet'
    :param schema: optional pyarrow.Schema; default None (infer the schema)
    :param kwargs: passed through to the writer (pyarrow.parquet.write_table
        or pyarrow.dataset.write_dataset)
    :returns: the original petl table

    """
    # lazy imports so petl can be imported without pyarrow installed
    import pyarrow as pa
    import pyarrow.parquet as pq
    import pyarrow.dataset as ds

    fmt = kwargs.pop('format', 'parquet')
    schema = kwargs.pop('schema', None)

    hdr = header(table)
    rows = data(table)

    # accumulate data by column
    arrays = {c: [] for c in hdr}
    for row in rows:
        for c, v in zip(hdr, row):
            arrays[c].append(v)

    # build an Arrow table from the accumulated columns
    arrow_tbl = pa.Table.from_pydict(arrays, schema=schema)

    if fmt == 'parquet':
        # single-file Parquet write
        pq.write_table(arrow_tbl, target, **kwargs)
    else:
        # directory-based dataset write for other formats
        ds.write_dataset(arrow_tbl, target, format=fmt, **kwargs)

    return table


# attach convenience methods to the Table class: toarrow as a regular method
# (so tbl.toarrow(target) passes the table as the first argument), fromarrow
# as a staticmethod
Table.fromarrow = staticmethod(fromarrow)
Table.toarrow = toarrow
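
# Usage sketch for the attached methods (illustrative paths):
#
#     tbl.toarrow('out.parquet')        # same as toarrow(tbl, 'out.parquet')
#     Table.fromarrow('out.parquet')    # same as fromarrow('out.parquet')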
82 changes: 82 additions & 0 deletions petl/test/io/test_arrow.py
@@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division

# Minimal round-trip tests for the petl.io.arrow helpers.

import os
import tempfile

import pytest

from petl.io.arrow import fromarrow, toarrow

# skip this module entirely when pyarrow is not installed
pytest.importorskip('pyarrow')


TEST_TABLE = (('foo', 'bar'),
              ('a', 1),
              ('b', 2),
              ('c', 3))


def _tmp_parquet():
    # fresh temporary path for a single-file parquet write
    return os.path.join(tempfile.mkdtemp(), 'test.parquet')


def test_roundtrip():
    # write a single Parquet file and read it back unchanged
    fn = _tmp_parquet()
    toarrow(TEST_TABLE, fn)
    actual = fromarrow(fn)
    assert list(actual) == list(TEST_TABLE)


def test_fromarrow_columns():
    # restrict which columns are read back
    fn = _tmp_parquet()
    toarrow(TEST_TABLE, fn)
    actual = fromarrow(fn, columns=['bar'])
    assert list(actual) == [('bar',), (1,), (2,), (3,)]


def test_fromarrow_reiterable():
    # petl tables must support repeated iteration
    fn = _tmp_parquet()
    toarrow(TEST_TABLE, fn)
    actual = fromarrow(fn)
    assert list(actual) == list(actual)
47 changes: 35 additions & 12 deletions petl/util/base.py
@@ -240,34 +240,57 @@ def __repr__(self):
        return r


def itervalues(table, field, **kwargs):
    """
    Iterate over the value(s) in the given field(s).

    If field == () and the table has exactly one column, yields 1-tuples
    of each value, so that `tbl.values()` on a single-column table returns
    [(v,), (v,), ...]. Otherwise, behaves exactly as before.
    """
    missing = kwargs.get('missing', None)
    it = iter(table)
    try:
        hdr = next(it)
    except StopIteration:
        hdr = []

    # which column(s) were requested?
    indices = asindices(hdr, field)

    # special case: no field and a single-column table -> default to
    # that column
    if not indices and field == () and len(hdr) == 1:
        indices = [0]

    assert indices, 'no field selected'
    getter = operator.itemgetter(*indices)
    for row in it:
        try:
            result = getter(row)
        except IndexError:
            # handle short rows
            if len(indices) > 1:
                vals = [row[i] if i < len(row) else missing
                        for i in indices]
                yield tuple(vals)
            else:
                yield missing
        else:
            # wrap a single result in a tuple only for the special
            # single-column case
            if len(indices) == 1 and field == ():
                yield (result,)
            else:
                yield result
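
# Example of the special single-column behaviour (an illustrative sketch):
#
#     >>> tbl = [('foo',), ('a',), ('b',)]
#     >>> list(itervalues(tbl, ()))
#     [('a',), ('b',)]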


class TableWrapper(Table):
4 changes: 4 additions & 0 deletions requirements-docs.txt
@@ -8,3 +8,7 @@ rinohtype

setuptools
setuptools-scm

# dependencies for the PyArrow / Parquet io docs
pandas
pyarrow
1 change: 1 addition & 0 deletions requirements-formats.txt
@@ -5,6 +5,7 @@ intervaltree>=3.0.2
lxml>=4.6.5
openpyxl>=2.6.2
pandas
pyarrow
Whoosh>=2.7.4
xlrd>=2.0.1
xlwt>=1.3.0
4 changes: 3 additions & 1 deletion requirements-tests.txt
@@ -6,4 +6,6 @@ pytest>=4.6.6,<7.0.0
tox
coverage
coveralls
mock; python_version < '3.0'
pandas
pyarrow
1 change: 1 addition & 0 deletions setup.py
@@ -34,6 +34,7 @@
        'xlsx': ['openpyxl>=2.6.2'],
        'xpath': ['lxml>=4.4.0'],
        'whoosh': ['whoosh'],
        'parquet': ['pyarrow>=4.0.0'],
    },
    use_scm_version={
        "version_scheme": "guess-next-dev",