diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index 72fde2348d2e..637d5baf5b24 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -225,7 +225,7 @@ await QueueChunk( } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } Interlocked.Increment(ref _completedChunkCount); await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false); @@ -234,9 +234,9 @@ await QueueChunk( } /// - /// Processes the job to job parts + /// Processes the job part to chunks /// - /// An IEnumerable that contains the job chunks + /// The task that's queueing up the chunks public abstract Task ProcessPartToChunkAsync(); /// @@ -285,11 +285,11 @@ internal async Task OnTransferStateChangedAsync(DataTransferState transferState) else if (JobPartStatus.HasCompletedSuccessfully) { _progressTracker.IncrementCompletedFiles(); - await InvokeSingleCompletedArg().ConfigureAwait(false); + await InvokeSingleCompletedArgAsync().ConfigureAwait(false); } // Set the status in the checkpointer - await SetCheckpointerStatus().ConfigureAwait(false); + await SetCheckpointerStatusAsync().ConfigureAwait(false); await PartTransferStatusEventHandler.RaiseAsync( new TransferStatusEventArgs( @@ -313,7 +313,7 @@ internal void ReportBytesWritten(long bytesTransferred) _progressTracker.IncrementBytesTransferred(bytesTransferred); } - public async virtual Task InvokeSingleCompletedArg() + public async virtual Task InvokeSingleCompletedArgAsync() { if (SingleTransferCompletedEventHandler != null) { @@ -334,7 +334,7 @@ await SingleTransferCompletedEventHandler.RaiseAsync( /// /// Invokes Skipped Argument Event. /// - public async virtual Task InvokeSkippedArg() + public async virtual Task InvokeSkippedArgAsync() { if (TransferSkippedEventHandler != null) { @@ -375,7 +375,7 @@ await PartTransferStatusEventHandler.RaiseAsync( /// /// Invokes Failed Argument Event. /// - public async virtual Task InvokeFailedArg(Exception ex) + public async virtual Task InvokeFailedArgAsync(Exception ex) { if (ex is not OperationCanceledException && ex is not TaskCanceledException && @@ -485,7 +485,7 @@ await _checkpointer.AddNewJobPartAsync( } } - internal async virtual Task SetCheckpointerStatus() + internal async virtual Task SetCheckpointerStatusAsync() { await _checkpointer.SetJobPartStatusAsync( transferId: _dataTransfer.Id, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs index bc28cc5611d8..f0ca9318f1d6 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs @@ -177,6 +177,10 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint( createPreference: createPreference); } + /// + /// Processes the job part to chunks + /// + /// The task that's queueing up the chunks public override async Task ProcessPartToChunkAsync() { try @@ -195,7 +199,7 @@ await _destinationResource.SetPermissionsAsync( fileLength = sourceProperties.ResourceLength; if (!fileLength.HasValue) { - await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false); + await InvokeFailedArgAsync(Errors.UnableToGetLength()).ConfigureAwait(false); return; } long length = fileLength.Value; @@ -244,7 +248,7 @@ await QueueStageBlockRequest( } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -269,17 +273,17 @@ await _destinationResource.CopyFromUriAsync( when (_createMode == StorageResourceCreationPreference.SkipIfExists && exception.ErrorCode == "BlobAlreadyExists") { - await InvokeSkippedArg().ConfigureAwait(false); + await InvokeSkippedArgAsync().ConfigureAwait(false); } catch (InvalidOperationException ex) when (_createMode == StorageResourceCreationPreference.SkipIfExists && ex.Message.Contains("Cannot overwrite file.")) { - await InvokeSkippedArg().ConfigureAwait(false); + await InvokeSkippedArgAsync().ConfigureAwait(false); } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -316,11 +320,11 @@ await _destinationResource.CopyBlockFromUriAsync( when (_createMode == StorageResourceCreationPreference.SkipIfExists && exception.ErrorCode == "BlobAlreadyExists") { - await InvokeSkippedArg().ConfigureAwait(false); + await InvokeSkippedArgAsync().ConfigureAwait(false); } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } return false; } @@ -349,7 +353,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors( QueuePutBlockTask = jobPart.QueueStageBlockRequest, QueueCommitBlockTask = jobPart.CompleteTransferAsync, ReportProgressInBytes = jobPart.ReportBytesWritten, - InvokeFailedHandler = jobPart.InvokeFailedArg, + InvokeFailedHandler = jobPart.InvokeFailedArgAsync, }; } #endregion @@ -372,7 +376,7 @@ await _destinationResource.CompleteTransferAsync( } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -456,11 +460,11 @@ await _commitBlockHandler.InvokeEvent( // before uploading to it. if (_createMode == StorageResourceCreationPreference.FailIfExists) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } else // (_createMode == StorageResourceCreateMode.Skip) { - await InvokeSkippedArg().ConfigureAwait(false); + await InvokeSkippedArgAsync().ConfigureAwait(false); } } catch (Exception ex) @@ -481,21 +485,21 @@ await _commitBlockHandler.InvokeEvent( { // If the _commitBlockHandler has been disposed before we call to it // we should at least filter the exception to error handling just in case. - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } } - public override async Task InvokeSkippedArg() + public override async Task InvokeSkippedArgAsync() { DisposeHandlers(); - await base.InvokeSkippedArg().ConfigureAwait(false); + await base.InvokeSkippedArgAsync().ConfigureAwait(false); } - public override async Task InvokeFailedArg(Exception ex) + public override async Task InvokeFailedArgAsync(Exception ex) { DisposeHandlers(); - await base.InvokeFailedArg(ex).ConfigureAwait(false); + await base.InvokeFailedArgAsync(ex).ConfigureAwait(false); } internal void DisposeHandlers() diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index c41a882de603..05558d51a42e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -178,7 +178,7 @@ public static StreamToUriJobPart CreateJobPartFromCheckpoint( } /// - /// Processes the job to job parts + /// Processes the job part to chunks /// /// The task that's queueing up the chunks public override async Task ProcessPartToChunkAsync() @@ -244,12 +244,12 @@ await QueueStageBlockRequest( else { // TODO: logging when given the event handler - await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false); + await InvokeFailedArgAsync(Errors.UnableToGetLength()).ConfigureAwait(false); } } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -275,16 +275,16 @@ await InitialUploadCall( catch (RequestFailedException r) when (r.ErrorCode == "BlobAlreadyExists" && _createMode == StorageResourceCreationPreference.SkipIfExists) { - await InvokeSkippedArg().ConfigureAwait(false); + await InvokeSkippedArgAsync().ConfigureAwait(false); } catch (InvalidOperationException i) when (i.Message.Contains("Cannot overwrite file.") && _createMode == StorageResourceCreationPreference.SkipIfExists) { - await InvokeSkippedArg().ConfigureAwait(false); + await InvokeSkippedArgAsync().ConfigureAwait(false); } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } // Do not continue if we need to skip or there was an error. @@ -381,7 +381,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors( QueuePutBlockTask = jobPart.QueueStageBlockRequest, QueueCommitBlockTask = jobPart.CompleteTransferAsync, ReportProgressInBytes = jobPart.ReportBytesWritten, - InvokeFailedHandler = jobPart.InvokeFailedArg, + InvokeFailedHandler = jobPart.InvokeFailedArgAsync, }; } #endregion @@ -453,7 +453,7 @@ await _commitBlockHandler.InvokeEvent( { // If the _commitBlockHandler has been disposed before we call to it // we should at least filter the exception to error handling just in case. - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } } @@ -555,16 +555,16 @@ private static async Task GetOffsetPartitionInternal( cancellationToken: cancellationToken).ConfigureAwait(false); } - public override async Task InvokeSkippedArg() + public override async Task InvokeSkippedArgAsync() { DisposeHandlers(); - await base.InvokeSkippedArg().ConfigureAwait(false); + await base.InvokeSkippedArgAsync().ConfigureAwait(false); } - public override async Task InvokeFailedArg(Exception ex) + public override async Task InvokeFailedArgAsync(Exception ex) { DisposeHandlers(); - await base.InvokeFailedArg(ex).ConfigureAwait(false); + await base.InvokeFailedArgAsync(ex).ConfigureAwait(false); } internal void DisposeHandlers() diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs index d778320fdf63..1f2339b1c05a 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs @@ -184,7 +184,7 @@ private TransferJobInternal( _errorMode = errorHandling; _creationPreference = creationPreference; - JobPartStatusEvents += JobPartEvent; + JobPartStatusEvents += JobPartEventAsync; TransferStatusEventHandler = statusEventHandler; TransferFailedEventHandler = failedEventHandler; TransferSkippedEventHandler = skippedEventHandler; @@ -271,7 +271,7 @@ public void DisposeHandlers() { if (JobPartStatusEvents != default) { - JobPartStatusEvents -= JobPartEvent; + JobPartStatusEvents -= JobPartEventAsync; } } @@ -303,7 +303,7 @@ public virtual async IAsyncEnumerable ProcessJobToJobPartAsync( // Single resource transfer, we can skip to chunking the job. part = await _createJobPartSingleAsync(this, partNumber).ConfigureAwait(false); AppendJobPart(part); - await OnAllResourcesEnumerated().ConfigureAwait(false); + await OnAllResourcesEnumeratedAsync().ConfigureAwait(false); } catch (Exception ex) { @@ -314,7 +314,7 @@ public virtual async IAsyncEnumerable ProcessJobToJobPartAsync( } else { - await foreach (JobPartInternal part in GetStorageResourcesAsync().ConfigureAwait(false)) + await foreach (JobPartInternal part in EnumerateAndCreateJobPartsAsync().ConfigureAwait(false)) { yield return part; } @@ -345,7 +345,7 @@ public virtual async IAsyncEnumerable ProcessJobToJobPartAsync( if (!isEnumerationComplete) { - await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false)) + await foreach (JobPartInternal jobPartInternal in EnumerateAndCreateJobPartsAsync().ConfigureAwait(false)) { yield return jobPartInternal; } @@ -355,7 +355,7 @@ public virtual async IAsyncEnumerable ProcessJobToJobPartAsync( try { // Call regardless of the outcome of enumeration so job can pause/finish - await OnEnumerationComplete().ConfigureAwait(false); + await OnEnumerationCompleteAsync().ConfigureAwait(false); } catch (Exception ex) { @@ -363,7 +363,7 @@ public virtual async IAsyncEnumerable ProcessJobToJobPartAsync( } } - private async IAsyncEnumerable GetStorageResourcesAsync() + private async IAsyncEnumerable EnumerateAndCreateJobPartsAsync() { // Start the partNumber based on the last part number. If this is a new job, // the count will automatically be at 0 (the beginning). @@ -396,7 +396,7 @@ private async IAsyncEnumerable GetStorageResourcesAsync() _cancellationToken.ThrowIfCancellationRequested(); if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) { - await OnAllResourcesEnumerated().ConfigureAwait(false); + await OnAllResourcesEnumeratedAsync().ConfigureAwait(false); enumerationCompleted = true; continue; } @@ -538,7 +538,7 @@ await TransferFailedEventHandler.RaiseAsync( /// In order to properly propagate the transfer status events of each job part up /// until all job parts have completed. /// - public async Task JobPartEvent(TransferStatusEventArgs args) + public async Task JobPartEventAsync(TransferStatusEventArgs args) { DataTransferStatus jobPartStatus = args.TransferStatus; DataTransferState jobState = _dataTransfer._state.GetTransferStatus().State; @@ -553,7 +553,7 @@ public async Task JobPartEvent(TransferStatusEventArgs args) { if (_dataTransfer._state.SetFailedItemsState()) { - await SetCheckpointerStatus().ConfigureAwait(false); + await SetCheckpointerStatusAsync().ConfigureAwait(false); await OnJobPartStatusChangedAsync().ConfigureAwait(false); } } @@ -561,7 +561,7 @@ public async Task JobPartEvent(TransferStatusEventArgs args) { if (_dataTransfer._state.SetSkippedItemsState()) { - await SetCheckpointerStatus().ConfigureAwait(false); + await SetCheckpointerStatusAsync().ConfigureAwait(false); await OnJobPartStatusChangedAsync().ConfigureAwait(false); } } @@ -604,7 +604,7 @@ public async Task OnJobStateChangedAsync(DataTransferState state) } await OnJobPartStatusChangedAsync().ConfigureAwait(false); - await SetCheckpointerStatus().ConfigureAwait(false); + await SetCheckpointerStatusAsync().ConfigureAwait(false); } } @@ -624,7 +624,7 @@ await TransferStatusEventHandler.RaiseAsync( } } - internal async virtual Task SetCheckpointerStatus() + internal async virtual Task SetCheckpointerStatusAsync() { await _checkpointer.SetJobStatusAsync( transferId: _dataTransfer.Id, @@ -635,7 +635,7 @@ await _checkpointer.SetJobStatusAsync( /// Called when enumeration is complete whether it finished successfully, failed, or was paused. /// All resources may or may not have been enumerated. /// - protected async Task OnEnumerationComplete() + protected async Task OnEnumerationCompleteAsync() { _enumerationComplete = true; @@ -660,7 +660,7 @@ protected async Task OnEnumerationComplete() /// /// Called when all resources have been enumerated successfully. /// - protected async Task OnAllResourcesEnumerated() + protected async Task OnAllResourcesEnumeratedAsync() { await _checkpointer.SetEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs index 2852d800706f..3d2e7f845151 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs @@ -159,7 +159,7 @@ public virtual async IAsyncEnumerable GetTransfersAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) { cancellationToken = LinkCancellation(cancellationToken); - await SetDataTransfers(cancellationToken).ConfigureAwait(false); + await SetDataTransfersAsync(cancellationToken).ConfigureAwait(false); IEnumerable totalTransfers; if (filterByStatus == default || filterByStatus.Count == 0) { @@ -419,7 +419,7 @@ private async Task BuildAndAddTransferJobAsync( } #endregion - private async Task SetDataTransfers(CancellationToken cancellationToken = default) + private async Task SetDataTransfersAsync(CancellationToken cancellationToken = default) { _dataTransfers.Clear(); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index cecc1c8fd3be..b271dbdee417 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -180,7 +180,7 @@ public static UriToStreamJobPart CreateJobPartFromCheckpoint( } /// - /// Processes the job to job parts + /// Processes the job part to chunks /// /// Just start downloading using an initial range. If it's a /// small blob, we'll get the whole thing in one shot. If it's @@ -211,7 +211,7 @@ public override async Task ProcessPartToChunkAsync() catch (Exception ex) { // The file either does not exist any more, got moved, or renamed. - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -275,7 +275,7 @@ internal async Task UnknownDownloadInternal() } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -375,7 +375,7 @@ await _destinationResource.CompleteTransferAsync( } catch (Exception ex) { - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } @@ -426,7 +426,7 @@ await _downloadChunkHandler.InvokeEvent(new DownloadRangeEventArgs( { // If the _downloadChunkHandler has been disposed before we call to it // we should at least filter the exception to error handling just in case. - await InvokeFailedArg(ex).ConfigureAwait(false); + await InvokeFailedArgAsync(ex).ConfigureAwait(false); } } } @@ -459,7 +459,7 @@ await _destinationResource.CopyFromStreamAsync( ex.Message.Contains("Cannot overwrite file.")) { // Skip file that already exists on the destination. - await InvokeSkippedArg().ConfigureAwait(false); + await InvokeSkippedArgAsync().ConfigureAwait(false); } return false; } @@ -498,7 +498,7 @@ internal static DownloadChunkHandler.Behaviors GetDownloadChunkHandlerBehaviors( CopyToDestinationFile = job.CopyToStreamInternal, CopyToChunkFile = job.WriteChunkToTempFile, ReportProgressInBytes = job.ReportBytesWritten, - InvokeFailedHandler = job.InvokeFailedArg, + InvokeFailedHandler = job.InvokeFailedArgAsync, QueueCompleteFileDownload = job.QueueCompleteFileDownload }; } @@ -519,16 +519,16 @@ private static IList GetRangesList(long initialLength, long totalLeng } #endregion PartitionedDownloader - public override async Task InvokeSkippedArg() + public override async Task InvokeSkippedArgAsync() { DisposeHandlers(); - await base.InvokeSkippedArg().ConfigureAwait(false); + await base.InvokeSkippedArgAsync().ConfigureAwait(false); } - public override async Task InvokeFailedArg(Exception ex) + public override async Task InvokeFailedArgAsync(Exception ex) { DisposeHandlers(); - await base.InvokeFailedArg(ex).ConfigureAwait(false); + await base.InvokeFailedArgAsync(ex).ConfigureAwait(false); } internal void DisposeHandlers()