This repository was archived by the owner on Feb 15, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathtest_api.py
More file actions
129 lines (109 loc) · 3.7 KB
/
Copy pathtest_api.py
File metadata and controls
129 lines (109 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import json
from threading import Timer
import httpx
import pytest
from ariadne.asgi import GQL_CONNECTION_INIT, GQL_START
from websockets import connect
# One dict shared by the whole module: earlier tests write values into it
# (e.g. "user_id", "token") and later tests read them back.
my_storage: dict = {}


@pytest.fixture
def storage() -> dict:
    """Return the module-level ``my_storage`` dict (the same object every time)."""
    shared = my_storage
    return shared
@pytest.mark.asyncio
async def test_create_user(host, credentials, storage):
    """Create a user via the ``createUser`` mutation and remember its id.

    Posts the mutation to ``http://{host}/`` with ``credentials`` as the
    GraphQL variables, requires a response with no top-level ``errors`` key
    and a non-null id, then saves that id as ``storage["user_id"]``.

    ``host`` and ``credentials`` are fixtures defined outside this file;
    ``credentials`` presumably holds ``email`` and ``password`` keys to match
    the mutation's variables -- TODO confirm against conftest.
    """
    query = """
        mutation createUser($email: String!, $password: String!) {
            createUser(email: $email, password: $password) {
                id,
                errors
            }
        }
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"http://{host}/",
            timeout=60,
            json={"query": query, "variables": credentials},
        )

    json_response = json.loads(response.text)
    # A top-level "errors" key means the GraphQL request itself failed.
    assert "errors" not in json_response

    user_id = json_response["data"]["createUser"]["id"]
    assert user_id is not None
    storage["user_id"] = user_id
@pytest.mark.asyncio
async def test_auth_user(host, credentials, storage):
    """Obtain a token via the ``createToken`` mutation and remember it.

    Posts the mutation to ``http://{host}/`` with ``credentials`` as the
    GraphQL variables, requires a response with no top-level ``errors`` key
    and a non-null token, then saves the token as ``storage["token"]`` so the
    blog and subscription tests can authenticate with it.

    ``host`` and ``credentials`` are fixtures defined outside this file.
    """
    query = """
        mutation authUser($email: String!, $password: String!) {
            createToken(email: $email, password: $password) {
                errors,
                token
            }
        }
    """
    async with httpx.AsyncClient() as client:
        # No Authorization header: this is the call that produces the token.
        response = await client.post(
            f"http://{host}/",
            timeout=60,
            json={"query": query, "variables": credentials},
        )

    json_response = json.loads(response.text)
    # A top-level "errors" key means the GraphQL request itself failed.
    assert "errors" not in json_response

    token = json_response["data"]["createToken"]["token"]
    assert token is not None
    storage["token"] = token
async def create_blog(host, storage):
    """Create a blog through the ``createblog`` mutation as the stored user.

    Sends the mutation to ``http://{host}/`` with a ``Bearer`` Authorization
    header built from ``storage["token"]`` and fixed title/description
    variables, then asserts that the request succeeded and returned an id.

    Args:
        host: ``host[:port]`` of the GraphQL HTTP endpoint.
        storage: Mapping that already contains a ``"token"`` entry.

    Raises:
        KeyError: If ``storage`` has no ``"token"`` entry.
        AssertionError: If the response carries top-level ``errors`` or the
            created blog has no id.
    """
    query = """
        mutation createblog($title: String!, $description: String!) {
            createblog(title: $title, description: $description) {
                errors
                id
            }
        }
    """
    token = storage["token"]
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"http://{host}/",
            headers={"Authorization": f"Bearer {token}"},
            timeout=60,
            json={
                "query": query,
                "variables": {"title": "title", "description": "description"},
            },
        )

    json_response = json.loads(response.text)
    # Success means NO top-level "errors" key; the id check below only makes
    # sense for a successful mutation.
    assert "errors" not in json_response
    assert json_response["data"]["createblog"]["id"] is not None
@pytest.mark.asyncio
async def test_create_blog(server, host, storage):
    """Run the shared ``create_blog`` helper as a test of its own.

    ``server`` is requested but never used in the body -- presumably a fixture
    (defined outside this file) that makes the API available; TODO confirm.
    """
    await create_blog(host=host, storage=storage)
@pytest.mark.asyncio
async def test_subscription(server, host, storage):
    """Subscribe to ``reviewblog`` and expect an event once a blog is created.

    Flow:
      1. Open a ``graphql-ws`` websocket to ``ws://{host}/`` and send the
         connection-init and start messages (the subscription is
         authenticated with ``storage["token"]``).
      2. Require the first frame to be the connection acknowledgement.
      3. One second later, create a blog from a background thread.
      4. Require the next frame to carry a ``reviewblog`` payload with an id
         and no top-level ``errors`` key.

    ``server`` is requested but never used in the body -- presumably a fixture
    (defined outside this file) that makes the API available; TODO confirm.
    """
    query = """
        subscription reviewblog($token: String!) {
            reviewblog(token: $token) {
                errors
                id
            }
        }
    """
    variables = {"token": f'Bearer {storage["token"]}'}

    def delay_create_blog(host, storage):
        # threading.Timer runs this in its own thread, which has no event
        # loop, so the coroutine gets a private loop that is closed afterwards.
        # NOTE: an exception raised here (including create_blog's assertions)
        # stays in the timer thread and does not fail the test directly.
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            loop.run_until_complete(create_blog(host, storage))
        finally:
            loop.close()

    ws = await connect(f"ws://{host}/", subprotocols=["graphql-ws"])
    try:
        await ws.send(json.dumps({"type": GQL_CONNECTION_INIT}))
        await ws.send(
            json.dumps(
                {
                    "type": GQL_START,
                    "payload": {"query": query, "variables": variables},
                }
            )
        )
        received = await ws.recv()
        assert received == '{"type": "connection_ack"}'

        # The argument tuple must match delay_create_blog's parameters, which
        # in turn match create_blog(host, storage).
        timer = Timer(1.0, delay_create_blog, (host, storage))
        timer.start()
        received = await ws.recv()
    finally:
        # Close the socket even when an assertion or recv() fails.
        await ws.close()

    json_response = json.loads(received)
    assert "errors" not in json_response
    assert json_response["payload"]["data"]["reviewblog"]["id"] is not None