|
14 | 14 | # See the License for the specific language governing permissions and
|
15 | 15 | # limitations under the License.
|
16 | 16 |
|
17 |
| -import mock |
18 | 17 | import os
|
19 | 18 | import pytest
|
20 |
| -import time |
21 | 19 |
|
22 | 20 | from google.api_core.exceptions import AlreadyExists
|
23 | 21 | from google.cloud import pubsub_v1
|
|
29 | 27 | TOPIC = 'quickstart-sub-test-topic'
|
30 | 28 | SUBSCRIPTION = 'quickstart-sub-test-topic-sub'
|
31 | 29 |
|
32 |
| - |
33 |
| -@pytest.fixture(scope='module') |
34 |
| -def publisher_client(): |
35 |
| - yield pubsub_v1.PublisherClient() |
| 30 | +publisher_client = pubsub_v1.PublisherClient() |
| 31 | +subscriber_client = pubsub_v1.SubscriberClient() |
36 | 32 |
|
37 | 33 |
|
38 | 34 | @pytest.fixture(scope='module')
|
39 |
| -def topic_path(publisher_client): |
| 35 | +def topic_path(): |
40 | 36 | topic_path = publisher_client.topic_path(PROJECT, TOPIC)
|
41 | 37 |
|
42 | 38 | try:
|
43 |
| - publisher_client.create_topic(topic_path) |
| 39 | + topic = publisher_client.create_topic(topic_path) |
| 40 | + return topic.name |
44 | 41 | except AlreadyExists:
|
45 |
| - pass |
46 |
| - |
47 |
| - yield topic_path |
48 |
| - |
49 |
| - |
50 |
| -@pytest.fixture(scope='module') |
51 |
| -def subscriber_client(): |
52 |
| - yield pubsub_v1.SubscriberClient() |
| 42 | + return topic_path |
53 | 43 |
|
54 | 44 |
|
55 | 45 | @pytest.fixture(scope='module')
|
56 |
| -def subscription(subscriber_client, topic_path): |
| 46 | +def subscription_path(topic_path): |
57 | 47 | subscription_path = subscriber_client.subscription_path(
|
58 | 48 | PROJECT, SUBSCRIPTION)
|
59 | 49 |
|
60 | 50 | try:
|
61 |
| - subscriber_client.create_subscription(subscription_path, topic_path) |
| 51 | + subscription = subscriber_client.create_subscription( |
| 52 | + subscription_path, topic_path) |
| 53 | + return subscription.name |
62 | 54 | except AlreadyExists:
|
63 |
| - pass |
64 |
| - |
65 |
| - yield SUBSCRIPTION |
| 55 | + return subscription_path |
66 | 56 |
|
67 | 57 |
|
68 |
| -@pytest.fixture |
69 |
| -def to_delete(publisher_client, subscriber_client): |
70 |
| - doomed = [] |
71 |
| - yield doomed |
72 |
| - for client, item in doomed: |
| 58 | +def _to_delete(resource_paths): |
| 59 | + for item in resource_paths: |
73 | 60 | if 'topics' in item:
|
74 | 61 | publisher_client.delete_topic(item)
|
75 | 62 | if 'subscriptions' in item:
|
76 | 63 | subscriber_client.delete_subscription(item)
|
77 | 64 |
|
78 | 65 |
|
79 |
| -def _make_sleep_patch(): |
80 |
| - real_sleep = time.sleep |
| 66 | +def _publish_messages(topic_path): |
| 67 | + publish_future = publisher_client.publish(topic_path, data=b'Hello World!') |
| 68 | + publish_future.result() |
| 69 | + |
81 | 70 |
|
82 |
| - def new_sleep(period): |
83 |
| - if period == 60: |
84 |
| - real_sleep(10) |
85 |
| - raise RuntimeError('sigil') |
86 |
| - else: |
87 |
| - real_sleep(period) |
| 71 | +def _sub_timeout(project_id, subscription_name): |
| 72 | + # This is an exactly copy of `sub.py` except |
| 73 | + # StreamingPullFuture.result() will time out after 10s. |
| 74 | + client = pubsub_v1.SubscriberClient() |
| 75 | + subscription_path = client.subscription_path( |
| 76 | + project_id, subscription_name) |
88 | 77 |
|
89 |
| - return mock.patch('time.sleep', new=new_sleep) |
| 78 | + def callback(message): |
| 79 | + print('Received message {} of message ID {}\n'.format( |
| 80 | + message, message.message_id)) |
| 81 | + message.ack() |
| 82 | + print('Acknowledged message {}\n'.format(message.message_id)) |
90 | 83 |
|
| 84 | + streaming_pull_future = client.subscribe( |
| 85 | + subscription_path, callback=callback) |
| 86 | + print('Listening for messages on {}..\n'.format(subscription_path)) |
| 87 | + |
| 88 | + try: |
| 89 | + streaming_pull_future.result(timeout=10) |
| 90 | + except: # noqa |
| 91 | + streaming_pull_future.cancel() |
91 | 92 |
|
92 |
| -def test_sub(publisher_client, |
93 |
| - topic_path, |
94 |
| - subscriber_client, |
95 |
| - subscription, |
96 |
| - to_delete, |
97 |
| - capsys): |
98 | 93 |
|
99 |
| - publisher_client.publish(topic_path, data=b'Hello, World!') |
| 94 | +def test_sub(monkeypatch, topic_path, subscription_path, capsys): |
| 95 | + monkeypatch.setattr(sub, 'sub', _sub_timeout) |
100 | 96 |
|
101 |
| - to_delete.append((publisher_client, topic_path)) |
| 97 | + _publish_messages(topic_path) |
102 | 98 |
|
103 |
| - with _make_sleep_patch(): |
104 |
| - with pytest.raises(RuntimeError, match='sigil'): |
105 |
| - sub.sub(PROJECT, subscription) |
| 99 | + sub.sub(PROJECT, SUBSCRIPTION) |
106 | 100 |
|
107 |
| - to_delete.append((subscriber_client, |
108 |
| - 'projects/{}/subscriptions/{}'.format(PROJECT, |
109 |
| - SUBSCRIPTION))) |
| 101 | + # Clean up resources. |
| 102 | + _to_delete([topic_path, subscription_path]) |
110 | 103 |
|
111 | 104 | out, _ = capsys.readouterr()
|
112 | 105 | assert "Received message" in out
|
|
0 commit comments