Skip to content

Commit bb7cb1a

Browse files
Feat: add risingwave engine adapter support for sqlmesh. (#3436)
1 parent 39754d7 commit bb7cb1a

13 files changed

Lines changed: 251 additions & 0 deletions

File tree

.circleci/continue_config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ workflows:
298298
- spark
299299
- clickhouse
300300
- clickhouse-cluster
301+
- risingwave
301302
- engine_tests_cloud:
302303
name: cloud_engine_<< matrix.engine >>
303304
context:

.circleci/wait-for-db.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ trino_ready() {
5555
docker compose -f tests/core/engine_adapter/integration/docker/compose.trino.yaml exec trino /bin/bash -c '/usr/lib/trino/bin/health-check'
5656
}
5757

58+
risingwave_ready() {
59+
probe_port 4566
60+
}
61+
5862
echo "Waiting for $ENGINE to be ready..."
5963

6064
READINESS_FUNC="${ENGINE}_ready"

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ spark-test: engine-spark-up
187187
trino-test: engine-trino-up
188188
pytest -n auto -x -m "trino or trino_iceberg or trino_delta or trino_nessie" --retries 3 --junitxml=test-results/junit-trino.xml
189189

190+
risingwave-test: engine-risingwave-up
191+
pytest -n auto -x -m "risingwave" --retries 3 --junitxml=test-results/junit-risingwave.xml
192+
190193
#################
191194
# Cloud Engines #
192195
#################
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# RisingWave
2+
3+
This page provides information about how to use SQLMesh with the RisingWave streaming database engine.
4+
5+
## Connection options
6+
7+
| Option | Description | Type | Required |
8+
|----------------|--------------------------------------------------------------|:------:|:--------:|
9+
| `type` | Engine type name - must be `risingwave` | string | Y |
10+
| `host` | The hostname of the RisingWave engine | string | Y |
11+
| `user` | The username to use for authentication with the RisingWave engine | string | Y |
12+
| `password` | The password to use for authentication with the RisingWave engine | string | N |
13+
| `port` | The port number of the RisingWave engine server | int | Y |
14+
| `database` | The name of the database instance to connect to | string | Y |
15+
| `role` | The role to use for authentication with the RisingWave server | string | N |
16+
| `sslmode`      | The SSL mode to use for the connection to the RisingWave server (e.g. `disable`, `require`) | string | N |
17+
18+
## Usage
19+
As a streaming database, the RisingWave engine provides some features beyond a traditional SQL engine. You can create a resource that RisingWave can read data from with `CREATE SOURCE`. You can also create an external target where you can send data processed in RisingWave with `CREATE SINK`.
20+
21+
To use these in SQLMesh, specify them as optional pre-statements and post-statements, as described in the [SQL models documentation](https://sqlmesh.readthedocs.io/en/stable/concepts/models/sql_models/).
22+
23+
Below is an example of creating a sink in a SQLMesh model as a post-statement.
24+
25+
```sql
26+
MODEL (
27+
name sqlmesh_example.view_model,
28+
kind VIEW (
29+
materialized true
30+
)
31+
);
32+
33+
SELECT
34+
item_id,
35+
  COUNT(DISTINCT id) AS num_orders
36+
FROM
37+
sqlmesh_example.incremental_model
38+
GROUP BY item_id;
39+
40+
CREATE
41+
SINK IF NOT EXISTS kafka_sink
42+
FROM
43+
@this_model
44+
WITH (
45+
connector='kafka',
46+
"properties.bootstrap.server"='localhost:9092',
47+
topic='test1',
48+
)
49+
FORMAT PLAIN
50+
ENCODE JSON (force_append_only=true);
51+
```
52+
53+
Here the `@this_model` macro is used to refer to the `sqlmesh_example.view_model` model itself.

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@
160160
"sse-starlette>=0.2.2",
161161
"pyarrow",
162162
],
163+
"risingwave": [
164+
"psycopg2",
165+
],
163166
},
164167
classifiers=[
165168
"Intended Audience :: Developers",

sqlmesh/core/config/connection.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1770,6 +1770,52 @@ def get_catalog(self) -> t.Optional[str]:
17701770
return self.catalog_name
17711771

17721772

1773+
class RisingwaveConnectionConfig(ConnectionConfig):
    """Connection configuration for the RisingWave engine.

    RisingWave speaks the PostgreSQL wire protocol, so connections are
    established through ``psycopg2``.

    Args:
        host: Hostname of the RisingWave server.
        user: Username used for authentication.
        password: Optional password used for authentication.
        port: Port number of the RisingWave server.
        database: Name of the database instance to connect to.
        role: Optional role to assume for the session.
        sslmode: Optional SSL mode for the connection.
    """

    host: str
    user: str
    password: t.Optional[str] = None
    port: int
    database: str
    role: t.Optional[str] = None
    sslmode: t.Optional[str] = None

    concurrent_tasks: int = 4
    register_comments: bool = True
    pre_ping: bool = True

    type_: t.Literal["risingwave"] = Field(alias="type", default="risingwave")

    @property
    def _connection_kwargs_keys(self) -> t.Set[str]:
        # These config fields are forwarded verbatim to psycopg2.connect().
        return {
            "host",
            "user",
            "password",
            "port",
            "database",
            "role",
            "sslmode",
        }

    @property
    def _engine_adapter(self) -> t.Type[EngineAdapter]:
        return engine_adapter.RisingwaveEngineAdapter

    @property
    def _connection_factory(self) -> t.Callable:
        # Imported lazily so the psycopg2 dependency is only required when
        # a RisingWave connection is actually created.
        from psycopg2 import connect

        return connect

    @property
    def _cursor_init(self) -> t.Optional[t.Callable[[t.Any], None]]:
        # RisingWave buffers DML asynchronously by default; enabling implicit
        # flush makes each write visible to subsequent reads on the same
        # session, which SQLMesh's read-after-write patterns rely on.
        def init(cursor: t.Any) -> None:
            cursor.execute("SET RW_IMPLICIT_FLUSH TO true;")

        return init
17731819
CONNECTION_CONFIG_TO_TYPE = {
17741820
# Map all subclasses of ConnectionConfig to the value of their `type_` field.
17751821
tpe.all_field_infos()["type_"].default: tpe

sqlmesh/core/engine_adapter/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
1919
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
2020
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter
21+
from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter
2122

2223
DIALECT_TO_ENGINE_ADAPTER = {
2324
"hive": SparkEngineAdapter,
@@ -33,6 +34,7 @@
3334
"mssql": MSSQLEngineAdapter,
3435
"trino": TrinoEngineAdapter,
3536
"athena": AthenaEngineAdapter,
37+
"risingwave": RisingwaveEngineAdapter,
3638
}
3739

3840
DIALECT_ALIASES = {
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
import typing as t
5+
6+
7+
from sqlglot import exp
8+
9+
from sqlmesh.core.engine_adapter.postgres import PostgresEngineAdapter
10+
from sqlmesh.core.engine_adapter.shared import (
11+
set_catalog,
12+
CatalogSupport,
13+
CommentCreationView,
14+
CommentCreationTable,
15+
)
16+
17+
18+
if t.TYPE_CHECKING:
19+
from sqlmesh.core._typing import TableName
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
@set_catalog()
class RisingwaveEngineAdapter(PostgresEngineAdapter):
    """SQLMesh engine adapter for the RisingWave streaming database.

    RisingWave is wire-compatible with PostgreSQL, so this adapter reuses
    the Postgres adapter and only overrides the capabilities that differ
    (single catalog, no transactions, materialized-view support, and
    comment handling).
    """

    DIALECT = "risingwave"
    DEFAULT_BATCH_SIZE = 400
    CATALOG_SUPPORT = CatalogSupport.SINGLE_CATALOG_ONLY
    COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
    COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
    SUPPORTS_MATERIALIZED_VIEWS = True
    SUPPORTS_TRANSACTIONS = False

    def _truncate_table(self, table_name: TableName) -> None:
        # RisingWave does not support TRUNCATE TABLE; an unfiltered DELETE
        # is the behavioral equivalent.
        return self.execute(exp.Delete(this=exp.to_table(table_name)))

tests/core/engine_adapter/integration/__init__.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ def supports_merge(self) -> bool:
160160
if self.dialect == "athena":
161161
return "hive" not in self.mark
162162

163+
if self.dialect == "risingwave":
164+
return False
165+
163166
return True
164167

165168
@property
@@ -348,6 +351,20 @@ def get_table_comment(
348351
"""
349352
elif self.dialect == "clickhouse":
350353
query = f"SELECT name, comment FROM system.tables WHERE database = '{schema_name}' AND name = '{table_name}'"
354+
elif self.dialect == "risingwave":
355+
query = f"""
356+
SELECT
357+
c.relname,
358+
d.description
359+
FROM pg_class c
360+
INNER JOIN pg_description d ON c.oid = d.objoid AND d.objsubid = 0
361+
INNER JOIN pg_namespace n ON c.relnamespace = n.oid
362+
WHERE
363+
c.relname = '{table_name}'
364+
AND n.nspname= '{schema_name}'
365+
AND c.relkind = '{'v' if table_kind == "VIEW" else 'r'}'
366+
;
367+
"""
351368

352369
result = self.engine_adapter.fetchall(query)
353370

@@ -439,6 +456,24 @@ def get_column_comments(
439456
schema_name = '{schema_name}'
440457
AND table_name = '{table_name}'
441458
"""
459+
elif self.dialect == "risingwave":
460+
query = f"""
461+
SELECT
462+
a.attname AS column_name, d.description
463+
FROM
464+
pg_class c
465+
INNER JOIN pg_namespace n ON c.relnamespace = n.oid
466+
INNER JOIN pg_attribute a ON c.oid = a.attrelid
467+
INNER JOIN pg_description d
468+
ON
469+
a.attnum = d.objsubid
470+
AND d.objoid = c.oid
471+
WHERE
472+
n.nspname = '{schema_name}'
473+
AND c.relname = '{table_name}'
474+
AND c.relkind = '{'v' if table_kind == "VIEW" else 'r'}'
475+
;
476+
"""
442477

443478
result = self.engine_adapter.fetchall(query)
444479

tests/core/engine_adapter/integration/config.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ gateways:
100100
cluster: cluster1
101101
state_connection:
102102
type: duckdb
103+
inttest_risingwave:
104+
connection:
105+
type: risingwave
106+
user: root
107+
database: dev
108+
host: {{ env_var('DOCKER_HOSTNAME', 'localhost') }}
109+
port: 4566
103110

104111

105112
# Cloud databases

0 commit comments

Comments
 (0)