Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,12 @@
--endpoint /subscriptions/{SubID}/resourceGroups/TestRG/providers/Microsoft.Storage/storageAccounts/sa1/queueservices/default/queues/q1 \\
--storage-queue-msg-ttl 300

- name: Create a new event subscription for an Event Grid topic using StorageQueue with systemassigned MSI identity as destination
text: |
az eventgrid event-subscription create -n {EventSubscriptionName} --source-resource-id /subscriptions/{SubID}/resourceGroups/{RG}/providers/Microsoft.EventGrid/topics/topic1 \\
--delivery-identity-endpoint-type StorageQueue --delivery-identity systemassigned --delivery-identity-endpoint /subscriptions/{SubID}/resourceGroups/TestRG/providers/Microsoft.Storage/storageAccounts/sa1/queueservices/default/queues/q1 \\
--storage-queue-msg-ttl 300

- name: Create a new event subscription for an Event Grid topic with advanced filtering enabled on arrays
text: |
az eventgrid event-subscription create -n es1 \\
Expand Down Expand Up @@ -1315,7 +1321,7 @@
- name: Create a new topic in Azure.
text: az eventgrid topic create -g rg1 --name topic1 -l westus2 --kind azure
- name: Create a new topic in AzureArc targeting a custom location.
text: az eventgrid topic create -g rg1 --name topic1 -l eastus2euap --kind azurearc --extended-location-name /subscriptions/<subid>/resourcegroups/<rgname>/providers/microsoft.extendedlocation/customlocations/<cust-loc-name> --extended-location-type customlocation
text: az eventgrid topic create -g rg1 --name topic1 -l eastus2euap --kind azurearc --extended-location-name /subscriptions/<subid>/resourcegroups/<rgname>/providers/microsoft.extendedlocation/customlocations/<cust-loc-name> --extended-location-type customlocation --input-schema CloudEventSchemaV1_0
"""

helps['eventgrid topic delete'] = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ def load_arguments(self, _): # pylint: disable=too-many-statements
with self.argument_context('eventgrid event-subscription') as c:
c.argument('event_subscription_name', arg_type=name_type, help='Name of the event subscription.')
c.argument('event_delivery_schema', arg_type=get_enum_type(['eventgridschema', 'custominputschema', 'cloudeventschemav1_0']), help='The schema in which events should be delivered for this event subscription. By default, events will be delivered in the same schema in which they are published (based on the corresponding topic\'s input schema).')
c.argument('max_delivery_attempts', help="Maximum number of delivery attempts. Must be a number between 1 and 30.")
c.argument('max_events_per_batch', help="Maximum number of events in a batch. Must be a number between 1 and 5000.")
c.argument('preferred_batch_size_in_kilobytes', help="Preferred batch size in kilobytes. Must be a number between 1 and 1024.")
c.argument('event_ttl', help="Event time to live (in minutes). Must be a number between 1 and 1440.")
c.argument('max_delivery_attempts', type=int, help="Maximum number of delivery attempts. Must be a number between 1 and 30.")
c.argument('max_events_per_batch', type=int, help="Maximum number of events in a batch. Must be a number between 1 and 5000.")
c.argument('preferred_batch_size_in_kilobytes', type=int, help="Preferred batch size in kilobytes. Must be a number between 1 and 1024.")
c.argument('event_ttl', type=int, help="Event time to live (in minutes). Must be a number between 1 and 1440.")
c.argument('deadletter_endpoint', help="The Azure resource ID of an Azure Storage blob container destination where EventGrid should deadletter undeliverable events for this event subscription.")
c.argument('deadletter_identity_endpoint', help="The Azure resource ID of an Azure Storage blob container destination with identity where EventGrid should deadletter undeliverable events for this event subscription.")
c.argument('advanced_filter', arg_group="Filtering", action=EventSubscriptionAddFilter, nargs='+')
Expand Down
116 changes: 66 additions & 50 deletions src/azure-cli/azure/cli/command_modules/eventgrid/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ def cli_topic_create_or_update(
sku_info = ResourceSku(name=sku_name)
identity_info = None

identity_info = _get_identity_info(identity)
kind_name = _get_kind(kind)
extended_location = _get_extended_location(kind, extended_location_name, extended_location_type)

identity_info = _get_identity_info(identity, kind)
topic_info = Topic(
location=location,
tags=tags,
Expand Down Expand Up @@ -929,10 +930,11 @@ def _get_event_subscription_info( # pylint: disable=too-many-locals,too-many-
enable_advanced_filtering_on_arrays=None,
delivery_attribute_mapping=None):

if endpoint is None and delivery_identity_endpoint is None or \
endpoint is not None and delivery_identity_endpoint is not None:
raise CLIError('usage error: either --endpoint or --delivery-identity-endpoint should '
'be specified at one time, not both.')
_validate_delivery_identity_args(
endpoint,
delivery_identity,
delivery_identity_endpoint,
delivery_identity_endpoint_type)

if deadletter_endpoint is not None and deadletter_identity_endpoint is not None:
raise CLIError('usage error: either --deadletter_endpoint or --deadletter_identity_endpoint '
Expand All @@ -947,24 +949,20 @@ def _get_event_subscription_info( # pylint: disable=too-many-locals,too-many-
included_event_types = None

# Construct RetryPolicy based on max_delivery_attempts and event_ttl
max_delivery_attempts = int(max_delivery_attempts)
event_ttl = int(event_ttl)
_validate_retry_policy(max_delivery_attempts, event_ttl)
retry_policy = RetryPolicy(max_delivery_attempts=max_delivery_attempts, event_time_to_live_in_minutes=event_ttl)

if max_events_per_batch is not None:
if endpoint_type not in (WEBHOOK_DESTINATION, AZUREFUNCTION_DESTINATION):
raise CLIError('usage error: max-events-per-batch is applicable only for '
'endpoint types WebHook and AzureFunction.')
max_events_per_batch = int(max_events_per_batch)
if max_events_per_batch > 5000:
raise CLIError('usage error: max-events-per-batch must be a number between 1 and 5000.')

if preferred_batch_size_in_kilobytes is not None:
if endpoint_type not in (WEBHOOK_DESTINATION, AZUREFUNCTION_DESTINATION):
raise CLIError('usage error: preferred-batch-size-in-kilobytes is applicable only for '
'endpoint types WebHook and AzureFunction.')
preferred_batch_size_in_kilobytes = int(preferred_batch_size_in_kilobytes)
if preferred_batch_size_in_kilobytes > 1024:
raise CLIError('usage error: preferred-batch-size-in-kilobytes must be a number '
'between 1 and 1024.')
Expand All @@ -985,15 +983,6 @@ def _get_event_subscription_info( # pylint: disable=too-many-locals,too-many-
raise CLIError('usage error: azure-active-directory-tenant-id is missing. '
'It should include an Azure Active Directory Tenant Id.')

condition1 = delivery_identity is not None and \
(delivery_identity_endpoint is None or delivery_identity_endpoint_type is None)
condition2 = delivery_identity is None and \
(delivery_identity_endpoint is not None or delivery_identity_endpoint_type is not None)
if endpoint is None and (condition1 or condition2):
raise CLIError('usage error: one or more delivery identity information is missing. '
'If delivery_identity is specified, both delivery_identity_endpoint and '
'delivery_identity_endpoint_type should be specified.')

condition1 = deadletter_identity is not None and deadletter_identity_endpoint is None
condition2 = deadletter_identity is None and deadletter_identity_endpoint is not None
if condition1 or condition2:
Expand All @@ -1010,10 +999,10 @@ def _get_event_subscription_info( # pylint: disable=too-many-locals,too-many-
tennant_id = azure_active_directory_tenant_id
application_id = azure_active_directory_application_id_or_uri

_validate_destination_attribute(endpoint_type, storage_queue_msg_ttl, delivery_attribute_mapping)

destination = None
delivery_with_resource_identity = None
if endpoint is not None:
_validate_destination_attribute(endpoint_type, storage_queue_msg_ttl, delivery_attribute_mapping)
destination = _get_endpoint_destination(
endpoint_type,
endpoint,
Expand All @@ -1023,12 +1012,13 @@ def _get_event_subscription_info( # pylint: disable=too-many-locals,too-many-
application_id,
storage_queue_msg_ttl,
delivery_attribute_mapping)

delivery_with_resource_identity = None

if delivery_identity_endpoint is not None:
elif delivery_identity_endpoint is not None:
identity_type_name = _get_event_subscription_identity_type(delivery_identity)
delivery_identity_info = EventSubscriptionIdentity(type=identity_type_name)
_validate_destination_attribute(
delivery_identity_endpoint_type,
storage_queue_msg_ttl,
delivery_attribute_mapping)
destination_with_identity = _get_endpoint_destination(
delivery_identity_endpoint_type,
delivery_identity_endpoint,
Expand Down Expand Up @@ -1287,26 +1277,25 @@ def _update_event_subscription_internal( # pylint: disable=too-many-locals,too-
enable_advanced_filtering_on_arrays=None,
delivery_attribute_mapping=None):

_validate_delivery_identity(
_validate_delivery_identity_args(
endpoint,
delivery_identity,
delivery_identity_endpoint,
delivery_identity_endpoint_type)

_validate_deadletter_identity(
_validate_deadletter_identity_args(
deadletter_identity,
deadletter_endpoint)

event_subscription_destination = instance.destination
event_subscription_destination_with_resource_identity = None

if instance.delivery_with_resource_identity is not None:
event_subscription_destination_with_resource_identity = instance.delivery_with_resource_identity.destination

deadletter_destination = None
instance_delivery_with_resource_identity = instance.delivery_with_resource_identity
event_subscription_destination = instance.destination
event_subscription_labels = instance.labels
event_subscription_filter = instance.filter

event_delivery_schema = instance.event_delivery_schema
retry_policy = instance.retry_policy

Expand Down Expand Up @@ -1348,20 +1337,28 @@ def _update_event_subscription_internal( # pylint: disable=too-many-locals,too-
max_events_per_batch = event_subscription_destination_with_resource_identity.max_events_per_batch
preferred_batch_size_in_kilobytes = event_subscription_destination_with_resource_identity.preferred_batch_size_in_kilobytes # pylint: disable=line-too-long

if endpoint is None and \
storage_queue_msg_ttl is not None or delivery_attribute_mapping is not None:
_validate_destination_attribute(
event_subscription_destination.endpoint_type,
storage_queue_msg_ttl,
delivery_attribute_mapping)
delivery_with_resource_identity = None

_set_event_subscription_destination(
event_subscription_destination,
# if endpoint and delivery_identity_endpoint is not specified then use the instance value
if endpoint is None and delivery_identity_endpoint is None:
if event_subscription_destination is not None:
_validate_and_update_destination(
event_subscription_destination.endpoint_type,
event_subscription_destination,
storage_queue_msg_ttl,
delivery_attribute_mapping)
elif event_subscription_destination_with_resource_identity is not None:
_validate_and_update_destination(
event_subscription_destination_with_resource_identity.endpoint_type,
instance_delivery_with_resource_identity.destination,
storage_queue_msg_ttl,
delivery_attribute_mapping)
delivery_with_resource_identity = instance_delivery_with_resource_identity
elif endpoint is not None:
_validate_destination_attribute(
endpoint_type,
storage_queue_msg_ttl,
delivery_attribute_mapping)

elif endpoint is not None:
_validate_destination_attribute(endpoint_type, storage_queue_msg_ttl, delivery_attribute_mapping)
event_subscription_destination = _get_endpoint_destination(
endpoint_type,
endpoint,
Expand All @@ -1371,10 +1368,7 @@ def _update_event_subscription_internal( # pylint: disable=too-many-locals,too-
application_id,
storage_queue_msg_ttl,
delivery_attribute_mapping)

delivery_with_resource_identity = None

if delivery_identity_endpoint is not None:
elif delivery_identity_endpoint is not None:
identity_type_name = _get_event_subscription_identity_type(delivery_identity)
delivery_identity_info = EventSubscriptionIdentity(type=identity_type_name)
destination_with_identity = _get_endpoint_destination(
Expand Down Expand Up @@ -1751,12 +1745,15 @@ def _validate_subscription_id_matches_default_subscription_id(
' use az account set ID_OR_NAME, or use the global argument --subscription ')


def _get_identity_info(identity=None):
def _get_identity_info(identity=None, kind=None):
if (identity is not None and identity.lower() != IDENTITY_NONE.lower()):
identity_type_name = _get_identity_type(identity)
identity_info = IdentityInfo(type=identity_type_name)
else:
identity_info = IdentityInfo(type=IDENTITY_NONE)
if kind is None or kind.lower() == KIND_AZURE.lower():
identity_info = IdentityInfo(type=IDENTITY_NONE)
else:
identity_info = None
return identity_info


Expand Down Expand Up @@ -1831,24 +1828,43 @@ def _ensure_extended_location_is_valid(extended_location_name=None, extended_loc
" and extended-location-type value must be 'customLocation'.")


def _validate_delivery_identity(
def _validate_delivery_identity_args(
endpoint,
delivery_identity,
delivery_identity_endpoint,
delivery_identity_endpoint_type):

condition1 = delivery_identity is not None and \
(delivery_identity_endpoint is None or delivery_identity_endpoint_type is None)

condition2 = delivery_identity is None and \
(delivery_identity_endpoint is not None or delivery_identity_endpoint_type is not None)

if endpoint is None and (condition1 or condition2):
raise CLIError('usage error: one or more delivery identity information is missing. '
'If delivery_identity is specified, both delivery_identity_endpoint and '
'delivery_identity_endpoint_type should be specified.')
'If --delivery-identity is specified, both --delivery-identity-endpoint and '
'--delivery-identity-endpoint-type should be specified.')

if endpoint is not None and (condition1 or condition2):
raise CLIError('usage error: Cannot specify both --delivery-identity and --endpoint.'
' If --endpoint is specified then none of the --delivery-identity properties can be specified.')


def _validate_deadletter_identity(deadletter_identity, deadletter_identity_endpoint):
def _validate_deadletter_identity_args(deadletter_identity, deadletter_identity_endpoint):
condition1 = deadletter_identity is not None and deadletter_identity_endpoint is None
condition2 = deadletter_identity is None and deadletter_identity_endpoint is not None
if condition1 or condition2:
raise CLIError('usage error: one or more deadletter identity information is missing. If '
'deadletter_identity is specified, deadletter_identity_endpoint should be specified.')


def _validate_and_update_destination(endpoint_type, destination, storage_queue_msg_ttl, delivery_attribute_mapping):
_validate_destination_attribute(
endpoint_type,
storage_queue_msg_ttl,
delivery_attribute_mapping)

_set_event_subscription_destination(
destination,
storage_queue_msg_ttl,
delivery_attribute_mapping)
Loading