Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/models/customer_revenue_by_day.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ WHERE
o.ds BETWEEN @start_ds AND @end_ds
GROUP BY
o.customer_id,
o.ds
o.ds
2 changes: 1 addition & 1 deletion example/models/customers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ MODEL (

SELECT DISTINCT
customer_id::INT AS customer_id
FROM sushi.orders AS o
FROM sushi.orders AS o
2 changes: 1 addition & 1 deletion example/models/raw_items.py → example/models/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@model(
"""
MODEL(
name raw.items,
name sushi.items,
kind incremental,
time_column ds,
start 'Jan 1 2022',
Expand Down
19 changes: 0 additions & 19 deletions example/models/items.sql

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
@model(
"""
MODEL(
name raw.order_items,
name sushi.order_items,
kind incremental,
time_column ds,
depends_on [raw.orders, raw.items],
depends_on [sushi.orders, sushi.items],
cron '@daily',
batch_size 30,
columns (
Expand All @@ -40,7 +40,7 @@ def execute(
dfs = []

raw_orders = (
snapshots["raw.orders"].table_name if snapshots else mapping["raw.orders"]
snapshots["sushi.orders"].table_name if snapshots else mapping["sushi.orders"]
)

for dt in iter_dates(start, end):
Expand Down
18 changes: 0 additions & 18 deletions example/models/order_items.sql

This file was deleted.

3 changes: 2 additions & 1 deletion example/models/raw_orders.py → example/models/orders.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@

@model(
"""
-- Table of sushi orders.
MODEL(
name raw.orders,
name sushi.orders,
kind incremental,
time_column ds,
start '2022-01-01',
Expand Down
19 changes: 0 additions & 19 deletions example/models/orders.sql

This file was deleted.

2 changes: 1 addition & 1 deletion example/models/top_waiters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ WHERE
)
ORDER BY
revenue DESC
LIMIT 10
LIMIT 10
2 changes: 1 addition & 1 deletion example/models/waiter_revenue_by_day.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ WHERE
o.ds BETWEEN @start_ds AND @end_ds
GROUP BY
o.waiter_id,
o.ds
o.ds
2 changes: 1 addition & 1 deletion example/models/waiters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ SELECT DISTINCT
ds::TEXT AS ds
FROM sushi.orders AS o
WHERE
ds BETWEEN @start_ds AND @end_ds
ds BETWEEN @start_ds AND @end_ds
91 changes: 0 additions & 91 deletions example/tests/test_orders.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"requests",
"rich",
"ruamel.yaml",
"sqlglot>=10.2.0",
"sqlglot>=10.2.1",
],
extras_require={
"dev": [
Expand Down
41 changes: 28 additions & 13 deletions sqlmesh/core/engine_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def replace_query(self, table_name: str, query_or_df: QueryOrDF):
if self.supports_partitions:
self.insert_overwrite(table_name, query_or_df)
else:
if isinstance(query_or_df, pd.DataFrame):
query_or_df = next(pandas_to_sql(query_or_df))
self.execute(
exp.Create(
kind="TABLE",
Expand All @@ -104,29 +106,42 @@ def replace_query(self, table_name: str, query_or_df: QueryOrDF):
def create_table(
self,
table_name: str,
columns: t.Dict[str, exp.DataType],
query_or_columns: Query | t.Dict[str, exp.DataType],
exists: bool = True,
**kwargs,
) -> None:
"""Create a table using a DDL statement
"""Create a table using a DDL statement or a CTAS.

If a query is passed in instead of column type map, CREATE TABLE AS will be used.

Args:
table_name: The name of the table to create. Can be fully qualified or just table name
columns: A mapping between the column name and its data type
exists: Indicates if you to include an IF EXISTS check
table_name: The name of the table to create. Can be fully qualified or just table name.
query_columns: A query or mapping between the column name and its data type.
exists: Indicates if you to include an IF EXISTS check.
kwargs: Optional create table properties.
"""
properties = self._create_table_properties(**kwargs)

schema = exp.Schema(
this=exp.to_table(table_name),
expressions=[
exp.ColumnDef(this=exp.to_identifier(column), kind=kind)
for column, kind in columns.items()
],
)
query = None
schema: t.Optional[exp.Schema | exp.Table] = exp.to_table(table_name)

if isinstance(query_or_columns, dict):
schema = exp.Schema(
this=schema,
expressions=[
exp.ColumnDef(this=exp.to_identifier(column), kind=kind)
for column, kind in query_or_columns.items()
],
)
else:
query = query_or_columns

create_expression = exp.Create(
this=schema, kind="TABLE", exists=exists, properties=properties
this=schema,
kind="TABLE",
exists=exists,
properties=properties,
expression=query,
)
self.execute(create_expression)

Expand Down
Loading