diff --git a/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs b/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs index f656382efad2..35bccf87d76c 100644 --- a/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs +++ b/sdk/storage/Azure.Storage.Common/src/Shared/ContentRange.cs @@ -153,7 +153,8 @@ public static HttpRange ToHttpRange(ContentRange contentRange) { // Because constructing HttpRange is the start value, and the length of the range // increment 1 on the end value, since the end value is the end index (not the length). - return new HttpRange(contentRange.Start.Value, contentRange.End.Value + 1); + long length = contentRange.End.Value - contentRange.Start.Value + 1; + return new HttpRange(contentRange.Start.Value, length); } return default; } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml index d77d2310bcf3..d2baad6e9a7e 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf-tests.yml @@ -15,6 +15,7 @@ Tests: Arguments: &sizes - --size 1024 --count 5000 --duration 60 --concurrency 64 - --size 10485760 --count 500 --duration 90 --concurrency 64 + - --size 52428800 --count 200 --duration 120 --concurrency 64 - --size 1073741824 --count 5 --duration 120 --concurrency 64 - Test: upload diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs index 6df462c6c90b..da3c954f7812 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions { public StorageResourceWriteToOffsetOptions() { } public string BlockId { get { throw null; } set { } } + public bool Initial { get { throw null; } set { } } public long? Position { get { throw null; } set { } } public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs index 6df462c6c90b..da3c954f7812 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net8.0.cs @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions { public StorageResourceWriteToOffsetOptions() { } public string BlockId { get { throw null; } set { } } + public bool Initial { get { throw null; } set { } } public long? Position { get { throw null; } set { } } public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs index 6df462c6c90b..da3c954f7812 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs @@ -220,6 +220,7 @@ public partial class StorageResourceWriteToOffsetOptions { public StorageResourceWriteToOffsetOptions() { } public string BlockId { get { throw null; } set { } } + public bool Initial { get { throw null; } set { } } public long? Position { get { throw null; } set { } } public Azure.Storage.DataMovement.StorageResourceItemProperties SourceProperties { get { throw null; } set { } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 7e7a7164ff15..0f59fea48999 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -2,33 +2,24 @@ // Licensed under the MIT License. using System; -using System.Collections.Concurrent; -using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using Azure.Core; -using Azure.Core.Pipeline; using Azure.Storage.Common; namespace Azure.Storage.DataMovement { internal class DownloadChunkHandler : IDisposable { - // Indicates whether the current thread is processing stage chunks. - private static Task _processDownloadRangeEvents; - #region Delegate Definitions - public delegate Task CopyToDestinationFileInternal(long offset, long length, Stream stream, long expectedLength); - public delegate Task CopyToChunkFileInternal(string chunkFilePath, Stream stream); + public delegate Task CopyToDestinationFileInternal(long offset, long length, Stream stream, long expectedLength, bool initial); public delegate void ReportProgressInBytes(long bytesWritten); public delegate Task QueueCompleteFileDownloadInternal(); public delegate Task InvokeFailedEventHandlerInternal(Exception ex); #endregion Delegate Definitions private readonly CopyToDestinationFileInternal _copyToDestinationFile; - private readonly CopyToChunkFileInternal _copyToChunkFile; private readonly ReportProgressInBytes _reportProgressInBytes; private readonly InvokeFailedEventHandlerInternal _invokeFailedEventHandler; private readonly QueueCompleteFileDownloadInternal _queueCompleteFileDownload; @@ -36,46 +27,22 @@ internal class DownloadChunkHandler : IDisposable public struct Behaviors { public CopyToDestinationFileInternal CopyToDestinationFile { get; set; } - - public CopyToChunkFileInternal CopyToChunkFile { get; set; } public ReportProgressInBytes ReportProgressInBytes { get; set; } - public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; } - public QueueCompleteFileDownloadInternal QueueCompleteFileDownload { get; set; } } - private event SyncAsyncEventHandler _downloadChunkEventHandler; - internal SyncAsyncEventHandler GetDownloadChunkHandler() => _downloadChunkEventHandler; - /// - /// Create channel of to keep track of that are - /// waiting to update the bytesTransferred and other required operations. + /// Create channel of to keep track to handle + /// writing downloaded chunks to the destination as well as tracking overall progress. /// - private readonly Channel _downloadRangeChannel; - private CancellationToken _cancellationToken; + private readonly Channel _downloadRangeChannel; + private readonly Task _processDownloadRangeEvents; + private readonly CancellationToken _cancellationToken; private long _bytesTransferred; private readonly long _expectedLength; - /// - /// List that holds all ranges of chunks to process. - /// - private readonly IList _ranges; - private int _rangesCount; - /// - /// Holds which range we are currently waiting on to download. - /// - private int _currentRangeIndex; - - /// - /// If any download chunks come in early before the chunk before it - /// to copy to the file, let's hold it in order here before we copy it over. - /// - private ConcurrentDictionary _rangesCompleted; - - internal ClientDiagnostics ClientDiagnostics { get; } - /// /// The controller for downloading the chunks to each file. /// @@ -85,15 +52,9 @@ public struct Behaviors /// /// The expected length of the content to be downloaded in bytes. /// - /// - /// List that holds the expected ranges the chunk ranges will come back as. - /// /// /// Contains all the supported function calls. /// - /// - /// ClientDiagnostics for handler logging. - /// /// /// Cancellation token of the job part or job to cancel any ongoing waiting in the /// download chunk handler to prevent infinite waiting. @@ -102,89 +63,51 @@ public struct Behaviors public DownloadChunkHandler( long currentTransferred, long expectedLength, - IList ranges, Behaviors behaviors, - ClientDiagnostics clientDiagnostics, CancellationToken cancellationToken) { // Set bytes transferred to the length of bytes we got back from the initial // download request _bytesTransferred = currentTransferred; - _currentRangeIndex = 0; - // Create channel of finished Stage Chunk Args to update the bytesTransferred - // and for ending tasks like commit block. // The size of the channel should never exceed 50k (limit on blocks in a block blob). // and that's in the worst case that we never read from the channel and had a maximum chunk blob. - _downloadRangeChannel = Channel.CreateUnbounded( + _downloadRangeChannel = Channel.CreateUnbounded( new UnboundedChannelOptions() { - // Single reader is required as we can only read and write to bytesTransferred value + // Single reader is required as we can only have one writer to the destination. SingleReader = true, }); - _processDownloadRangeEvents = Task.Run(() => NotifyOfPendingChunkDownloadEvents()); + _processDownloadRangeEvents = Task.Run(NotifyOfPendingChunkDownloadEvents); _cancellationToken = cancellationToken; _expectedLength = expectedLength; - _ranges = ranges; if (expectedLength <= 0) { throw Errors.InvalidExpectedLength(expectedLength); } - Argument.AssertNotNullOrEmpty(ranges, nameof(ranges)); Argument.AssertNotNull(behaviors, nameof(behaviors)); - Argument.AssertNotNull(clientDiagnostics, nameof(clientDiagnostics)); // Set values _copyToDestinationFile = behaviors.CopyToDestinationFile ?? throw Errors.ArgumentNull(nameof(behaviors.CopyToDestinationFile)); - _copyToChunkFile = behaviors.CopyToChunkFile - ?? throw Errors.ArgumentNull(nameof(behaviors.CopyToChunkFile)); _reportProgressInBytes = behaviors.ReportProgressInBytes ?? throw Errors.ArgumentNull(nameof(behaviors.ReportProgressInBytes)); _invokeFailedEventHandler = behaviors.InvokeFailedHandler ?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler)); _queueCompleteFileDownload = behaviors.QueueCompleteFileDownload ?? throw Errors.ArgumentNull(nameof(behaviors.QueueCompleteFileDownload)); - - _rangesCount = ranges.Count; - // Set size of the list of null streams - _rangesCompleted = new ConcurrentDictionary(); - - _downloadChunkEventHandler += DownloadChunkEvent; - ClientDiagnostics = clientDiagnostics; } public void Dispose() { _downloadRangeChannel.Writer.TryComplete(); - DisposeHandlers(); - } - - private void DisposeHandlers() - { - _downloadChunkEventHandler -= DownloadChunkEvent; } - private async Task DownloadChunkEvent(DownloadRangeEventArgs args) + public void QueueChunk(QueueDownloadChunkArgs args) { - try - { - if (args.Success) - { - _downloadRangeChannel.Writer.TryWrite(args); - } - else - { - // Report back failed event. - throw args.Exception; - } - } - catch (Exception ex) - { - await InvokeFailedEvent(ex).ConfigureAwait(false); - } + _downloadRangeChannel.Writer.TryWrite(args); } private async Task NotifyOfPendingChunkDownloadEvents() @@ -194,154 +117,41 @@ private async Task NotifyOfPendingChunkDownloadEvents() while (await _downloadRangeChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false)) { // Read one event argument at a time. - DownloadRangeEventArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); - long currentRangeOffset = _ranges[_currentRangeIndex].Offset; - if (currentRangeOffset < args.Offset) - { - // One of the chunks finished downloading before the chunk(s) - // before it (early bird, or the last chunk) - // Save the chunk to a temporary file to append later - string chunkFilePath = Path.GetTempFileName(); - using (Stream chunkContent = args.Result) - { - await _copyToChunkFile(chunkFilePath, chunkContent).ConfigureAwait(false); - } - if (!_rangesCompleted.TryAdd(args.Offset, chunkFilePath)) - { - // Throw an error here that we were unable to idenity the - // the range that has come back to us. We should never see this error - // since we were the ones who calculated the range. - throw Errors.InvalidDownloadOffset(args.Offset, args.BytesTransferred); - } - } - else if (currentRangeOffset == args.Offset) - { - // Start Copying the response to the file stream and any other chunks after - // Most of the time we will always get the next chunk first so the loop - // on averages runs once. - using (Stream content = args.Result) - { - await _copyToDestinationFile( - args.Offset, - args.BytesTransferred, - content, - _expectedLength).ConfigureAwait(false); - } - UpdateBytesAndRange(args.BytesTransferred); + QueueDownloadChunkArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); - await AppendEarlyChunksToFile().ConfigureAwait(false); - - // Check if we finished downloading the blob - if (_bytesTransferred == _expectedLength) - { - await _queueCompleteFileDownload().ConfigureAwait(false); - } - } - else + // Copy the current chunk to the destination + using (Stream content = args.Content) { - // We should never reach this point because that means - // the range that came back was less than the next range that is supposed - // to be copied to the file - throw Errors.InvalidDownloadOffset(args.Offset, args.BytesTransferred); + await _copyToDestinationFile( + args.Offset, + args.Length, + content, + _expectedLength, + initial: _bytesTransferred == 0).ConfigureAwait(false); } - } - } - catch (Exception ex) - { - await InvokeFailedEvent(ex).ConfigureAwait(false); - } - } - - public async Task InvokeEvent(DownloadRangeEventArgs args) - { - // There's a race condition where the event handler was disposed and an event - // was already invoked, we should skip over this as the download chunk handler - // was already disposed, and we should just ignore any more incoming events. - if (_downloadChunkEventHandler != null) - { - await _downloadChunkEventHandler.RaiseAsync( - args, - nameof(DownloadChunkHandler), - nameof(_downloadChunkEventHandler), - ClientDiagnostics) - .ConfigureAwait(false); - } - } + UpdateBytesAndRange(args.Length); - private async Task AppendEarlyChunksToFile() - { - // If there are any other chunks that have already been downloaded that - // can be appended to the file, let's do it now. - while ((_bytesTransferred < _expectedLength) && - (_currentRangeIndex < _rangesCount) && - _rangesCompleted.ContainsKey(_ranges[_currentRangeIndex].Offset)) - { - HttpRange currentRange = _ranges[_currentRangeIndex]; - if (_rangesCompleted.TryRemove(currentRange.Offset, out string chunkFilePath)) - { - if (File.Exists(chunkFilePath)) - { - using (Stream content = File.OpenRead(chunkFilePath)) - { - await _copyToDestinationFile( - currentRange.Offset, - currentRange.Length.Value, - content, - _expectedLength).ConfigureAwait(false); - } - // Delete the temporary chunk file that's no longer needed - File.Delete(chunkFilePath); - } - else + // Check if we finished downloading the blob + if (_bytesTransferred == _expectedLength) { - throw Errors.TempChunkFileNotFound( - offset: currentRange.Offset, - length: currentRange.Length.Value, - filePath: chunkFilePath); + await _queueCompleteFileDownload().ConfigureAwait(false); } } - else - { - throw Errors.InvalidDownloadOffset(currentRange.Offset, currentRange.Length.Value); - } - - // Increment the current range we are expect, if it's null then - // that's the next one we have to wait on. - UpdateBytesAndRange((long)_ranges[_currentRangeIndex].Length); } - } - - private async Task InvokeFailedEvent(Exception ex) - { - foreach (HttpRange range in _ranges) + catch (Exception ex) { - if (_rangesCompleted.TryRemove(range.Offset, out string tempChunkFile)) - { - if (File.Exists(tempChunkFile)) - { - try - { - File.Delete(tempChunkFile); - } - catch (Exception deleteException) - { - await _invokeFailedEventHandler(deleteException).ConfigureAwait(false); - } - } - } + // This will trigger the job part to call Dispose on this object + await _invokeFailedEventHandler(ex).ConfigureAwait(false); } - await _invokeFailedEventHandler(ex).ConfigureAwait(false); } /// - /// Update the progress handler and the current range we are waiting on. + /// Moves the downloader to the next range and updates/reports bytes transferred. /// /// private void UpdateBytesAndRange(long bytesDownloaded) { - // don't need to use Interlocked since this is the only thread reading and updating these values _bytesTransferred += bytesDownloaded; - _currentRangeIndex++; _reportProgressInBytes(bytesDownloaded); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs deleted file mode 100644 index 2a1ecddce9dd..000000000000 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadRangeEventArgs.cs +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System; -using System.IO; -using System.Threading; -using Azure.Core; -using Azure.Storage.Common; - -namespace Azure.Storage.DataMovement -{ - internal class DownloadRangeEventArgs : DataTransferEventArgs - { - public bool Success { get; } - - public long Offset { get; } - - /// - /// Will be 0 if Success is false - /// - public long BytesTransferred { get; } - - /// - /// Stream results of the range downloaded if Sucess is true - /// - public Stream Result { get; } - - /// - /// If is false, this value will be populated - /// with the exception that was thrown. - /// - public Exception Exception { get; } - - /// - /// Constructor - /// - /// - /// Id of the transfer - /// - /// - /// Whether or not the download range call was successful - /// - /// - /// - /// - /// - /// - /// - public DownloadRangeEventArgs( - string transferId, - bool success, - long offset, - long bytesTransferred, - Stream result, - Exception exception, - bool isRunningSynchronously, - CancellationToken cancellationToken) : - base(transferId, isRunningSynchronously, cancellationToken) - { - if (success && exception != null) - { - Argument.AssertNull(exception, nameof(exception)); - } - else if (!success && exception == null) - { - Argument.AssertNotNull(exception, nameof(exception)); - } - Success = success; - Offset = offset; - BytesTransferred = bytesTransferred; - Result = result; - Exception = exception; - } - } -} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs index 1d13dfeed123..68e77fb501b9 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs @@ -126,7 +126,7 @@ protected internal override async Task CopyFromStreamAsync( CancellationHelper.ThrowIfCancellationRequested(cancellationToken); long position = options?.Position != default ? options.Position.Value : 0; - if (position == 0) + if (options?.Initial == true) { Create(overwrite); } @@ -134,9 +134,9 @@ protected internal override async Task CopyFromStreamAsync( { // Appends incoming stream to the local file resource using (FileStream fileStream = new FileStream( - _uri.LocalPath, - FileMode.OpenOrCreate, - FileAccess.Write)) + _uri.LocalPath, + FileMode.Open, + FileAccess.Write)) { if (position > 0) { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs b/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs new file mode 100644 index 000000000000..b770af22af07 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/src/QueueDownloadChunkArgs.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.IO; + +namespace Azure.Storage.DataMovement +{ + internal class QueueDownloadChunkArgs + { + public long Offset { get; } + public long Length { get; } + public Stream Content { get; } + + public QueueDownloadChunkArgs( + long offset, + long length, + Stream content) + { + Offset = offset; + Length = length; + Content = content; + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs index a041f20a07f6..9d5bd235726d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceWriteToOffsetOptions.cs @@ -24,6 +24,11 @@ public class StorageResourceWriteToOffsetOptions /// public long? Position { get; set; } + /// + /// Optional. Specifies whether this write is for the initial chunk. + /// + public bool Initial { get; set; } + /// /// Optional. Specifies the source properties to set in the destination. /// diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index ecaed741f19a..d159d85781c3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -218,9 +218,9 @@ public override async Task ProcessPartToChunkAsync() internal async Task UnknownDownloadInternal() { Task initialTask = _sourceResource.ReadStreamAsync( - position: 0, - length: _initialTransferSize, - _cancellationToken); + position: 0, + length: _initialTransferSize, + _cancellationToken); try { @@ -254,7 +254,8 @@ internal async Task UnknownDownloadInternal() offset: 0, sourceLength: initialLength.Value, source: initialResult.Content, - expectedLength: totalLength).ConfigureAwait(false); + expectedLength: totalLength, + initial: true).ConfigureAwait(false); if (successfulInitialCopy) { ReportBytesWritten(initialLength.Value); @@ -306,7 +307,8 @@ internal async Task LengthKnownDownloadInternal() offset: 0, sourceLength: downloadLength, source: result.Content, - expectedLength: totalLength).ConfigureAwait(false); + expectedLength: totalLength, + initial: true).ConfigureAwait(false); if (successfulCopy) { ReportBytesWritten(downloadLength); @@ -324,20 +326,17 @@ internal async Task LengthKnownDownloadInternal() #region PartitionedDownloader private async Task QueueChunksToChannel(long initialLength, long totalLength) { - // Get list of ranges of the blob - IList ranges = GetRangesList(initialLength, totalLength, _transferChunkSize); - // Create Download Chunk event handler to manage when the ranges finish downloading - _downloadChunkHandler = GetDownloadChunkHandler( + _downloadChunkHandler = new DownloadChunkHandler( currentTransferred: initialLength, expectedLength: totalLength, - ranges: ranges, - jobPart: this); + GetDownloadChunkHandlerBehaviors(this), + _cancellationToken); // Fill the queue with tasks to download each of the remaining // ranges in the file _queueingTasks = true; - foreach (HttpRange httpRange in ranges) + foreach (HttpRange httpRange in GetRanges(initialLength, totalLength, _transferChunkSize)) { if (_cancellationToken.IsCancellationRequested) { @@ -394,40 +393,26 @@ internal async Task DownloadStreamingInternal(HttpRange range) (long)range.Length, _cancellationToken).ConfigureAwait(false); - // The chunk handler may have been disposed in failure case - if (_downloadChunkHandler != null) + // Stream the data from the network before queueing disk IO. + MemoryStream content = new((int)result.ContentLength.Value); + using (Stream dataStream = result.Content) { - await _downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: _dataTransfer.Id, - success: true, - offset: range.Offset, - bytesTransferred: (long)range.Length, - result: result.Content, - exception: default, - false, - _cancellationToken)).ConfigureAwait(false); + await dataStream.CopyToAsync( + content, + DataMovementConstants.DefaultStreamCopyBufferSize, + _cancellationToken).ConfigureAwait(false); } + content.Position = 0; + + // The chunk handler may have been disposed in failure case + _downloadChunkHandler?.QueueChunk(new QueueDownloadChunkArgs( + offset: range.Offset, + length: (long)range.Length, + content: content)); } catch (Exception ex) { - if (_downloadChunkHandler != null) - { - await _downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: _dataTransfer.Id, - success: false, - offset: range.Offset, - bytesTransferred: (long)range.Length, - result: default, - exception: ex, - false, - _cancellationToken)).ConfigureAwait(false); - } - else - { - // If the _downloadChunkHandler has been disposed before we call to it - // we should at least filter the exception to error handling just in case. - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - } + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -435,7 +420,8 @@ public async Task CopyToStreamInternal( long offset, long sourceLength, Stream source, - long expectedLength) + long expectedLength, + bool initial) { CancellationHelper.ThrowIfCancellationRequested(_cancellationToken); @@ -450,6 +436,7 @@ await _destinationResource.CopyFromStreamAsync( options: new StorageResourceWriteToOffsetOptions() { Position = offset, + Initial = initial, }, cancellationToken: _cancellationToken).ConfigureAwait(false); return true; @@ -464,42 +451,14 @@ await _destinationResource.CopyFromStreamAsync( return false; } - public async Task WriteChunkToTempFile(string chunkFilePath, Stream source) - { - CancellationHelper.ThrowIfCancellationRequested(_cancellationToken); - - using (FileStream fileStream = File.OpenWrite(chunkFilePath)) - { - await source.CopyToAsync( - fileStream, - DataMovementConstants.DefaultStreamCopyBufferSize, - _cancellationToken) - .ConfigureAwait(false); - } - } - - internal DownloadChunkHandler GetDownloadChunkHandler( - long currentTransferred, - long expectedLength, - IList ranges, - UriToStreamJobPart jobPart) - => new DownloadChunkHandler( - currentTransferred, - expectedLength, - ranges, - GetDownloadChunkHandlerBehaviors(jobPart), - ClientDiagnostics, - _cancellationToken); - - internal static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(UriToStreamJobPart job) + private static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors(UriToStreamJobPart jobPart) { return new DownloadChunkHandler.Behaviors() { - CopyToDestinationFile = job.CopyToStreamInternal, - CopyToChunkFile = job.WriteChunkToTempFile, - ReportProgressInBytes = job.ReportBytesWritten, - InvokeFailedHandler = job.InvokeFailedArgAsync, - QueueCompleteFileDownload = job.QueueCompleteFileDownload + CopyToDestinationFile = jobPart.CopyToStreamInternal, + ReportProgressInBytes = jobPart.ReportBytesWritten, + InvokeFailedHandler = jobPart.InvokeFailedArgAsync, + QueueCompleteFileDownload = jobPart.QueueCompleteFileDownload }; } @@ -508,14 +467,12 @@ private Task QueueCompleteFileDownload() return QueueChunkToChannelAsync(CompleteFileDownload); } - private static IList GetRangesList(long initialLength, long totalLength, long rangeSize) + private static IEnumerable GetRanges(long initialLength, long totalLength, long rangeSize) { - IList list = new List(); for (long offset = initialLength; offset < totalLength; offset += rangeSize) { - list.Add(new HttpRange(offset, Math.Min(totalLength - offset, rangeSize))); + yield return new HttpRange(offset, Math.Min(totalLength - offset, rangeSize)); } - return list; } #endregion PartitionedDownloader @@ -547,7 +504,8 @@ private async Task CreateZeroLengthDownload() offset: 0, sourceLength: 0, source: default, - expectedLength: 0).ConfigureAwait(false); + expectedLength: 0, + initial: true).ConfigureAwait(false); if (successfulCreation) { // Queue the work to end the download diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 4675e14cb2cf..43e79dbbe6c0 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -10,8 +10,6 @@ using System.Threading; using Azure.Core; using Azure.Storage.Tests.Shared; -using Azure.Core.Pipeline; -using Azure.Storage.Common; namespace Azure.Storage.DataMovement.Tests { @@ -23,17 +21,13 @@ public DownloadChunkHandlerTests() { } private readonly int _maxDelayInSec = 1; private readonly string _failedEventMsg = "Amount of Failed Event Handler calls was incorrect."; private readonly string _copyToDestinationMsg = "Amount of Copy To Destination Task calls were incorrect."; - private readonly string _copyToChunkFileMsg = "Amount of Copy To Chunk File Task calls were incorrect."; private readonly string _reportProgressInBytesMsg = "Amount of Progress amount calls were incorrect."; private readonly string _completeFileDownloadMsg = "Complete File Download call amount calls were incorrect."; - private ClientDiagnostics ClientDiagnostics => new(ClientOptions.Default); - private void VerifyDelegateInvocations( MockDownloadChunkBehaviors behaviors, int expectedFailureCount, int expectedCopyDestinationCount, - int expectedCopyChunkCount, int expectedReportProgressCount, int expectedCompleteFileCount, int maxWaitTimeInSec = 6) @@ -42,14 +36,12 @@ private void VerifyDelegateInvocations( CancellationToken cancellationToken = cancellationSource.Token; int currentFailedEventCount = behaviors.InvokeFailedEventHandlerTask.Invocations.Count; int currentCopyDestinationCount = behaviors.CopyToDestinationFileTask.Invocations.Count; - int currentCopyChunkCount = behaviors.CopyToChunkFileTask.Invocations.Count; int currentProgressReportedCount = behaviors.ReportProgressInBytesTask.Invocations.Count; int currentCompleteDownloadCount = behaviors.QueueCompleteFileDownloadTask.Invocations.Count; try { while (currentFailedEventCount != expectedFailureCount || currentCopyDestinationCount != expectedCopyDestinationCount - || currentCopyChunkCount != expectedCopyChunkCount || currentProgressReportedCount != expectedReportProgressCount || currentCompleteDownloadCount != expectedCompleteFileCount) { @@ -63,8 +55,6 @@ private void VerifyDelegateInvocations( Assert.LessOrEqual(currentFailedEventCount, expectedFailureCount, _failedEventMsg); currentCopyDestinationCount = behaviors.CopyToDestinationFileTask.Invocations.Count; Assert.LessOrEqual(currentCopyDestinationCount, expectedCopyDestinationCount, _copyToDestinationMsg); - currentCopyChunkCount = behaviors.CopyToChunkFileTask.Invocations.Count; - Assert.LessOrEqual(currentCopyChunkCount, expectedCopyChunkCount, _copyToChunkFileMsg); currentProgressReportedCount = behaviors.ReportProgressInBytesTask.Invocations.Count; Assert.LessOrEqual(currentProgressReportedCount, expectedReportProgressCount, _reportProgressInBytesMsg); currentCompleteDownloadCount = behaviors.QueueCompleteFileDownloadTask.Invocations.Count; @@ -76,41 +66,39 @@ private void VerifyDelegateInvocations( string message = "Timed out waiting for the correct amount of invocations for each task\n" + $"Current Failed Event Invocations: {currentFailedEventCount} | Expected: {expectedFailureCount}\n" + $"Current Copy Destination Invocations: {currentCopyDestinationCount} | Expected: {expectedCopyDestinationCount}\n" + - $"Current Copy Chunk Invocations: {currentCopyChunkCount} | Expected: {expectedCopyChunkCount}\n" + $"Current Progress Reported Invocations: {currentProgressReportedCount} | Expected: {expectedReportProgressCount}\n" + $"Current Complete Download Invocations: {currentCompleteDownloadCount} | Expected: {expectedCompleteFileCount}"; Assert.Fail(message); } + + // Assert the first call to copy to the destination always specifies initial and the rest don't + int count = 0; + foreach (IInvocation invocation in behaviors.CopyToDestinationFileTask.Invocations) + { + if (count == 0) + { + Assert.That((bool)invocation.Arguments[4], Is.True); + } + else + { + Assert.That((bool)invocation.Arguments[4], Is.False); + } + count++; + } } private Mock GetCopyToDestinationFileTask() { var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull())) - .Returns(Task.CompletedTask); - return mock; - } - - private Mock GetCopyToChunkFileTask() - { - var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(),It.IsNotNull())) + mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsAny())) .Returns(Task.CompletedTask); return mock; } - private Mock GetExceptionCopyToChunkFileTask() - { - var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(), It.IsNotNull())) - .Throws(new UnauthorizedAccessException()); - return mock; - } - private Mock GetExceptionCopyToDestinationFileTask() { var mock = new Mock(MockBehavior.Strict); - mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull())) + mock.Setup(del => del(It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsNotNull(), It.IsAny())) .Throws(new UnauthorizedAccessException()); return mock; } @@ -149,7 +137,6 @@ private void VerifyDelegateInvocations( internal struct MockDownloadChunkBehaviors { public Mock CopyToDestinationFileTask; - public Mock CopyToChunkFileTask; public Mock ReportProgressInBytesTask; public Mock QueueCompleteFileDownloadTask; public Mock InvokeFailedEventHandlerTask; @@ -159,89 +146,45 @@ private MockDownloadChunkBehaviors GetMockDownloadChunkBehaviors() => new MockDownloadChunkBehaviors() { CopyToDestinationFileTask = GetCopyToDestinationFileTask(), - CopyToChunkFileTask = GetCopyToChunkFileTask(), ReportProgressInBytesTask = GetReportProgressInBytesTask(), QueueCompleteFileDownloadTask = GetQueueCompleteFileDownloadTask(), InvokeFailedEventHandlerTask = GetInvokeFailedEventHandlerTask() }; - /// - /// Creates ranges that the download chunk handler is expecting. - /// - /// - /// The block size which the size of the range will equal. - /// This value must be less or equal to the expected length. - /// - /// - /// Expected full length of the download to create ranges of. - /// - /// - private List GetRanges(long blockSize, long expectedLength) - { - Argument.AssertNotDefault(ref blockSize, name: nameof(blockSize)); - Argument.AssertNotDefault(ref expectedLength, name: nameof(expectedLength)); - if (expectedLength < blockSize) - { - Argument.AssertInRange(blockSize, expectedLength, default, nameof(blockSize)); - } - List ranges = new List(); - - for (long offset = 0; offset < expectedLength; offset += blockSize) - { - ranges.Add(new HttpRange(offset, Math.Min(expectedLength - offset, blockSize))); - } - - return ranges; - } - [Test] [TestCase(512)] [TestCase(Constants.KB)] [TestCase(Constants.MB)] [TestCase(4 * Constants.MB)] - public async Task OneChunkTransfer(long blockSize) + public void OneChunkTransfer(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); - - List ranges = new List() - { - new HttpRange(0, blockSize) - }; using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: blockSize, - ranges: ranges, new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - Make one chunk that would meet the expected length - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + content: content)); // Assert VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 1, expectedCompleteFileCount: 1); } @@ -249,68 +192,52 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( [Test] [TestCase(512)] [TestCase(Constants.KB)] - public async Task MultipleChunkTransfer(long blockSize) + public void MultipleChunkTransfer(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); - // Act - Make one chunk that would update the bytes but not cause a commit block list to occur - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + // Act - First chunk + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + content: content)); // Assert VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 1, expectedCompleteFileCount: 0); PredictableStream content2 = new PredictableStream(blockSize); - // Act - Now add the last block to meet the required commited block amount. - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + // Act - Second/final chunk + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: blockSize, - bytesTransferred: blockSize, - result: content2, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + content: content2)); // Assert VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 2, - expectedCopyChunkCount: 0, expectedReportProgressCount: 2, expectedCompleteFileCount: 1); } @@ -318,124 +245,51 @@ await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( [Test] [TestCase(512)] [TestCase(Constants.KB)] - public async Task MultipleChunkTransfer_UnexpectedOffsetError(long blockSize) + public void MultipleChunkTransfer_EarlyChunks(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, - behaviors: new DownloadChunkHandler.Behaviors { - CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, - QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, - ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, - InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object }, - ClientDiagnostics, - cancellationToken: CancellationToken.None); - - PredictableStream content = new PredictableStream(blockSize); - - // Make initial range event - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, - offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); - - // Act - Make the repeat at the same offset to cause an error. - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, - offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); - - // Assert - VerifyDelegateInvocations( - behaviors: mockBehaviors, - expectedFailureCount: 1, - expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, - expectedReportProgressCount: 1, - expectedCompleteFileCount: 0); - } - - [Test] - [TestCase(512)] - [TestCase(Constants.KB)] - public async Task MultipleChunkTransfer_EarlyChunks(long blockSize) - { - // Arrange - Set up tasks - MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); - long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); - - using var downloadChunkHandler = new DownloadChunkHandler( - currentTransferred: 0, - expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); - // Act - Make initial range event - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + // Act - The second chunk returns first + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: blockSize, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + content: content)); // Assert VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 0, - expectedCopyDestinationCount: 0, - expectedCopyChunkCount: 1, - expectedReportProgressCount: 0, + expectedCopyDestinationCount: 1, + expectedReportProgressCount: 1, expectedCompleteFileCount: 0); - // Act - Make the repeat at the same offset to cause an error. - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + // Act - The first chunk is then returned + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + content: content)); // Assert VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 2, - expectedCopyChunkCount: 1, expectedReportProgressCount: 2, expectedCompleteFileCount: 1); } @@ -450,20 +304,17 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); long expectedLength = blockSize * taskSize; - List ranges = GetRanges(blockSize, expectedLength); + using var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); List runningTasks = new List(); @@ -473,15 +324,11 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta { PredictableStream content = new PredictableStream(blockSize); - Task task = downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, - offset: i * blockSize, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + long offset = i * blockSize; + Task task = Task.Run(() => downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + offset: offset, + length: blockSize, + content: content))); runningTasks.Add(task); } @@ -494,189 +341,113 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: taskSize, - expectedCopyChunkCount: 0, expectedReportProgressCount: taskSize, expectedCompleteFileCount: 1); } [Test] - public async Task GetCopyToChunkFileTask_ExpectedFailure() - { - // Arrange - MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); - mockBehaviors.CopyToChunkFileTask = GetExceptionCopyToChunkFileTask(); - int blockSize = 512; - long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); - - var downloadChunkHandler = new DownloadChunkHandler( - currentTransferred: 0, - expectedLength: expectedLength, - ranges: ranges, - behaviors: new DownloadChunkHandler.Behaviors - { - CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, - QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, - ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, - InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, - }, - ClientDiagnostics, - cancellationToken: CancellationToken.None); - - PredictableStream content = new PredictableStream(blockSize); - - // Act - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, - offset: blockSize, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); - - // Assert - VerifyDelegateInvocations( - behaviors: mockBehaviors, - expectedFailureCount: 1, - expectedCopyDestinationCount: 0, - expectedCopyChunkCount: 1, - expectedReportProgressCount: 0, - expectedCompleteFileCount: 0); - } - - [Test] - public async Task GetCopyToDestinationFileTask_ExpectedFailure() + public void GetCopyToDestinationFileTask_ExpectedFailure() { // Arrange MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); mockBehaviors.CopyToDestinationFileTask = GetExceptionCopyToDestinationFileTask(); int blockSize = 512; long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: expectedLength, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + content: content)); // Assert VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 1, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 0, expectedCompleteFileCount: 0); } [Test] - public async Task QueueCompleteFileDownloadTask_ExpectedFailure() + public void QueueCompleteFileDownloadTask_ExpectedFailure() { // Arrange MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); mockBehaviors.QueueCompleteFileDownloadTask = GetExceptionQueueCompleteFileDownloadTask(); int blockSize = 512; - List ranges = GetRanges(blockSize, blockSize); var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: blockSize, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); PredictableStream content = new PredictableStream(blockSize); // Act - await downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( - transferId: "fake-id", - success: true, + downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( offset: 0, - bytesTransferred: blockSize, - result: content, - exception: default, - isRunningSynchronously: false, - cancellationToken: CancellationToken.None)); + length: blockSize, + content: content)); // Assert VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 1, expectedCopyDestinationCount: 1, - expectedCopyChunkCount: 0, expectedReportProgressCount: 1, expectedCompleteFileCount: 1); } [Test] - public async Task DisposedEventHandler() + public void DisposedEventHandler() { // Arrange - Create DownloadChunkHandler then Dispose it so the event handler is disposed MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); int blockSize = 512; long expectedLength = blockSize * 2; - List ranges = GetRanges(blockSize, expectedLength); var downloadChunkHandler = new DownloadChunkHandler( currentTransferred: 0, expectedLength: blockSize, - ranges: ranges, behaviors: new DownloadChunkHandler.Behaviors { CopyToDestinationFile = mockBehaviors.CopyToDestinationFileTask.Object, - CopyToChunkFile = mockBehaviors.CopyToChunkFileTask.Object, QueueCompleteFileDownload = mockBehaviors.QueueCompleteFileDownloadTask.Object, ReportProgressInBytes = mockBehaviors.ReportProgressInBytesTask.Object, InvokeFailedHandler = mockBehaviors.InvokeFailedEventHandlerTask.Object, }, - ClientDiagnostics, cancellationToken: CancellationToken.None); // Act downloadChunkHandler.Dispose(); - - // Assert - Do not throw when trying to invoke the event handler when disposed - await downloadChunkHandler.InvokeEvent(default); + downloadChunkHandler.QueueChunk(default); VerifyDelegateInvocations( behaviors: mockBehaviors, expectedFailureCount: 0, expectedCopyDestinationCount: 0, - expectedCopyChunkCount: 0, expectedReportProgressCount: 0, expectedCompleteFileCount: 0); } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs index 9460d7fa207c..2a87e2c9497b 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/LocalFileStorageResourceTests.cs @@ -170,7 +170,8 @@ await storageResource.CopyFromStreamAsync( stream, streamLength: length, false, - completeLength: length); + completeLength: length, + options: new StorageResourceWriteToOffsetOptions() { Initial = true }); } // Assert @@ -200,7 +201,7 @@ await storageResource.CopyFromStreamAsync( streamLength: length, overwrite: false, completeLength: length, - options: new StorageResourceWriteToOffsetOptions() { Position = writePosition }); + options: new StorageResourceWriteToOffsetOptions() { Position = writePosition, Initial = false }); } // Assert @@ -219,7 +220,7 @@ public async Task WriteStreamAsync_Error() string path = await CreateRandomFileAsync(test.DirectoryPath, size: length); LocalFileStorageResource storageResource = new LocalFileStorageResource(path); var data = GetRandomBuffer(length); - try + Assert.ThrowsAsync(async () => { using (var stream = new MemoryStream(data)) { @@ -227,13 +228,11 @@ await storageResource.CopyFromStreamAsync( stream: stream, streamLength: length, overwrite: false, - completeLength: length); + completeLength: length, + options: new StorageResourceWriteToOffsetOptions() { Initial = true }); } - } - catch (IOException ex) - { - Assert.AreEqual(ex.Message, $"File path `{path}` already exists. Cannot overwrite file."); - } + }, + $"File path `{path}` already exists. Cannot overwrite file."); } [Test]