-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
115 lines (91 loc) · 3.83 KB
/
__init__.py
File metadata and controls
115 lines (91 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
import zipfile
import polars as pl
import py7zr
from frictionless import describe
def unpack_file(csv_fn: str, zip_fn: str = None) -> None:
    """Unpack a file from a 7z or ZIP archive and verify it exists afterwards.

    Args:
        csv_fn (str): Path to the expected file inside the archive.
        zip_fn (str, optional): Path to the 7z or ZIP archive. If None or
            empty, the function is a no-op.

    Raises:
        FileNotFoundError: If the archive does not exist, or if the expected
            file is still missing after extraction.
    """
    if not zip_fn:
        # Nothing to unpack; preserve original no-op behavior for a falsy zip_fn.
        return
    if not os.path.exists(zip_fn):
        raise FileNotFoundError(f"File '{zip_fn}' does not exist.")
    if not os.path.exists(csv_fn):
        # BUG FIX: the original ran BOTH extractors unconditionally, so one of
        # them always raised (py7zr on a ZIP, zipfile on a 7z). Detect the
        # archive format and use only the matching extractor.
        if zipfile.is_zipfile(zip_fn):
            with zipfile.ZipFile(zip_fn, "r") as z:
                z.extract(csv_fn)
        else:
            # Assume 7z for non-ZIP archives, matching the documented formats.
            with py7zr.SevenZipFile(zip_fn, mode="r") as z:
                z.extractall()
    if not os.path.exists(csv_fn):
        raise FileNotFoundError(f"File '{csv_fn}' does not exist inside '{zip_fn}'.")
def read_csv_in_chunks(csv_fn, sep=",", chunk_size=100000):
    """Yield Pandas DataFrames in chunks from a CSV file.

    The CSV can be local or on AWS S3 (whatever `pl.read_csv` accepts).

    Args:
        csv_fn (str): Path to the CSV file.
        sep (str): Separator used in the CSV file. Default is ','.
        chunk_size (int): Number of rows per chunk. Default is 100,000.

    Yields:
        pd.DataFrame: A chunk of the CSV file as a Pandas DataFrame.
    """
    # NOTE: pl.read_csv is an EAGER read. The original wrapped it in .lazy()
    # and then called .collect() once for the height and again for EVERY
    # chunk, re-materializing the whole frame each time. Read once and slice
    # the materialized DataFrame instead — same output, far less work.
    df = pl.read_csv(
        csv_fn,
        has_header=True,
        separator=sep,
        quote_char='"',
        encoding="utf-8",
        infer_schema_length=100,
        try_parse_dates=True,
        low_memory=True,
        rechunk=False,
    )
    for start in range(0, df.height, chunk_size):
        yield df.slice(start, chunk_size).to_pandas()
def make_coldef(csv_fn: str) -> dict:
    """Create a column definition dictionary from a CSV file.

    Args:
        csv_fn (str): Path to the CSV file.

    Returns:
        dict: Column names mapped to their data types, suitable as the
        `columns` spec in SQLMesh model DDLs.

    Raises:
        FileNotFoundError: If the CSV file does not exist.
    """
    if not os.path.exists(csv_fn):
        raise FileNotFoundError(f"File '{csv_fn}' does not exist.")
    # Infer a schema with frictionless, then flatten it to name -> type.
    inferred = describe(csv_fn, type="schema")
    coldef = {}
    for field in inferred.fields:
        coldef[field.name] = field.type
    return coldef
def make_model_name(csv_fn: str, prefix=None, sep="_") -> str:
    """Generate a model name based on the CSV file name.

    Args:
        csv_fn (str): Path to the CSV file.
        prefix (str, optional): A prefix to prepend to the model name. Defaults to None.
        sep (str, optional): Separator to use between prefix and file name.
            Defaults to '_' which results in a one tier database structure.
            Use "." for a two tier database structure, or include a "." in the prefix
            for a three tier database structure.

    Returns:
        str: A model name derived from the CSV file name (extension stripped),
        optionally prefixed.
    """
    # Removed a block of commented-out signal-registry debug code that was
    # left inside this function body.
    name, _ = os.path.splitext(os.path.basename(csv_fn))
    if prefix:
        name = f"{prefix}{sep}{name}"
    return name