Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Sphinx and generic type hints don't work together well
  • Loading branch information
Bret Ambrose committed Jun 27, 2025
commit 6e288cf523bb8a153d553e36eec9a51f542e0dd8
10 changes: 6 additions & 4 deletions awsiot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
'ServiceStreamOptions'
]

import awscrt
from awscrt import mqtt, mqtt5, mqtt_request_response
from concurrent.futures import Future
from dataclasses import dataclass
import json
from typing import Any, Callable, Dict, Generic, Optional, Tuple, TypeVar


__version__ = '1.0.0-dev'

T = TypeVar('T')
Expand Down Expand Up @@ -238,18 +238,19 @@ def __init__(self, message: str, inner_error: 'Optional[Exception]', payload: Op
self.inner_error = inner_error
self.payload = payload


@dataclass
class ServiceStreamOptions(Generic[T]):
"""
Configuration options for an MQTT-based service streaming operation.

Args:
incoming_event_listener (Callable[[T], None]): function object to invoke when a stream message is successfully deserialized
subscription_status_listener (Optional[awscrt.mqtt_request_response.SubscriptionStatusListener]): function object to invoke when the operation's subscription status changes
subscription_status_listener (Optional[Callable[[awscrt.mqtt_request_response.SubscriptionStatusEvent], None]]): function object to invoke when the operation's subscription status changes
deserialization_failure_listener (Optional[Callable[[V2DeserializationFailure], None]]): function object to invoke when a publish is received on the streaming subscription that cannot be deserialized into the stream's output type. Should never happen.
"""
incoming_event_listener: 'Callable[[T], None]'
subscription_status_listener: 'Optional[mqtt_request_response.SubscriptionStatusListener]' = None
subscription_status_listener: 'Optional[Callable[[awscrt.mqtt_request_response.SubscriptionStatusEvent], None]]' = None
deserialization_failure_listener: 'Optional[Callable[[V2DeserializationFailure], None]]' = None

def _validate(self):
Expand All @@ -260,6 +261,7 @@ def _validate(self):
assert callable(self.subscription_status_listener) or self.subscription_status_listener is None
assert callable(self.deserialization_failure_listener) or self.deserialization_failure_listener is None


def create_streaming_unmodeled_options(stream_options: ServiceStreamOptions[T], subscription_topic: str, event_name: str, event_class):
def modeled_event_callback(unmodeled_event : mqtt_request_response.IncomingPublishEvent):
try:
Expand All @@ -271,4 +273,4 @@ def modeled_event_callback(unmodeled_event : mqtt_request_response.IncomingPubli
failure_event = V2DeserializationFailure(f"{event_name} stream deserialization failure", e, unmodeled_event.payload)
stream_options.deserialization_failure_listener(failure_event)

return mqtt_request_response.StreamingOperationOptions(subscription_topic, stream_options.subscription_status_listener, modeled_event_callback)
return mqtt_request_response.StreamingOperationOptions(subscription_topic, stream_options.subscription_status_listener, modeled_event_callback)
4 changes: 4 additions & 0 deletions docsrc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,8 @@
# and private classes aren't documented, but we want sphinx to document base classes by default.
# Ignoring the warnings seems like a reasonable compromise
('py:class', r'awsiot\.greengrasscoreipc\.model\._.*Operation'),

# sphinx does not handle generic type parameters well
('py:class', r'T'),
('py:obj', r'awsiot\.T')
]
Loading