Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
40a999f
Refactored blob shared utils
annatisch Jul 9, 2019
438b101
Refactored file shared utils
annatisch Jul 9, 2019
81fe88e
Refactored queue shared utils
annatisch Jul 9, 2019
7663070
Refactored downloads
annatisch Jul 10, 2019
42a4579
Refactored file downloads
annatisch Jul 10, 2019
0dcd679
Started async downloads
annatisch Jul 11, 2019
5d1ddbd
Async Files API
annatisch Jul 15, 2019
70854c4
Flatten copy polling
annatisch Jul 15, 2019
2337824
Renamed uploads
annatisch Jul 15, 2019
8db1a24
Fixes samples based on vendor feedback (#6357)
iscai-msft Jul 15, 2019
6ee03c9
Upload refactor
annatisch Jul 15, 2019
1dd0602
Release approval docs (#6361)
scbedd Jul 16, 2019
aacf90a
Updated async pipeline
annatisch Jul 16, 2019
26a7f15
Avoid surprising aiohttp with unexpected kwargs (#6355)
chlowell Jul 16, 2019
f69bdbe
Add challenge authentication to azure-keyvault-keys (#6244)
chlowell Jul 17, 2019
be84bd3
Add decorator (#6299)
SuyogSoti Jul 17, 2019
a62e09c
Added async file tests
annatisch Jul 17, 2019
1964544
Consolidate Key Vault shared code (#6384)
chlowell Jul 17, 2019
fb46ff2
Download tests
annatisch Jul 17, 2019
f261414
Service property tests
annatisch Jul 17, 2019
8508816
No recordings
annatisch Jul 17, 2019
e197b59
Add credential wrapping MSAL ConfidentialClientApplication (#6358)
chlowell Jul 17, 2019
47c24b5
Add policy (#6379)
SuyogSoti Jul 18, 2019
337db3c
adding dockerfile (#6393)
Jul 18, 2019
932cf73
Update cheatsheet.md
Jul 18, 2019
62aa8e9
Async share tests
annatisch Jul 18, 2019
04fafe9
Async directory tests
annatisch Jul 18, 2019
91d3678
Fixed some tests
annatisch Jul 18, 2019
b427140
aiohttp socket timeout
annatisch Jul 18, 2019
d02d82b
Merge remote-tracking branch 'upstream/master'
annatisch Jul 18, 2019
a9cc2bc
Patch azure core
annatisch Jul 18, 2019
1ce4e64
CI fixes
annatisch Jul 18, 2019
bee9ee4
Fix async tests for py35
annatisch Jul 18, 2019
dae678a
Python 3.5 support
annatisch Jul 18, 2019
d68d48c
Clean pylint
annatisch Jul 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Download tests
  • Loading branch information
annatisch committed Jul 17, 2019
commit fb46ff2b361a219cd603f3bcb01d7263e466e362
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
async def process_content(data, start_offset, end_offset, encryption):
if data is None:
raise ValueError("Response cannot be None.")
content = b""
async for chunk in data:
content += chunk
content = data.response.body
if encryption.get('key') is not None or encryption.get('resolver') is not None:
try:
return decrypt_blob(
Expand Down Expand Up @@ -52,6 +50,7 @@ def __init__(
start_range=None,
end_range=None,
stream=None,
parallel=None,
validate_content=None,
encryption_options=None,
**kwargs):
Expand All @@ -66,6 +65,12 @@ def __init__(

# the destination that we will write to
self.stream = stream
self.stream_lock = asyncio.Lock() if parallel else None
self.progress_lock = asyncio.Lock() if parallel else None

# for a parallel download, the stream is always seekable, so we note down the current position
# in order to seek to the right place when out-of-order chunks come in
self.stream_start = stream.tell() if parallel else None

# download progress so far
self.progress_total = current_progress
Expand Down Expand Up @@ -96,19 +101,25 @@ async def process_chunk(self, chunk_start):
length = chunk_end - chunk_start
if length > 0:
await self._write_to_stream(chunk_data, chunk_start)
self._update_progress(length)
await self._update_progress(length)

async def yield_chunk(self, chunk_start):
chunk_start, chunk_end = self._calculate_range(chunk_start)
return await self._download_chunk(chunk_start, chunk_end)

async def _update_progress(self, length):
async with self.progress_lock:
if self.progress_lock:
async with self.progress_lock:
self.progress_total += length
else:
self.progress_total += length

async def _write_to_stream(self, chunk_data, chunk_start):
async with self.stream_lock:
self.stream.seek(self.stream_start + (chunk_start - self.start_index))
if self.stream_lock:
async with self.stream_lock:
self.stream.seek(self.stream_start + (chunk_start - self.start_index))
self.stream.write(chunk_data)
else:
self.stream.write(chunk_data)

async def _download_chunk(self, chunk_start, chunk_end):
Expand Down Expand Up @@ -177,7 +188,6 @@ def __init__(

self.initial_range, self.initial_offset = process_range_and_offset(
initial_request_start, initial_request_end, self.length, self.encryption_options)

self.download_size = None
self.file_size = None
self.response = None
Expand Down Expand Up @@ -240,6 +250,7 @@ async def _async_data_iterator(self):
start_range=self.initial_range[1] + 1, # start where the first download ended
end_range=data_end,
stream=stream,
parallel=False,
validate_content=self.validate_content,
encryption_options=self.encryption_options,
use_location=self.location_mode,
Expand Down Expand Up @@ -347,7 +358,8 @@ async def download_to_stream(self, stream, max_connections=1):
:rtype: Any
"""
# the stream must be seekable if parallel download is required
if max_connections > 1:
parallel = max_connections > 1
if parallel:
error_message = "Target stream handle must be seekable."
if sys.version_info >= (3,) and not stream.seekable():
raise ValueError(error_message)
Expand Down Expand Up @@ -382,6 +394,7 @@ async def download_to_stream(self, stream, max_connections=1):
start_range=self.initial_range[1] + 1, # start where the first download ended
end_range=data_end,
stream=stream,
parallel=parallel,
validate_content=self.validate_content,
encryption_options=self.encryption_options,
use_location=self.location_mode,
Expand All @@ -392,7 +405,7 @@ async def download_to_stream(self, stream, max_connections=1):
asyncio.ensure_future(downloader.process_chunk(d))
for d in islice(dl_tasks, 0, max_connections)
]
while True:
while running_futures:
# Wait for some download to finish before adding a new one
_done, running_futures = await asyncio.wait(
running_futures, return_when=asyncio.FIRST_COMPLETED)
Expand All @@ -403,6 +416,7 @@ async def download_to_stream(self, stream, max_connections=1):
else:
running_futures.add(asyncio.ensure_future(downloader.process_chunk(next_chunk)))

# Wait for the remaining downloads to finish
await asyncio.wait(running_futures)
if running_futures:
# Wait for the remaining downloads to finish
await asyncio.wait(running_futures)
return self.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ async def send(self, request):
request.context.options.pop('raw_response_hook', self._response_callback)

response = await self.next.send(request)
await response.http_response.load_body()
response.http_response.internal_response.body = response.http_response.body()

will_retry = is_retry(response, request.context.options.get('mode'))
if not will_retry and download_stream_current is not None:
download_stream_current += int(response.http_response.headers.get('Content-Length', 0))
Expand Down
17 changes: 8 additions & 9 deletions sdk/storage/azure-storage-file/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ def _create_remote_file(self, file_data=None):
remote_file.upload_file(file_data)
return remote_file

def _wait_for_async_copy(self, share_name, dir_name, file_name):
def _wait_for_async_copy(self, share_name, file_path):
count = 0
file = self.fs.get_file_properties(share_name, dir_name, file_name)
while file.properties.copy.status != 'success':
share_client = self.fsc.get_share_client(share_name)
file_client = share_client.get_file_client(file_path)
properties = file_client.get_file_properties()
while properties.copy.status != 'success':
count = count + 1
if count > 10:
self.fail('Timed out waiting for async copy to complete.')
self.sleep(6)
file = self.fs.get_file_properties(share_name, dir_name, file_name)
self.assertEqual(file.properties.copy.status, 'success')
properties = file_client.get_file_properties()
self.assertEqual(properties.copy.status, 'success')

def assertFileEqual(self, file_client, expected_data):
actual_data = file_client.download_file().content_as_bytes()
Expand Down Expand Up @@ -708,10 +710,7 @@ def test_copy_file_async_private_file_with_sas(self):

# Assert
self.assertTrue(copy_resp['copy_status'] in ['success', 'pending'])
while copy_resp['copy_status'] =! 'success':
properties = file_client.get_file_properties()
copy_resp['copy_status'] = properties.copy.status
time.sleep(3)
self._wait_for_async_copy(self.share_name, target_file_name)

actual_data = file_client.download_file().content_as_bytes()
self.assertEqual(actual_data, data)
Expand Down
Loading