add back storage azure stack
kairu-ms committed Apr 25, 2025
commit 700736e14b9200532eb89d6f892fbc95868b294b
66 changes: 65 additions & 1 deletion src/azure-cli/azure/cli/command_modules/storage/__init__.py
@@ -42,6 +42,29 @@ def load_arguments(self, command):
        load_arguments(self, command)


class AzureStackStorageCommandsLoader(AzCommandsLoader):
    def __init__(self, cli_ctx=None):
        from azure.cli.core.commands import CliCommandType

        storage_custom = CliCommandType(operations_tmpl='azure.cli.command_modules.storage.custom#{}')
        super().__init__(cli_ctx=cli_ctx,
                         resource_type=ResourceType.DATA_STORAGE,
                         custom_command_type=storage_custom,
                         command_group_cls=AzureStackStorageCommandGroup,
                         argument_context_cls=StorageArgumentContext)

    def load_command_table(self, args):
        super().load_command_table(args)
        from azure.cli.command_modules.storage.commands_azure_stack import load_command_table
        load_command_table(self, args)
        return self.command_table

    def load_arguments(self, command):
        super().load_arguments(command)
        from azure.cli.command_modules.storage._params_azure_stack import load_arguments
        load_arguments(self, command)
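# Annotation (not part of this diff): both overrides above first build the
# standard storage command table/arguments via super(), then let the
# commands_azure_stack/_params_azure_stack modules re-register entries on top,
# so hybrid profiles share most definitions with the public-cloud loader.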


class StorageArgumentContext(AzArgumentContext):
    def register_sas_arguments(self):
        from azure.cli.command_modules.storage._validators import ipv4_range_type, get_datetime_type
@@ -359,6 +382,44 @@ def _register_data_plane_oauth_arguments(self, command_name):
                      'allowed data actions, even if there are ACLs in place for those files/directories.')


class AzureStackStorageCommandGroup(StorageCommandGroup):

    @classmethod
    def get_handler_suppress_some_400(cls):
        def handler(ex):
            if hasattr(ex, 'status_code') and ex.status_code == 403:
                # TODO: Revisit the logic here once the service team updates their response
                if 'AuthorizationPermissionMismatch' in ex.args[0]:
                    message = """
You do not have the required permissions needed to perform this operation.
Depending on your operation, you may need to be assigned one of the following roles:
    "Storage Blob Data Contributor"
    "Storage Blob Data Reader"
    "Storage Queue Data Contributor"
    "Storage Queue Data Reader"
    "Storage Table Data Contributor"
    "Storage Table Data Reader"

If you want to use the old authentication method and allow querying for the right account key, please use the "--auth-mode" parameter and "key" value.
"""
                    ex.args = (message,)
                elif 'AuthorizationFailure' in ex.args[0]:
                    message = """
The request may be blocked by network rules of storage account. Please check network rule set using 'az storage account show -n accountname --query networkRuleSet'.
If you want to change the default action to apply when no rule matches, please use 'az storage account update'.
"""
                    ex.args = (message,)
                elif 'AuthenticationFailed' in ex.args[0]:
                    message = """
Authentication failure. This may be caused by either invalid account key, connection string or sas token value provided for your storage account.
"""
                    ex.args = (message,)
            if hasattr(ex, 'status_code') and ex.status_code == 409 and 'NoPendingCopyOperation' in ex.args[0]:
                pass

        return handler
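# Illustrative sketch (annotation, not part of this diff): the handler rewrites
# a terse 403 into actionable guidance; re-raising is left to the merged
# wrapper built by _merge_new_exception_handler below. The exception type here
# is a hypothetical stand-in for the SDK error:
#
#     class FakeStorageError(Exception):
#         status_code = 403
#
#     handler = AzureStackStorageCommandGroup.get_handler_suppress_some_400()
#     ex = FakeStorageError('AuthorizationPermissionMismatch')
#     handler(ex)   # mutates ex.args in place
#     assert 'Storage Blob Data Contributor' in ex.args[0]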


def _merge_new_exception_handler(kwargs, handler):
    first = kwargs.get('exception_handler')

@@ -370,5 +431,8 @@ def new_handler(ex):
    kwargs['exception_handler'] = new_handler


def get_command_loader(_):
def get_command_loader(cli_ctx):
    if cli_ctx.cloud.profile.lower() != 'latest':
        return AzureStackStorageCommandsLoader

    return StorageCommandsLoader
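The signature change above is what activates the hybrid path: get_command_loader now inspects the active cloud profile and returns the Azure Stack loader for anything other than 'latest'. A minimal sketch of that dispatch (the DummyCli helper and the '2019-03-01-hybrid' profile name are assumptions for illustration, not part of this diff):

# Illustrative only: exercise the profile-based loader dispatch.
from azure.cli.core.mock import DummyCli

cli_ctx = DummyCli()
cli_ctx.cloud.profile = '2019-03-01-hybrid'   # any non-'latest' profile
assert get_command_loader(cli_ctx) is AzureStackStorageCommandsLoader

cli_ctx.cloud.profile = 'latest'
assert get_command_loader(cli_ctx) is StorageCommandsLoader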
@@ -0,0 +1,250 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from azure.cli.core.commands.client_factory import get_mgmt_service_client, get_data_service_client
from azure.cli.core.profiles import ResourceType, get_sdk

from azure.cli.command_modules.storage.sdkutil import get_table_data_type

MISSING_CREDENTIALS_ERROR_MESSAGE = """
Missing credentials to access storage service. The following variations are accepted:
(1) account name and key (--account-name and --account-key options or
set AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_KEY environment variables)
(2) account name and SAS token (--sas-token option used with either the --account-name
option or AZURE_STORAGE_ACCOUNT environment variable)
(3) account name (--account-name option or AZURE_STORAGE_ACCOUNT environment variable;
this will make calls to query for a storage account key using login credentials)
(4) connection string (--connection-string option or
set AZURE_STORAGE_CONNECTION_STRING environment variable); some shells will require
quoting to preserve literal character interpretation.
"""


def get_storage_data_service_client(cli_ctx, service, name=None, key=None, connection_string=None, sas_token=None,
                                    socket_timeout=None, token_credential=None):
    return get_data_service_client(cli_ctx, service, name, key, connection_string, sas_token,
                                   socket_timeout=socket_timeout,
                                   token_credential=token_credential,
                                   endpoint_suffix=cli_ctx.cloud.suffixes.storage_endpoint)


def generic_data_service_factory(cli_ctx, service, name=None, key=None, connection_string=None, sas_token=None,
                                 socket_timeout=None, token_credential=None):
    try:
        return get_storage_data_service_client(cli_ctx, service, name, key, connection_string, sas_token,
                                               socket_timeout, token_credential)
    except ValueError as val_exception:
        _ERROR_STORAGE_MISSING_INFO = get_sdk(cli_ctx, ResourceType.DATA_STORAGE,
                                              'common._error#_ERROR_STORAGE_MISSING_INFO')
        message = str(val_exception)
        if message == _ERROR_STORAGE_MISSING_INFO:
            message = MISSING_CREDENTIALS_ERROR_MESSAGE
        from knack.util import CLIError
        raise CLIError(message)


def storage_client_factory(cli_ctx, **_):
    return get_mgmt_service_client(cli_ctx, ResourceType.MGMT_STORAGE)


def file_data_service_factory(cli_ctx, kwargs):
    t_file_svc = get_sdk(cli_ctx, ResourceType.DATA_STORAGE, 'file#FileService')
    return generic_data_service_factory(cli_ctx, t_file_svc, kwargs.pop('account_name', None),
                                        kwargs.pop('account_key', None),
                                        connection_string=kwargs.pop('connection_string', None),
                                        sas_token=kwargs.pop('sas_token', None))


def page_blob_service_factory(cli_ctx, kwargs):
    t_page_blob_service = get_sdk(cli_ctx, ResourceType.DATA_STORAGE, 'blob.pageblobservice#PageBlobService')
    return generic_data_service_factory(cli_ctx, t_page_blob_service, kwargs.pop('account_name', None),
                                        kwargs.pop('account_key', None),
                                        connection_string=kwargs.pop('connection_string', None),
                                        sas_token=kwargs.pop('sas_token', None),
                                        token_credential=kwargs.pop('token_credential', None))


def blob_data_service_factory(cli_ctx, kwargs):
    if 'encryption_scope' in kwargs and kwargs['encryption_scope']:
        return cf_blob_client(cli_ctx, kwargs)
    from azure.cli.command_modules.storage.sdkutil import get_blob_service_by_type
    blob_type = kwargs.get('blob_type')
    blob_service = get_blob_service_by_type(cli_ctx, blob_type) or get_blob_service_by_type(cli_ctx, 'block')

    return generic_data_service_factory(cli_ctx, blob_service, kwargs.pop('account_name', None),
                                        kwargs.pop('account_key', None),
                                        connection_string=kwargs.pop('connection_string', None),
                                        sas_token=kwargs.pop('sas_token', None),
                                        socket_timeout=kwargs.pop('socket_timeout', None),
                                        token_credential=kwargs.pop('token_credential', None))


def table_data_service_factory(cli_ctx, kwargs):
    return generic_data_service_factory(cli_ctx,
                                        get_table_data_type(cli_ctx, 'table', 'TableService'),
                                        kwargs.pop('account_name', None),
                                        kwargs.pop('account_key', None),
                                        connection_string=kwargs.pop('connection_string', None),
                                        sas_token=kwargs.pop('sas_token', None))


def queue_data_service_factory(cli_ctx, kwargs):
    t_queue_service = get_sdk(cli_ctx, ResourceType.DATA_STORAGE, 'queue#QueueService')
    return generic_data_service_factory(
        cli_ctx, t_queue_service,
        kwargs.pop('account_name', None),
        kwargs.pop('account_key', None),
        connection_string=kwargs.pop('connection_string', None),
        sas_token=kwargs.pop('sas_token', None),
        token_credential=kwargs.pop('token_credential', None))


def cloud_storage_account_service_factory(cli_ctx, kwargs):
    t_cloud_storage_account = get_sdk(cli_ctx, ResourceType.DATA_STORAGE, 'common#CloudStorageAccount')
    account_name = kwargs.pop('account_name', None)
    account_key = kwargs.pop('account_key', None)
    sas_token = kwargs.pop('sas_token', None)
    kwargs.pop('connection_string', None)
    return t_cloud_storage_account(account_name, account_key, sas_token)


def multi_service_properties_factory(cli_ctx, kwargs):
    """Create multiple data services properties instance based on the services option"""
    from .services_wrapper_azure_stack import ServiceProperties

    t_base_blob_service, t_file_service, t_queue_service, = get_sdk(cli_ctx, ResourceType.DATA_STORAGE,
                                                                    'blob.baseblobservice#BaseBlobService',
                                                                    'file#FileService', 'queue#QueueService')

    t_table_service = get_table_data_type(cli_ctx, 'table', 'TableService')

    account_name = kwargs.pop('account_name', None)
    account_key = kwargs.pop('account_key', None)
    connection_string = kwargs.pop('connection_string', None)
    sas_token = kwargs.pop('sas_token', None)
    services = kwargs.pop('services', [])

    def get_creator(name, service_type):
        return lambda: ServiceProperties(cli_ctx, name, service_type, account_name, account_key, connection_string,
                                         sas_token)

    creators = {'b': get_creator('blob', t_base_blob_service), 'f': get_creator('file', t_file_service),
                'q': get_creator('queue', t_queue_service), 't': get_creator('table', t_table_service)}

    return [creators[s]() for s in services]
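# Annotation (not part of this diff): the creator lambdas above defer client
# construction, so only the services actually named in `services` are
# instantiated; e.g. services='bq' builds just the blob and queue wrappers.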


def cf_sa(cli_ctx, _):
    return storage_client_factory(cli_ctx).storage_accounts


def cf_sa_for_keys(cli_ctx, _):
    from knack.log import get_logger
    logger = get_logger(__name__)
    logger.debug('Disable HTTP logging to avoid having storage keys in debug logs')
    client = storage_client_factory(cli_ctx)
    return client.storage_accounts


def cf_mgmt_policy(cli_ctx, _):
    return storage_client_factory(cli_ctx).management_policies


def cf_blob_container_mgmt(cli_ctx, _):
    return storage_client_factory(cli_ctx).blob_containers


def cf_mgmt_blob_services(cli_ctx, _):
    return storage_client_factory(cli_ctx).blob_services


def cf_mgmt_file_services(cli_ctx, _):
    return storage_client_factory(cli_ctx).file_services


def cf_mgmt_file_shares(cli_ctx, _):
    return storage_client_factory(cli_ctx).file_shares


def cf_blob_data_gen_update(cli_ctx, kwargs):
    return blob_data_service_factory(cli_ctx, kwargs.copy())


def cf_private_link(cli_ctx, _):
    return storage_client_factory(cli_ctx).private_link_resources


def cf_private_endpoint(cli_ctx, _):
    return storage_client_factory(cli_ctx).private_endpoint_connections


def cf_mgmt_encryption_scope(cli_ctx, _):
    return storage_client_factory(cli_ctx).encryption_scopes


def get_account_url(cli_ctx, account_name, service):
    from knack.util import CLIError
    if account_name is None:
        raise CLIError("Please provide storage account name or connection string.")
    storage_endpoint = cli_ctx.cloud.suffixes.storage_endpoint
    return "https://{}.{}.{}".format(account_name, service, storage_endpoint)


def cf_blob_service(cli_ctx, kwargs):
    from knack.util import CLIError
    t_blob_service = get_sdk(cli_ctx, ResourceType.DATA_STORAGE_BLOB,
                             '_blob_service_client#BlobServiceClient')
    connection_string = kwargs.pop('connection_string', None)
    account_name = kwargs.pop('account_name', None)
    account_key = kwargs.pop('account_key', None)
    token_credential = kwargs.pop('token_credential', None)
    sas_token = kwargs.pop('sas_token', None)
    if connection_string:
        return t_blob_service.from_connection_string(conn_str=connection_string)

    account_url = get_account_url(cli_ctx, account_name=account_name, service='blob')
    credential = account_key or sas_token or token_credential

    if account_url and credential:
        return t_blob_service(account_url=account_url, credential=credential)
    raise CLIError("Please provide valid connection string, or account name with account key, "
                   "sas token or login auth mode.")


def cf_blob_client(cli_ctx, kwargs):
    return cf_blob_service(cli_ctx, kwargs).get_blob_client(container=kwargs['container_name'],
                                                            blob=kwargs['blob_name'],
                                                            snapshot=kwargs.pop('snapshot', None))


def cf_adls_service(cli_ctx, kwargs):
    t_adls_service = get_sdk(cli_ctx, ResourceType.DATA_STORAGE_FILEDATALAKE,
                             '_data_lake_service_client#DataLakeServiceClient')
    connection_string = kwargs.pop('connection_string', None)
    account_key = kwargs.pop('account_key', None)
    token_credential = kwargs.pop('token_credential', None)
    sas_token = kwargs.pop('sas_token', None)
    if connection_string:
        return t_adls_service.from_connection_string(connection_string=connection_string)

    account_url = get_account_url(cli_ctx, account_name=kwargs.pop('account_name', None), service='dfs')
    credential = account_key or sas_token or token_credential

    if account_url and credential:
        return t_adls_service(account_url=account_url, credential=credential)
    return None


def cf_adls_file_system(cli_ctx, kwargs):
    return cf_adls_service(cli_ctx, kwargs).get_file_system_client(file_system=kwargs.pop('file_system_name'))


def cf_adls_directory(cli_ctx, kwargs):
    return cf_adls_file_system(cli_ctx, kwargs).get_directory_client(directory=kwargs.pop('directory_path'))


def cf_adls_file(cli_ctx, kwargs):
    return cf_adls_service(cli_ctx, kwargs).get_file_client(file_system=kwargs.pop('file_system_name', None),
                                                            file_path=kwargs.pop('path', None))
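Every cf_* factory above follows the same contract: it is handed the parsed command kwargs and pops the credential arguments out before the remaining kwargs reach the SDK operation. A minimal standalone sketch of that contract (the account name and key values are placeholders, not from this diff):

# Illustrative only: invoking a factory directly, outside the CLI plumbing.
kwargs = {'account_name': 'mystorageaccount', 'account_key': '<account key>',
          'connection_string': None, 'sas_token': None}
queue_service = queue_data_service_factory(cli_ctx, kwargs)
# The credential entries have been popped from kwargs; queue_service is a
# profile-appropriate QueueService instance.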
@@ -0,0 +1,35 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from knack.log import get_logger
from azure.cli.core.profiles import get_sdk, ResourceType

logger = get_logger(__name__)


def transform_file_directory_result(cli_ctx):
    """
    Transform the result returned from the file and directory listing API.

    This transformer adds and removes properties from the File and Directory
    objects in the given list in order to align their properties, offering a
    better view of the file and directory listing.
    """
    def transformer(result):
        if getattr(result, 'next_marker', None):
            logger.warning('Next Marker:')
            logger.warning(result.next_marker)

        t_file, t_dir = get_sdk(cli_ctx, ResourceType.DATA_STORAGE, 'File', 'Directory', mod='file.models')
        return_list = []
        for each in result:
            if isinstance(each, t_file):
                delattr(each, 'content')
                setattr(each, 'type', 'file')
            elif isinstance(each, t_dir):
                setattr(each, 'type', 'dir')
            return_list.append(each)

        return return_list
    return transformer
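Because transform_file_directory_result closes over cli_ctx, it can resolve the profile-specific File and Directory models at call time rather than at import time. A hedged sketch of how such a transformer is typically attached to a command (the command group and operation names here are assumptions, not from this diff):

# Illustrative only: attaching the transformer when registering a command.
with self.command_group('storage file', file_sdk) as g:
    g.storage_command('list', 'list_directories_and_files',
                      transform=transform_file_directory_result(self.cli_ctx))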