diff --git a/src/azure-cli/azure/cli/command_modules/role/_multi_api_adaptor.py b/src/azure-cli/azure/cli/command_modules/role/_multi_api_adaptor.py deleted file mode 100644 index e030404b27b..00000000000 --- a/src/azure-cli/azure/cli/command_modules/role/_multi_api_adaptor.py +++ /dev/null @@ -1,89 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -from azure.cli.core.profiles import ResourceType, get_sdk, supported_api_version - - -class MultiAPIAdaptor: - # We will bridge all the code difference here caused by SDK breaking changes - def __init__(self, cli_ctx): - self.old_api = supported_api_version(cli_ctx, resource_type=ResourceType.MGMT_AUTHORIZATION, - max_api='2015-07-01') - self.cli_ctx = cli_ctx - - def _init_individual_permission(self, cfg): - Permission = get_sdk(self.cli_ctx, ResourceType.MGMT_AUTHORIZATION, 'Permission', mod='models', - operation_group='role_definitions') - permission = Permission(actions=cfg.get('actions', None), - not_actions=cfg.get('notActions', None)) - if not self.old_api: - permission.data_actions = cfg.get('dataActions', None) - permission.not_data_actions = cfg.get('notDataActions', None) - return permission - - def _init_permissions(self, role_definition_input): - # we will handle with or w/o 'permissions' - if 'permissions' in role_definition_input: - return [self._init_individual_permission(p) for p in role_definition_input['permissions']] - return [self._init_individual_permission(role_definition_input)] - - def create_role_definition(self, client, role_name, role_id, role_definition_input): - RoleDefinitionBase = get_sdk(self.cli_ctx, ResourceType.MGMT_AUTHORIZATION, 'RoleDefinition', - mod='models', operation_group='role_definitions') - role_configuration = RoleDefinitionBase(role_name=role_name, - description=role_definition_input.get('description', None), - type='CustomRole', - assignable_scopes=role_definition_input['assignableScopes'], - permissions=self._init_permissions(role_definition_input)) - scope = role_definition_input['assignableScopes'][0] - return client.create_or_update(role_definition_id=role_id, scope=scope, role_definition=role_configuration) - - def create_role_assignment(self, client, assignment_name, role_id, object_id, scope, assignee_principal_type=None, - description=None, condition=None, condition_version=None): - RoleAssignmentCreateParameters = get_sdk( - self.cli_ctx, ResourceType.MGMT_AUTHORIZATION, - 'RoleAssignmentProperties' if self.old_api else 'RoleAssignmentCreateParameters', - mod='models', operation_group='role_assignments') - - # In 2022-04-01 API, principal_type is by default 'User', so we have to explicitly set it to None if we can't - # resolve principal type from Graph - # https://github.com/Azure/azure-rest-api-specs/issues/21664 - parameters = RoleAssignmentCreateParameters( - role_definition_id=role_id, principal_id=object_id, principal_type=assignee_principal_type, - description=description, condition=condition, condition_version=condition_version) - - return client.create(scope, assignment_name, parameters) - - def get_role_property(self, obj, property_name): # pylint: disable=no-self-use - """Get property for RoleDefinition and RoleAssignment object.""" - # 2015-07-01 RoleDefinition: flattened, RoleAssignment: unflattened - # 2018-01-01-preview RoleDefinition: flattened - # 2020-04-01-preview RoleAssignment: flattened - # 2022-04-01 RoleDefinition: flattened RoleAssignment: flattened - # Get property_name from properties if the model is unflattened. - if isinstance(obj, dict): - if 'properties' in obj: - obj = obj['properties'] - return obj[property_name] - - if hasattr(obj, 'properties'): - obj = obj.properties - return getattr(obj, property_name) - - def set_role_property(self, obj, property_name, property_value): # pylint: disable=no-self-use - """Set property for RoleDefinition and RoleAssignment object. - Luckily this function is only called for an RoleAssignment `obj` returned by the service, and `properties` - has been processed, either by being flattened or set. We can definitively know whether `obj` is flattened - or not. - There is NO use case where `obj` is provided by the user and `properties` has not been processed. - In such case, we won't be able to decide if `obj` is flattened or not.""" - if isinstance(obj, dict): - if 'properties' in obj: - obj = obj['properties'] - obj[property_name] = property_value - else: - if hasattr(obj, 'properties'): - obj = obj.properties - obj.property_name = property_value diff --git a/src/azure-cli/azure/cli/command_modules/role/custom.py b/src/azure-cli/azure/cli/command_modules/role/custom.py index b0bd46abca6..ecd65306fe2 100644 --- a/src/azure-cli/azure/cli/command_modules/role/custom.py +++ b/src/azure-cli/azure/cli/command_modules/role/custom.py @@ -25,12 +25,11 @@ from knack.util import CLIError, todict from azure.core.exceptions import HttpResponseError -from azure.cli.core.profiles import ResourceType +from azure.cli.core.profiles import ResourceType, get_sdk from azure.cli.core.util import get_file_json, shell_safe_json_parse, is_guid from azure.cli.core.azclierror import ArgumentUsageError from ._client_factory import _auth_client_factory, _graph_client_factory -from ._multi_api_adaptor import MultiAPIAdaptor from ._msgrpah import GraphError, set_object_properties # ARM RBAC's principalType @@ -62,7 +61,7 @@ def list_role_definitions(cmd, name=None, resource_group_name=None, scope=None, definitions_client = _auth_client_factory(cmd.cli_ctx, scope).role_definitions scope = _build_role_scope(resource_group_name, scope, definitions_client._config.subscription_id) - return _search_role_definitions(cmd.cli_ctx, definitions_client, name, [scope], custom_role_only) + return _search_role_definitions(definitions_client, name, [scope], custom_role_only) def show_role_definition(cmd, scope=None, name=None, role_id=None): @@ -102,7 +101,7 @@ def _create_update_role_definition(cmd, role_definition, for_update): new_name = n[:1].lower() + n[1:] role_definition[new_name] = role_definition.pop(n) - worker = MultiAPIAdaptor(cmd.cli_ctx) + worker = RoleApiHelper(cmd.cli_ctx) if for_update: # for update, we need to use guid style unique name role_resource_id = role_definition.get('id') if not role_resource_id: @@ -116,14 +115,14 @@ def _create_update_role_definition(cmd, role_definition, for_update): role_id = parse_resource_id(role_resource_id)['name'] role_name = role_definition['roleName'] else: - matched = _search_role_definitions(cmd.cli_ctx, definitions_client, role_definition['name'], scopes) + matched = _search_role_definitions(definitions_client, role_definition['name'], scopes) if len(matched) > 1: raise CLIError('More than 2 definitions are found with the name of "{}"'.format( role_definition['name'])) if not matched: raise CLIError('No definition was found with the name of "{}"'.format(role_definition['name'])) role_id = role_definition['name'] = matched[0].name - role_name = worker.get_role_property(matched[0], 'role_name') + role_name = matched[0].role_name else: # for create definitions_client = _auth_client_factory(cmd.cli_ctx).role_definitions role_id = _gen_guid() @@ -142,23 +141,22 @@ def delete_role_definition(cmd, name, resource_group_name=None, scope=None, definitions_client = _auth_client_factory(cmd.cli_ctx, scope).role_definitions scope = _build_role_scope(resource_group_name, scope, definitions_client._config.subscription_id) - roles = _search_role_definitions(cmd.cli_ctx, definitions_client, name, [scope], custom_role_only) + roles = _search_role_definitions(definitions_client, name, [scope], custom_role_only) for r in roles: definitions_client.delete(role_definition_id=r.name, scope=scope) -def _search_role_definitions(cli_ctx, definitions_client, name, scopes, custom_role_only=False): +def _search_role_definitions(definitions_client, name, scopes, custom_role_only=False): for scope in scopes: # name argument matches the role definition's name (GUID) or roleName (e.g. 'Reader') property. # Only roleName can be used as a filter in Role Definitions - List API. # If name is a GUID, the filtering is performed on the client side. filter_query = f"roleName eq '{name}'" if name and not is_guid(name) else None roles = list(definitions_client.list(scope, filter=filter_query)) - worker = MultiAPIAdaptor(cli_ctx) if name: - roles = [r for r in roles if r.name == name or worker.get_role_property(r, 'role_name') == name] + roles = [rd for rd in roles if rd.name == name or rd.role_name == name] if custom_role_only: - roles = [r for r in roles if worker.get_role_property(r, 'role_type') == 'CustomRole'] + roles = [rd for rd in roles if rd.role_type == 'CustomRole'] if roles: return roles return [] @@ -220,7 +218,7 @@ def _create_role_assignment(cli_ctx, role, assignee, resource_group_name=None, s role_id = _resolve_role_id(role, scope, definitions_client) object_id = _resolve_object_id(cli_ctx, assignee) if resolve_assignee else assignee - worker = MultiAPIAdaptor(cli_ctx) + worker = RoleApiHelper(cli_ctx) return worker.create_role_assignment(assignments_client, assignment_name, role_id, object_id, scope, assignee_principal_type, description=description, condition=condition, condition_version=condition_version) @@ -251,7 +249,7 @@ def list_role_assignments(cmd, # pylint: disable=too-many-locals, too-many-bran if assignee and not assignee_object_id: assignee_object_id = _resolve_object_id(cmd.cli_ctx, assignee, fallback_to_object_id=True) - assignments = _search_role_assignments(cmd.cli_ctx, assignments_client, definitions_client, + assignments = _search_role_assignments(assignments_client, definitions_client, scope, assignee_object_id, role, include_inherited, include_groups) @@ -262,34 +260,23 @@ def list_role_assignments(cmd, # pylint: disable=too-many-locals, too-many-bran # Fill in role definition names if fill_role_definition_name: - worker = MultiAPIAdaptor(cmd.cli_ctx) role_defs = list(definitions_client.list( scope=scope or ('/subscriptions/' + definitions_client._config.subscription_id))) - role_dics = {i.id: worker.get_role_property(i, 'role_name') for i in role_defs} - for i in results: - if not i.get('roleDefinitionName'): - if role_dics.get(worker.get_role_property(i, 'roleDefinitionId')): - worker.set_role_property(i, 'roleDefinitionName', - role_dics[worker.get_role_property(i, 'roleDefinitionId')]) - else: - i['roleDefinitionName'] = None # the role definition might have been deleted + role_dics = {rd.id: rd.role_name for rd in role_defs} + for ra in results: + ra['roleDefinitionName'] = role_dics.get(ra['roleDefinitionId']) # Fill in principal names if fill_principal_name: - worker = MultiAPIAdaptor(cmd.cli_ctx) - principal_ids = set(worker.get_role_property(i, 'principalId') - for i in results if worker.get_role_property(i, 'principalId')) + principal_ids = set(ra['principalId'] for ra in results) if principal_ids: try: principals = _get_object_stubs(graph_client, principal_ids) principal_dics = {i[ID]: _get_displayable_name(i) for i in principals} - for i in [r for r in results if not r.get('principalName')]: - i['principalName'] = '' - if principal_dics.get(worker.get_role_property(i, 'principalId')): - worker.set_role_property(i, 'principalName', - principal_dics[worker.get_role_property(i, 'principalId')]) + for ra in results: + ra['principalName'] = principal_dics.get(ra['principalId']) or '' except (HttpResponseError, GraphError) as ex: # failure on resolving principal due to graph permission should not fail the whole thing logger.info("Failed to resolve graph object information per error '%s'", ex) @@ -307,9 +294,6 @@ def update_role_assignment(cmd, role_assignment): else: role_assignment = shell_safe_json_parse(role_assignment) - # Updating role assignment is only supported after 2020-04-01-preview, so we don't need to use MultiAPIAdaptor. - from azure.cli.core.profiles import get_sdk - RoleAssignment = get_sdk(cmd.cli_ctx, ResourceType.MGMT_AUTHORIZATION, 'RoleAssignment', mod='models', operation_group='role_assignments') assignment = RoleAssignment.from_dict(role_assignment) @@ -379,7 +363,6 @@ def _get_assignment_events(cli_ctx, start_time=None, end_time=None): def list_role_assignment_change_logs(cmd, start_time=None, end_time=None): # pylint: disable=too-many-branches # pylint: disable=too-many-nested-blocks, too-many-statements result = [] - worker = MultiAPIAdaptor(cmd.cli_ctx) start_events, end_events = _get_assignment_events(cmd.cli_ctx, start_time, end_time) # Use the resource `name` of roleDefinitions as keys, instead of `id`, because `id` can be inherited. @@ -387,7 +370,7 @@ def list_role_assignment_change_logs(cmd, start_time=None, end_time=None): # py # id: /subscriptions/0b1f6471-1bf0-4dda-aec3-cb9272f09590/providers/Microsoft.Authorization/roleDefinitions/b24988ac-6180-42a0-ab88-20f7382dd24c # pylint: disable=line-too-long if start_events: # Only query Role Definitions and Graph when there are events returned - role_defs = {d.name: worker.get_role_property(d, 'role_name') for d in list_role_definitions(cmd)} + role_defs = {rd.name: rd.role_name for rd in list_role_definitions(cmd)} for k, v in start_events.items(): e = end_events.get(k, None) @@ -514,7 +497,7 @@ def delete_role_assignments(cmd, ids=None, # Delay resolving object ID, because if ids are provided, no need to resolve if assignee and not assignee_object_id: assignee_object_id = _resolve_object_id(cmd.cli_ctx, assignee, fallback_to_object_id=True) - assignments = _search_role_assignments(cmd.cli_ctx, assignments_client, definitions_client, + assignments = _search_role_assignments(assignments_client, definitions_client, scope, assignee_object_id, role, include_inherited, include_groups=False) @@ -525,7 +508,7 @@ def delete_role_assignments(cmd, ids=None, raise CLIError('No matched assignments were found to delete') -def _search_role_assignments(cli_ctx, assignments_client, definitions_client, +def _search_role_assignments(assignments_client, definitions_client, scope, assignee_object_id, role, include_inherited, include_groups): # https://learn.microsoft.com/en-us/azure/role-based-access-control/role-assignments-list-rest # "atScope()" and "principalId eq '{value}'" query cannot be used together (API limitation). @@ -544,26 +527,25 @@ def _search_role_assignments(cli_ctx, assignments_client, definitions_client, else: assignments = list(assignments_client.list_for_subscription()) - worker = MultiAPIAdaptor(cli_ctx) if assignments: - assignments = [a for a in assignments if ( + assignments = [ra for ra in assignments if ( # If no scope, list all assignments not scope or # If scope is provided with include_inherited, list assignments at and above the scope. # Note that assignments below the scope are already excluded by atScope() include_inherited or # If scope is provided, list assignments at the scope - worker.get_role_property(a, 'scope').lower() == scope.lower() + ra.scope.lower() == scope.lower() )] if role: role_id = _resolve_role_id(role, scope, definitions_client) - assignments = [i for i in assignments if worker.get_role_property(i, 'role_definition_id') == role_id] + assignments = [ra for ra in assignments if ra.role_definition_id == role_id] # filter the assignee if "include_groups" is not provided because service side # does not accept filter "principalId eq and atScope()" if assignee_object_id and not include_groups: - assignments = [i for i in assignments if worker.get_role_property(i, 'principal_id') == assignee_object_id] + assignments = [ra for ra in assignments if ra.principal_id == assignee_object_id] return assignments @@ -2002,3 +1984,49 @@ def _get_member_groups(get_member_group_func, identifier, security_enabled_only) "securityEnabledOnly": security_enabled_only } return get_member_group_func(identifier, body) + + +class RoleApiHelper: + def __init__(self, cli_ctx): + self.cli_ctx = cli_ctx + + def _init_individual_permission(self, cfg): + Permission = get_sdk(self.cli_ctx, ResourceType.MGMT_AUTHORIZATION, 'Permission', mod='models', + operation_group='role_definitions') + permission = Permission(actions=cfg.get('actions', None), + not_actions=cfg.get('notActions', None), + data_actions=cfg.get('dataActions', None), + not_data_actions=cfg.get('notDataActions', None)) + return permission + + def _init_permissions(self, role_definition_input): + # we will handle with or w/o 'permissions' + if 'permissions' in role_definition_input: + return [self._init_individual_permission(p) for p in role_definition_input['permissions']] + return [self._init_individual_permission(role_definition_input)] + + def create_role_definition(self, client, role_name, role_id, role_definition_input): + RoleDefinition = get_sdk(self.cli_ctx, ResourceType.MGMT_AUTHORIZATION, 'RoleDefinition', + mod='models', operation_group='role_definitions') + role_definition = RoleDefinition(role_name=role_name, + description=role_definition_input.get('description', None), + type='CustomRole', + assignable_scopes=role_definition_input['assignableScopes'], + permissions=self._init_permissions(role_definition_input)) + scope = role_definition_input['assignableScopes'][0] + return client.create_or_update(role_definition_id=role_id, scope=scope, role_definition=role_definition) + + def create_role_assignment(self, client, assignment_name, role_id, object_id, scope, assignee_principal_type=None, + description=None, condition=None, condition_version=None): + RoleAssignmentCreateParameters = get_sdk( + self.cli_ctx, ResourceType.MGMT_AUTHORIZATION, 'RoleAssignmentCreateParameters', + mod='models', operation_group='role_assignments') + + # In 2022-04-01 API, principal_type is by default 'User', so we have to explicitly set it to None if we can't + # resolve principal type from Graph + # https://github.com/Azure/azure-rest-api-specs/issues/21664 + parameters = RoleAssignmentCreateParameters( + role_definition_id=role_id, principal_id=object_id, principal_type=assignee_principal_type, + description=description, condition=condition, condition_version=condition_version) + + return client.create(scope, assignment_name, parameters)