-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathtest_reference_cte.py
More file actions
64 lines (56 loc) · 2.53 KB
/
test_reference_cte.py
File metadata and controls
64 lines (56 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import re
from sqlmesh.core.context import Context
from sqlmesh.lsp.context import LSPContext, ModelTarget
from sqlmesh.lsp.reference import CTEReference, get_references
from sqlmesh.lsp.uri import URI
from lsprotocol.types import Range, Position
import typing as t
def test_cte_parsing():
context = Context(paths=["examples/sushi"])
lsp_context = LSPContext(context)
# Find model URIs
sushi_customers_path = next(
path
for path, info in lsp_context.map.items()
if isinstance(info, ModelTarget) and "sushi.customers" in info.names
)
with open(sushi_customers_path, "r", encoding="utf-8") as file:
read_file = file.readlines()
# Find position of the cte reference
ranges = find_ranges_from_regex(read_file, r"current_marketing(?!_outer)")
assert len(ranges) == 2
position = Position(line=ranges[1].start.line, character=ranges[1].start.character + 4)
references = get_references(lsp_context, URI.from_path(sushi_customers_path), position)
assert len(references) == 1
assert references[0].path == sushi_customers_path
assert isinstance(references[0], CTEReference)
assert (
references[0].range.start.line == ranges[1].start.line
) # The reference location (where we clicked)
assert (
references[0].target_range.start.line == ranges[0].start.line
) # The CTE definition location
# Find the position of the current_marketing_outer reference
ranges = find_ranges_from_regex(read_file, r"current_marketing_outer")
assert len(ranges) == 2
position = Position(line=ranges[1].start.line, character=ranges[1].start.character + 4)
references = get_references(lsp_context, URI.from_path(sushi_customers_path), position)
assert len(references) == 1
assert references[0].path == sushi_customers_path
assert isinstance(references[0], CTEReference)
assert (
references[0].range.start.line == ranges[1].start.line
) # The reference location (where we clicked)
assert (
references[0].target_range.start.line == ranges[0].start.line
) # The CTE definition location
def find_ranges_from_regex(read_file: t.List[str], regex: str) -> t.List[Range]:
"""Find all ranges in the read file that match the regex."""
return [
Range(
start=Position(line=line_number, character=match.start()),
end=Position(line=line_number, character=match.end()),
)
for line_number, line in enumerate(read_file)
for match in [m for m in [re.search(regex, line)] if m]
]