-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy pathaccess_cosmos_with_aad_async.py
More file actions
115 lines (94 loc) · 5.11 KB
/
access_cosmos_with_aad_async.py
File metadata and controls
115 lines (94 loc) · 5.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from azure.cosmos.aio import CosmosClient
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey
from azure.identity.aio import ClientSecretCredential, DefaultAzureCredential
import config
import asyncio
# ----------------------------------------------------------------------------------------------------------
# Prerequisites -
#
# 1. An Azure Cosmos account -
# https://learn.microsoft.com/azure/cosmos-db/create-sql-api-python#create-a-database-account
#
# 2. Microsoft Azure Cosmos
# pip install azure-cosmos>=4.3.0b4
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates how to authenticate and use your database account using AAD credentials
# Read more about operations allowed for this authorization method: https://aka.ms/cosmos-native-rbac
# ----------------------------------------------------------------------------------------------------------
# Note:
# This sample creates a Container to your database account.
# Each time a Container is created the account will be billed for 1 hour of usage based on
# the provisioned throughput (RU/s) of that account.
# ----------------------------------------------------------------------------------------------------------
# <configureConnectivity>
HOST = config.settings["host"]
MASTER_KEY = config.settings["master_key"]
TENANT_ID = config.settings["tenant_id"]
CLIENT_ID = config.settings["client_id"]
CLIENT_SECRET = config.settings["client_secret"]
DATABASE_ID = config.settings["database_id"]
CONTAINER_ID = config.settings["container_id"]
PARTITION_KEY = PartitionKey(path="/id")
def get_test_item(num):
test_item = {
'id': 'Item_' + str(num),
'test_object': True,
'lastName': 'Smith'
}
return test_item
async def create_sample_resources():
print("creating sample resources")
async with CosmosClient(HOST, MASTER_KEY) as client:
db = await client.create_database(DATABASE_ID)
await db.create_container(id=CONTAINER_ID, partition_key=PARTITION_KEY)
async def delete_sample_resources():
print("deleting sample resources")
async with CosmosClient(HOST, MASTER_KEY) as client:
await client.delete_database(DATABASE_ID)
async def run_sample():
# Since Azure Cosmos DB data plane SDK does not cover management operations, we have to create our resources
# with a master key authenticated client for this sample.
await create_sample_resources()
# With this done, you can use your AAD service principal id and secret to create your ClientSecretCredential.
# The async ClientSecretCredentials, like the async client, also have a context manager,
# and as such should be used with the `async with` keywords.
async with ClientSecretCredential(
tenant_id=TENANT_ID,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET) as aad_credentials:
# Use your credentials to authenticate your client.
async with CosmosClient(HOST, aad_credentials) as aad_client:
print("Showed ClientSecretCredential, now showing DefaultAzureCredential")
# You can also utilize DefaultAzureCredential rather than directly passing in the id's and secrets.
# This is the recommended method of authentication, and uses environment variables rather than in-code strings.
async with DefaultAzureCredential() as aad_credentials:
# Use your credentials to authenticate your client.
async with CosmosClient(HOST, aad_credentials) as aad_client:
# Do any R/W data operations with your authorized AAD client.
db = aad_client.get_database_client(DATABASE_ID)
container = db.get_container_client(CONTAINER_ID)
print("Container info: " + str(container.read()))
await container.create_item(get_test_item(879))
print("Point read result: " + str(container.read_item(item='Item_0', partition_key='Item_0')))
query_results = [item async for item in
container.query_items(query='select * from c', partition_key='Item_0')]
assert len(query_results) == 1
print("Query result: " + str(query_results[0]))
await container.delete_item(item='Item_0', partition_key='Item_0')
# Attempting to do management operations will return a 403 Forbidden exception.
try:
await aad_client.delete_database(DATABASE_ID)
except exceptions.CosmosHttpResponseError as e:
assert e.status_code == 403
print("403 error assertion success")
# To clean up the sample, we use a master key client again to get access to deleting containers/ databases.
await delete_sample_resources()
print("end of sample")
if __name__ == "__main__":
asyncio.run(run_sample())