Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Added sample for synchronous pull
  • Loading branch information
anguillanneuf committed Aug 29, 2018
commit 8bed87fb2d877397c839929ef651f405030dda18
31 changes: 31 additions & 0 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,29 @@ def callback(message):
# [END pubsub_subscriber_flow_settings]


def receive_messages_synchronously(project, subscription_name):
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Copy link
Contributor

@tswast tswast Aug 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a public subscription name we can use instead of using this as a parameter? I recall I saw one for taxis in the Dataflow templates quickstart.

If not, we should show a comment like # subscription_name = '/does/a/sub/name/look/like/a/path?'

Copy link
Member Author

@anguillanneuf anguillanneuf Aug 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The subscription_path method will build /projects/{project}/subscription/{subscription_name}.

Could we leave subscription_name as it is because it's just a string that's the subscription name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear that it's a string without some kind of comment or example. (Some APIs like Bigtable use bytes for names, so it's not obvious from context)


# Builds a pull request with a specified number of messages to return.
response = subscriber.pull(
subscription_path,
max_messages=3,
return_immediately=False) # Waits until three messages are available.

ack_ids = []
for received_message in response.received_messages:
print("Received: {}".format(received_message.message.data))
ack_ids.append(received_message.ack_id)

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)
# [END pubsub_subscriber_sync_pull]


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
Expand Down Expand Up @@ -281,6 +304,11 @@ def callback(message):
help=receive_messages_with_flow_control.__doc__)
receive_with_flow_control_parser.add_argument('subscription_name')

receive_messages_synchronously_parser = subparsers.add_parser(
'receive-synchronously',
help=receive_messages_synchronously.__doc__)
receive_messages_synchronously_parser.add_argument('subscription_name')

listen_for_errors_parser = subparsers.add_parser(
'listen_for_errors', help=listen_for_errors.__doc__)
listen_for_errors_parser.add_argument('subscription_name')
Expand Down Expand Up @@ -314,5 +342,8 @@ def callback(message):
elif args.command == 'receive-flow-control':
receive_messages_with_flow_control(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously':
receive_messages_synchronously(
args.project, args.subscription_name)
elif args.command == 'listen_for_errors':
listen_for_errors(args.project, args.subscription_name)
15 changes: 15 additions & 0 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,18 @@ def test_receive_with_flow_control(
assert 'Listening' in out
assert subscription in out
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_flow_control(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Message 1' in out
assert 'Message 2' in out
assert 'Message 3' in out