-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Identity sas #7020
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Identity sas #7020
Changes from 1 commit
512d5a4
8bed721
22b801a
bfd3fbd
7707acf
e945a70
8c821a6
8c5c22c
d46e3dc
fcf0ec6
3c6b3e0
62dc377
3d00c52
9afed1d
bd9f32e
99a07ad
e7811c1
a565b90
aed02d4
17540b4
9c327cc
450a66e
969816b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -133,6 +133,7 @@ def __init__( | |
| except AttributeError: | ||
| raise ValueError("Blob URL must be a string.") | ||
| parsed_url = urlparse(blob_url.rstrip('/')) | ||
| self.account_name = parsed_url.hostname.split('.')[0] | ||
|
||
| if not parsed_url.path and not (container and blob): | ||
| raise ValueError("Please specify a container and blob name.") | ||
| if not parsed_url.netloc: | ||
|
|
@@ -231,7 +232,8 @@ def generate_shared_access_signature( | |
| content_disposition=None, # type: Optional[str] | ||
| content_encoding=None, # type: Optional[str] | ||
| content_language=None, # type: Optional[str] | ||
| content_type=None # type: Optional[str] | ||
| content_type=None, # type: Optional[str] | ||
| user_delegation_key=None | ||
xiafu-msft marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ): | ||
| # type: (...) -> Any | ||
| """ | ||
|
|
@@ -289,12 +291,20 @@ def generate_shared_access_signature( | |
| :param str content_type: | ||
| Response header value for Content-Type when resource is accessed | ||
| using this shared access signature. | ||
| :param ~azure.storage.blob._shared.models.UserDelegationKey user_delegation_key: | ||
| Instead of an account key, the user could pass in a user delegation key. | ||
| A user delegation key can be obtained from the service by authenticating with an AAD identity; | ||
| this can be accomplished by calling get_user_delegation_key. | ||
| When present, the SAS is signed with the user delegation key instead. | ||
| :return: A Shared Access Signature (sas) token. | ||
| :rtype: str | ||
| """ | ||
| if not hasattr(self.credential, 'account_key') or not self.credential.account_key: | ||
| raise ValueError("No account SAS key available.") | ||
| sas = BlobSharedAccessSignature(self.credential.account_name, self.credential.account_key) | ||
| if user_delegation_key is not None: | ||
| sas = BlobSharedAccessSignature(self.account_name, user_delegation_key=user_delegation_key) | ||
|
||
| else: | ||
| if not hasattr(self.credential, 'account_key') or not self.credential.account_key: | ||
| raise ValueError("No account SAS key available.") | ||
| sas = BlobSharedAccessSignature(self.credential.account_name, self.credential.account_key) | ||
| return sas.generate_blob( | ||
| self.container_name, | ||
| self.blob_name, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,7 @@ def __init__( | |
| except AttributeError: | ||
| raise ValueError("Container URL must be a string.") | ||
| parsed_url = urlparse(container_url.rstrip('/')) | ||
| self.account_name = parsed_url.hostname.split('.')[0] | ||
|
||
| if not parsed_url.path and not container: | ||
| raise ValueError("Please specify a container name.") | ||
| if not parsed_url.netloc: | ||
|
|
@@ -192,7 +193,8 @@ def generate_shared_access_signature( | |
| content_disposition=None, # type: Optional[str] | ||
| content_encoding=None, # type: Optional[str] | ||
| content_language=None, # type: Optional[str] | ||
| content_type=None # type: Optional[str] | ||
| content_type=None, # type: Optional[str] | ||
| user_delegation_key=None # type Optional[] | ||
| ): | ||
| # type: (...) -> Any | ||
| """Generates a shared access signature for the container. | ||
|
|
@@ -248,6 +250,11 @@ def generate_shared_access_signature( | |
| :param str content_type: | ||
| Response header value for Content-Type when resource is accessed | ||
| using this shared access signature. | ||
| :param ~azure.storage.blob._shared.models.UserDelegationKey user_delegation_key: | ||
| Instead of an account key, the user could pass in a user delegation key. | ||
| A user delegation key can be obtained from the service by authenticating with an AAD identity; | ||
| this can be accomplished by calling get_user_delegation_key. | ||
| When present, the SAS is signed with the user delegation key instead. | ||
| :return: A Shared Access Signature (sas) token. | ||
| :rtype: str | ||
|
|
||
|
|
@@ -259,9 +266,12 @@ def generate_shared_access_signature( | |
| :dedent: 12 | ||
| :caption: Generating a sas token. | ||
| """ | ||
| if not hasattr(self.credential, 'account_key') and not self.credential.account_key: | ||
| raise ValueError("No account SAS key available.") | ||
| sas = BlobSharedAccessSignature(self.credential.account_name, self.credential.account_key) | ||
| if user_delegation_key is not None: | ||
| sas = BlobSharedAccessSignature(self.account_name, user_delegation_key=user_delegation_key) | ||
| else: | ||
| if not hasattr(self.credential, 'account_key') and not self.credential.account_key: | ||
| raise ValueError("No account SAS key available.") | ||
| sas = BlobSharedAccessSignature(self.credential.account_name, self.credential.account_key) | ||
| return sas.generate_container( | ||
| self.container_name, | ||
| permission=permission, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1264,6 +1264,67 @@ def test_account_sas(self): | |
| self.assertEqual(self.byte_data, blob_response.content) | ||
| self.assertTrue(container_response.ok) | ||
|
|
||
| def test_get_user_delegation_key(self): | ||
| if TestMode.need_recording_file(self.test_mode): | ||
| return | ||
|
||
| # Act | ||
| token_credential = self.generate_oauth_token() | ||
|
|
||
| # Action 1: make sure token works | ||
| service = BlobServiceClient(self._get_oauth_account_url(), credential=token_credential) | ||
|
|
||
| start = datetime.utcnow() | ||
| expiry = datetime.utcnow() + timedelta(hours=1) | ||
| user_delegation_key_1 = service.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry) | ||
| user_delegation_key_2 = service.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry) | ||
|
|
||
| # Assert key1 is valid | ||
| self.assertIsNotNone(user_delegation_key_1.signed_oid) | ||
| self.assertIsNotNone(user_delegation_key_1.signed_tid) | ||
| self.assertIsNotNone(user_delegation_key_1.signed_start) | ||
| self.assertIsNotNone(user_delegation_key_1.signed_expiry) | ||
| self.assertIsNotNone(user_delegation_key_1.signed_version) | ||
| self.assertIsNotNone(user_delegation_key_1.signed_service) | ||
| self.assertIsNotNone(user_delegation_key_1.value) | ||
|
|
||
| # Assert key1 and key2 are equal, since they have the exact same start and end times | ||
| self.assertEqual(user_delegation_key_1.signed_oid, user_delegation_key_2.signed_oid) | ||
| self.assertEqual(user_delegation_key_1.signed_tid, user_delegation_key_2.signed_tid) | ||
| self.assertEqual(user_delegation_key_1.signed_start, user_delegation_key_2.signed_start) | ||
| self.assertEqual(user_delegation_key_1.signed_expiry, user_delegation_key_2.signed_expiry) | ||
| self.assertEqual(user_delegation_key_1.signed_version, user_delegation_key_2.signed_version) | ||
| self.assertEqual(user_delegation_key_1.signed_service, user_delegation_key_2.signed_service) | ||
| self.assertEqual(user_delegation_key_1.value, user_delegation_key_2.value) | ||
|
|
||
| def test_user_delegation_sas_for_blob(self): | ||
| # SAS URL is calculated from storage key, so this test runs live only | ||
| if TestMode.need_recording_file(self.test_mode): | ||
| return | ||
|
|
||
| # Arrange | ||
| token_credential = self.generate_oauth_token() | ||
| service_client = BlobServiceClient(self._get_oauth_account_url(), credential=token_credential) | ||
| user_delegation_key = service_client.get_user_delegation_key(datetime.utcnow(), | ||
| datetime.utcnow() + timedelta(hours=1)) | ||
|
|
||
| container_client = service_client.create_container(self.get_resource_name('oauthcontainer')) | ||
| blob_client = container_client.get_blob_client(self.get_resource_name('oauthblob')) | ||
| blob_client.upload_blob(self.byte_data, length=len(self.byte_data)) | ||
|
|
||
| token = blob_client.generate_shared_access_signature( | ||
| permission=BlobPermissions.READ, | ||
| expiry=datetime.utcnow() + timedelta(hours=1), | ||
| user_delegation_key=user_delegation_key, | ||
| ) | ||
|
|
||
| # Act | ||
| # Use the generated identity sas | ||
| new_blob_client = BlobClient(blob_client.url, credential=token) | ||
| content = new_blob_client.download_blob() | ||
|
|
||
| # Assert | ||
| self.assertEqual(self.byte_data, b"".join(list(content))) | ||
|
|
||
| @record | ||
| def test_token_credential(self): | ||
| token_credential = self.generate_oauth_token() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| AccessPolicy | ||
| ) | ||
|
|
||
| from azure.identity import ClientSecretCredential | ||
| from testcase import StorageTestCase, TestMode, record, LogCaptured | ||
|
|
||
| #------------------------------------------------------------------------------ | ||
|
|
@@ -71,6 +72,14 @@ def _create_container(self, prefix=TEST_CONTAINER_PREFIX): | |
| pass | ||
| return container | ||
|
|
||
| def _generate_oauth_token(self): | ||
|
||
|
|
||
| return ClientSecretCredential( | ||
| self.settings.ACTIVE_DIRECTORY_APPLICATION_ID, | ||
| self.settings.ACTIVE_DIRECTORY_APPLICATION_SECRET, | ||
| self.settings.ACTIVE_DIRECTORY_TENANT_ID | ||
| ) | ||
|
|
||
| #--Test cases for containers ----------------------------------------- | ||
| @record | ||
| def test_create_container(self): | ||
|
|
@@ -1068,6 +1077,34 @@ def test_web_container_normal_operations_working(self): | |
| # delete container | ||
| container.delete_container() | ||
|
|
||
| def test_user_delegation_sas_for_container(self): | ||
| # SAS URL is calculated from storage key, so this test runs live only | ||
| if TestMode.need_recording_file(self.test_mode): | ||
| return | ||
|
|
||
| # Arrange | ||
| token_credential = self.generate_oauth_token() | ||
| service_client = BlobServiceClient(self._get_oauth_account_url(), credential=token_credential) | ||
| user_delegation_key = service_client.get_user_delegation_key(datetime.utcnow(), | ||
| datetime.utcnow() + timedelta(hours=1)) | ||
|
|
||
| container_client = service_client.create_container(self.get_resource_name('oauthcontainer')) | ||
| token = container_client.generate_shared_access_signature( | ||
| expiry=datetime.utcnow() + timedelta(hours=1), | ||
| permission=ContainerPermissions.READ, | ||
| user_delegation_key=user_delegation_key, | ||
| ) | ||
|
|
||
| blob_client = container_client.get_blob_client(self.get_resource_name('oauthblob')) | ||
| blob_content = self.get_random_text_data(1024) | ||
| blob_client.upload_blob(blob_content, length=len(blob_content)) | ||
|
|
||
| # Act | ||
| new_blob_client = BlobClient(blob_client.url, credential=token) | ||
| content = new_blob_client.download_blob() | ||
|
|
||
| # Assert | ||
| self.assertEqual(blob_content, b"".join(list(content)).decode('utf-8')) | ||
|
|
||
| #------------------------------------------------------------------------------ | ||
| if __name__ == '__main__': | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not always work - a customer could be using a custom URL, it wont always be the storage account name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See how this is handled in
generate_shared_access_signaturefor reference