Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Checkpoint
  • Loading branch information
Bret Ambrose committed May 2, 2025
commit 8f8d16a950d4677636b8710605d257b55c523423
13 changes: 13 additions & 0 deletions awsiot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,16 @@ def validate(self):
assert callable(self.incoming_event_listener)
assert callable(self.subscription_status_listener) or self.subscription_status_listener is None
assert callable(self.deserialization_failure_listener) or self.deserialization_failure_listener is None

def create_streaming_unmodeled_options(stream_options: ServiceStreamOptions[T], subscription_topic: str, event_name: str, event_class):
def modeled_event_callback(unmodeled_event : mqtt_request_response.IncomingPublishEvent):
try:
payload_as_json = json.loads(unmodeled_event.payload.decode())
modeled_event = event_class.from_payload(payload_as_json)
stream_options.incoming_event_listener(modeled_event)
except Exception as e:
if stream_options.deserialization_failure_listener is not None:
failure_event = V2DeserializationFailure(f"{event_name} stream deserialization failure", e, unmodeled_event.payload)
stream_options.deserialization_failure_listener(failure_event)

return mqtt_request_response.StreamingOperationOptions(subscription_topic, stream_options.subscription_status_listener, modeled_event_callback)
36 changes: 7 additions & 29 deletions awsiot/iotshadow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@

# This file is generated

from awscrt import mqtt, mqtt5, mqtt_request_response, exceptions
from awscrt import mqtt, mqtt5, mqtt_request_response
import awsiot
import concurrent.futures
import datetime
import json
import typing
from uuid import uuid4

import pdb
import uuid


class IotShadowClient(awsiot.MqttServiceClient):
Expand Down Expand Up @@ -1788,7 +1786,7 @@ def get_shadow(self, request: GetShadowRequest):
rejected_topic = topic_prefix + "/rejected"
subscription1 = topic_prefix + "/+"

correlation_token = str(uuid4())
correlation_token = str(uuid.uuid4())
request.client_token = correlation_token

request_options = mqtt_request_response.RequestOptions(
Expand Down Expand Up @@ -1823,7 +1821,7 @@ def delete_shadow(self, request: DeleteShadowRequest):
rejected_topic = topic_prefix + "/rejected"
subscription1 = topic_prefix + "/+"

correlation_token = str(uuid4())
correlation_token = str(uuid.uuid4())
request.client_token = correlation_token

request_options = mqtt_request_response.RequestOptions(
Expand Down Expand Up @@ -1859,7 +1857,7 @@ def update_shadow(self, request: UpdateShadowRequest):
subscription1 = accepted_topic
subscription2 = rejected_topic

correlation_token = str(uuid4())
correlation_token = str(uuid.uuid4())
request.client_token = correlation_token

request_options = mqtt_request_response.RequestOptions(
Expand Down Expand Up @@ -1892,17 +1890,7 @@ def create_shadow_delta_updated_stream(self, config : ShadowDeltaUpdatedSubscrip

subscription_topic = f"$aws/things/{config.thing_name}/shadow/update/delta"

def modeled_event_callback(unmodeled_event : mqtt_request_response.IncomingPublishEvent):
try:
payload_as_json = json.loads(unmodeled_event.payload.decode())
modeled_event = ShadowDeltaUpdatedEvent.from_payload(payload_as_json)
stream_options.incoming_event_listener(modeled_event)
except Exception as e:
if stream_options.deserialization_failure_listener is not None:
failure_event = awsiot.V2DeserializationFailure(f"shadow_delta_updated stream deserialization failure", e, unmodeled_event.payload)
stream_options.deserialization_failure_listener(failure_event)

unmodeled_options = mqtt_request_response.StreamingOperationOptions(subscription_topic, stream_options.subscription_status_listener, modeled_event_callback)
unmodeled_options = awsiot.create_streaming_unmodeled_options(stream_options, subscription_topic, "ShadowDeltaUpdatedEvent", ShadowDeltaUpdatedEvent)

return self._rr_client.create_stream(unmodeled_options)

Expand All @@ -1912,17 +1900,7 @@ def create_shadow_updated_stream(self, config : ShadowUpdatedSubscriptionRequest

subscription_topic = f"$aws/things/{config.thing_name}/shadow/update/documents"

def modeled_event_callback(unmodeled_event : mqtt_request_response.IncomingPublishEvent):
try:
payload_as_json = json.loads(unmodeled_event.payload.decode())
modeled_event = ShadowUpdatedEvent.from_payload(payload_as_json)
stream_options.incoming_event_listener(modeled_event)
except Exception as e:
if stream_options.deserialization_failure_listener is not None:
failure_event = awsiot.V2DeserializationFailure(f"shadow_updated stream deserialization failure", e, unmodeled_event.payload)
stream_options.deserialization_failure_listener(failure_event)

unmodeled_options = mqtt_request_response.StreamingOperationOptions(subscription_topic, stream_options.subscription_status_listener, modeled_event_callback)
unmodeled_options = awsiot.create_streaming_unmodeled_options(stream_options, subscription_topic, "ShadowUpdatedEvent", ShadowUpdatedEvent)

return self._rr_client.create_stream(unmodeled_options)

Loading