diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobContainerServiceToServiceJobTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobContainerServiceToServiceJobTests.cs index f6e97a243e57..fac009689856 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobContainerServiceToServiceJobTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobContainerServiceToServiceJobTests.cs @@ -110,10 +110,12 @@ await checkpointer.AddNewJobAsync( It.IsAny(), It.IsAny())) .Returns(GetStorageResourceItemsAsyncEnumerable(blobItems)); - ServiceToServiceTransferJob transferJob = new ServiceToServiceTransferJob( + TransferJobInternal transferJob = new( new DataTransfer(id: transferId), sourceMock.Object, destinationMock.Object, + ServiceToServiceJobPart.CreateJobPartAsync, + ServiceToServiceJobPart.CreateJobPartAsync, new DataTransferOptions(), checkpointer, DataTransferErrorMode.StopOnAnyFailure, @@ -164,10 +166,12 @@ await checkpointer.AddNewJobAsync( It.IsAny(), It.IsAny())) .Returns(GetStorageResourceItemsAsyncEnumerable(blobItems)); - ServiceToServiceTransferJob transferJob = new ServiceToServiceTransferJob( + TransferJobInternal transferJob = new( new DataTransfer(id: transferId), sourceMock.Object, destinationMock.Object, + ServiceToServiceJobPart.CreateJobPartAsync, + ServiceToServiceJobPart.CreateJobPartAsync, new DataTransferOptions(), checkpointer, DataTransferErrorMode.StopOnAnyFailure, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs index bd98dc5a1976..79be6c5b6fd1 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs @@ -6,6 +6,7 @@ using System.IO; using System.Buffers; using Azure.Core.Pipeline; +using System; namespace Azure.Storage.DataMovement; @@ -95,113 +96,62 @@ private async Task BuildSingleTransferJob( bool resumeJob, CancellationToken cancellationToken) { - // If the resource cannot produce a Uri, it means it can only produce a local path - // From here we only support an upload job - if (sourceResource.IsLocalResource()) + TransferJobInternal.CreateJobPartSingleAsync single; + TransferJobInternal.CreateJobPartMultiAsync multi; + Func rehydrate; + if (sourceResource.IsLocalResource() && !destinationResource.IsLocalResource()) { - if (!destinationResource.IsLocalResource()) - { - // Stream to Uri job (Upload Job) - StreamToUriTransferJob streamToUriJob = new StreamToUriTransferJob( - dataTransfer: dataTransfer, - sourceResource: sourceResource, - destinationResource: destinationResource, - transferOptions: transferOptions, - checkpointer: checkpointer, - errorHandling: _errorHandling, - arrayPool: _arrayPool, - clientDiagnostics: ClientDiagnostics); - - if (resumeJob) - { - using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( - transferId: dataTransfer.Id, - partNumber: 0, - offset: 0, - length: 0, - cancellationToken: cancellationToken).ConfigureAwait(false)) - { - streamToUriJob.AppendJobPart( - streamToUriJob.ToJobPartAsync( - stream, - sourceResource, - destinationResource)); - } - } - return streamToUriJob; - } - else // Invalid argument that both resources do not produce a Uri - { - throw Errors.InvalidSourceDestinationParams(); - } + single = StreamToUriJobPart.CreateJobPartAsync; + multi = StreamToUriJobPart.CreateJobPartAsync; + rehydrate = DataMovementExtensions.ToStreamToUriJobPartAsync; + } + else if (!sourceResource.IsLocalResource() && destinationResource.IsLocalResource()) + { + single = UriToStreamJobPart.CreateJobPartAsync; + multi = UriToStreamJobPart.CreateJobPartAsync; + rehydrate = DataMovementExtensions.ToUriToStreamJobPartAsync; + } + else if (!sourceResource.IsLocalResource() && !destinationResource.IsLocalResource()) + { + single = ServiceToServiceJobPart.CreateJobPartAsync; + multi = ServiceToServiceJobPart.CreateJobPartAsync; + rehydrate = DataMovementExtensions.ToServiceToServiceJobPartAsync; } else { - // Source is remote - if (!destinationResource.IsLocalResource()) - { - // Service to Service Job (Copy job) - ServiceToServiceTransferJob serviceToServiceJob = new ServiceToServiceTransferJob( - dataTransfer: dataTransfer, - sourceResource: sourceResource, - destinationResource: destinationResource, - transferOptions: transferOptions, - CheckPointFolderPath: checkpointer, - errorHandling: _errorHandling, - arrayPool: _arrayPool, - clientDiagnostics: ClientDiagnostics); + throw Errors.InvalidSourceDestinationParams(); + } - if (resumeJob) - { - using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( - transferId: dataTransfer.Id, - partNumber: 0, - offset: 0, - length: 0, - cancellationToken: cancellationToken).ConfigureAwait(false)) - { - serviceToServiceJob.AppendJobPart( - serviceToServiceJob.ToJobPartAsync( - stream, - sourceResource, - destinationResource)); - } - } - return serviceToServiceJob; - } - else + TransferJobInternal job = new( + dataTransfer: dataTransfer, + sourceResource: sourceResource, + destinationResource: destinationResource, + single, + multi, + transferOptions: transferOptions, + checkpointer: checkpointer, + errorHandling: _errorHandling, + arrayPool: _arrayPool, + clientDiagnostics: ClientDiagnostics); + + if (resumeJob) + { + using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( + transferId: dataTransfer.Id, + partNumber: 0, + offset: 0, + length: 0, + cancellationToken: cancellationToken).ConfigureAwait(false)) { - // Download to local operation - // Service to Local job (Download Job) - UriToStreamTransferJob uriToStreamJob = new UriToStreamTransferJob( - dataTransfer: dataTransfer, - sourceResource: sourceResource, - destinationResource: destinationResource, - transferOptions: transferOptions, - checkpointer: checkpointer, - errorHandling: _errorHandling, - arrayPool: _arrayPool, - clientDiagnostics: ClientDiagnostics); - - if (resumeJob) - { - using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( - transferId: dataTransfer.Id, - partNumber: 0, - offset: 0, - length: 0, - cancellationToken: cancellationToken).ConfigureAwait(false)) - { - uriToStreamJob.AppendJobPart( - uriToStreamJob.ToJobPartAsync( - stream, - sourceResource, - destinationResource)); - } - } - return uriToStreamJob; + job.AppendJobPart( + rehydrate( + job, + stream, + sourceResource, + destinationResource)); } } + return job; } private async Task BuildContainerTransferJob( @@ -213,133 +163,68 @@ private async Task BuildContainerTransferJob( bool resumeJob, CancellationToken cancellationToken) { - // If the resource cannot produce a Uri, it means it can only produce a local path - // From here we only support an upload job - if (sourceResource.IsLocalResource()) + TransferJobInternal.CreateJobPartSingleAsync single; + TransferJobInternal.CreateJobPartMultiAsync multi; + Func rehydrate; + if (sourceResource.IsLocalResource() && !destinationResource.IsLocalResource()) { - if (!destinationResource.IsLocalResource()) - { - // Stream to Uri job (Upload Job) - StreamToUriTransferJob streamToUriJob = new StreamToUriTransferJob( - dataTransfer: dataTransfer, - sourceResource: sourceResource, - destinationResource: destinationResource, - transferOptions: transferOptions, - checkpointer: checkpointer, - errorHandling: _errorHandling, - arrayPool: _arrayPool, - clientDiagnostics: ClientDiagnostics); - - if (resumeJob) - { - // Iterate through all job parts and append to the job - int jobPartCount = await checkpointer.CurrentJobPartCountAsync( - transferId: dataTransfer.Id, - cancellationToken: cancellationToken).ConfigureAwait(false); - for (var currentJobPart = 0; currentJobPart < jobPartCount; currentJobPart++) - { - using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( - transferId: dataTransfer.Id, - partNumber: currentJobPart, - offset: 0, - length: 0, - cancellationToken: cancellationToken).ConfigureAwait(false)) - { - streamToUriJob.AppendJobPart( - streamToUriJob.ToJobPartAsync( - stream, - sourceResource, - destinationResource)); - } - } - } - return streamToUriJob; - } - else // Invalid argument that both resources do not produce a Uri - { - throw Errors.InvalidSourceDestinationParams(); - } + single = StreamToUriJobPart.CreateJobPartAsync; + multi = StreamToUriJobPart.CreateJobPartAsync; + rehydrate = DataMovementExtensions.ToStreamToUriJobPartAsync; + } + else if (!sourceResource.IsLocalResource() && destinationResource.IsLocalResource()) + { + single = UriToStreamJobPart.CreateJobPartAsync; + multi = UriToStreamJobPart.CreateJobPartAsync; + rehydrate = DataMovementExtensions.ToUriToStreamJobPartAsync; + } + else if (!sourceResource.IsLocalResource() && !destinationResource.IsLocalResource()) + { + single = ServiceToServiceJobPart.CreateJobPartAsync; + multi = ServiceToServiceJobPart.CreateJobPartAsync; + rehydrate = DataMovementExtensions.ToServiceToServiceJobPartAsync; } else { - // Source is remote - if (!destinationResource.IsLocalResource()) - { - // Service to Service Job (Copy job) - ServiceToServiceTransferJob serviceToServiceJob = new ServiceToServiceTransferJob( - dataTransfer: dataTransfer, - sourceResource: sourceResource, - destinationResource: destinationResource, - transferOptions: transferOptions, - checkpointer: checkpointer, - errorHandling: _errorHandling, - arrayPool: _arrayPool, - clientDiagnostics: ClientDiagnostics); + throw Errors.InvalidSourceDestinationParams(); + } - if (resumeJob) - { - // Iterate through all job parts and append to the job - int jobPartCount = await checkpointer.CurrentJobPartCountAsync( - transferId: dataTransfer.Id, - cancellationToken: cancellationToken).ConfigureAwait(false); - for (var currentJobPart = 0; currentJobPart < jobPartCount; currentJobPart++) - { - using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( - transferId: dataTransfer.Id, - partNumber: currentJobPart, - offset: 0, - length: 0, - cancellationToken: cancellationToken).ConfigureAwait(false)) - { - serviceToServiceJob.AppendJobPart( - serviceToServiceJob.ToJobPartAsync( - stream, - sourceResource, - destinationResource)); - } - } - } - return serviceToServiceJob; - } - else + TransferJobInternal job = new( + dataTransfer: dataTransfer, + sourceResource: sourceResource, + destinationResource: destinationResource, + single, + multi, + transferOptions: transferOptions, + checkpointer: checkpointer, + errorHandling: _errorHandling, + arrayPool: _arrayPool, + clientDiagnostics: ClientDiagnostics); + + if (resumeJob) + { + // Iterate through all job parts and append to the job + int jobPartCount = await checkpointer.CurrentJobPartCountAsync( + transferId: dataTransfer.Id, + cancellationToken: cancellationToken).ConfigureAwait(false); + for (var currentJobPart = 0; currentJobPart < jobPartCount; currentJobPart++) { - // Download to local operation - // Service to Local job (Download Job) - UriToStreamTransferJob uriToStreamJob = new UriToStreamTransferJob( - dataTransfer: dataTransfer, - sourceResource: sourceResource, - destinationResource: destinationResource, - transferOptions: transferOptions, - checkpointer: checkpointer, - errorHandling: _errorHandling, - arrayPool: _arrayPool, - clientDiagnostics: ClientDiagnostics); - - if (resumeJob) + using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( + transferId: dataTransfer.Id, + partNumber: currentJobPart, + offset: 0, + length: 0, + cancellationToken: cancellationToken).ConfigureAwait(false)) { - // Iterate through all job parts and append to the job - int jobPartCount = await checkpointer.CurrentJobPartCountAsync( - transferId: dataTransfer.Id, - cancellationToken: cancellationToken).ConfigureAwait(false); - for (var currentJobPart = 0; currentJobPart < jobPartCount; currentJobPart++) - { - using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( - transferId: dataTransfer.Id, - partNumber: currentJobPart, - offset: 0, - length: 0, - cancellationToken: cancellationToken).ConfigureAwait(false)) - { - uriToStreamJob.AppendJobPart( - uriToStreamJob.ToJobPartAsync( - stream, - sourceResource, - destinationResource)); - } - } + job.AppendJobPart( + rehydrate( + job, + stream, + sourceResource, + destinationResource)); } - return uriToStreamJob; } } + return job; } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs index 74577c761c35..dbb764ed878d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs @@ -24,7 +24,7 @@ internal class ServiceToServiceJobPart : JobPartInternal, IDisposable /// /// Creating job part based on a single transfer job /// - private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber) + private ServiceToServiceJobPart(TransferJobInternal job, int partNumber) : base(dataTransfer: job._dataTransfer, partNumber: partNumber, sourceResource: job._sourceResource, @@ -50,7 +50,7 @@ private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber) /// Creating transfer job based on a storage resource created from listing. /// private ServiceToServiceJobPart( - ServiceToServiceTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, @@ -82,7 +82,7 @@ private ServiceToServiceJobPart( /// Creating transfer job based on a checkpoint file. /// private ServiceToServiceJobPart( - ServiceToServiceTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, @@ -121,8 +121,8 @@ public void Dispose() /// /// Called when creating a job part from a single transfer. /// - public static async Task CreateJobPartAsync( - ServiceToServiceTransferJob job, + public static async Task CreateJobPartAsync( + TransferJobInternal job, int partNumber) { // Create Job Part file as we're initializing the job part @@ -134,12 +134,11 @@ public static async Task CreateJobPartAsync( /// /// Called when creating a job part from a container transfer. /// - public static async Task CreateJobPartAsync( - ServiceToServiceTransferJob job, + public static async Task CreateJobPartAsync( + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, - StorageResourceItem destinationResource, - long? length = default) + StorageResourceItem destinationResource) { Argument.AssertNotNull(sourceResource, nameof(sourceResource)); Argument.AssertNotNull(destinationResource, nameof(destinationResource)); @@ -149,8 +148,7 @@ public static async Task CreateJobPartAsync( job: job, partNumber: partNumber, sourceResource: sourceResource, - destinationResource: destinationResource, - length: length); + destinationResource: destinationResource); await part.AddJobPartToCheckpointerAsync().ConfigureAwait(false); return part; } @@ -159,7 +157,7 @@ public static async Task CreateJobPartAsync( /// Called when creating a job part from a checkpoint file on resume. /// public static ServiceToServiceJobPart CreateJobPartFromCheckpoint( - ServiceToServiceTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs deleted file mode 100644 index 1afa3db3dd13..000000000000 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs +++ /dev/null @@ -1,225 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System; -using System.Buffers; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Azure.Core.Pipeline; - -namespace Azure.Storage.DataMovement -{ - internal class ServiceToServiceTransferJob : TransferJobInternal - { - /// - /// Create Storage Transfer Job for single transfer - /// - internal ServiceToServiceTransferJob( - DataTransfer dataTransfer, - StorageResourceItem sourceResource, - StorageResourceItem destinationResource, - DataTransferOptions transferOptions, - TransferCheckpointer CheckPointFolderPath, - DataTransferErrorMode errorHandling, - ArrayPool arrayPool, - ClientDiagnostics clientDiagnostics) - : base(dataTransfer, - sourceResource, - destinationResource, - transferOptions, - CheckPointFolderPath, - errorHandling, - arrayPool, - clientDiagnostics) - { - } - - /// - /// Create Storage Transfer Job for container transfer - /// - internal ServiceToServiceTransferJob( - DataTransfer dataTransfer, - StorageResourceContainer sourceResource, - StorageResourceContainer destinationResource, - DataTransferOptions transferOptions, - TransferCheckpointer checkpointer, - DataTransferErrorMode errorHandling, - ArrayPool arrayPool, - ClientDiagnostics clientDiagnostics) - : base(dataTransfer, - sourceResource, - destinationResource, - transferOptions, - checkpointer, - errorHandling, - arrayPool, - clientDiagnostics) - { - } - - /// - /// Processes the job to job parts - /// - /// An IEnumerable that contains the job parts - public override async IAsyncEnumerable ProcessJobToJobPartAsync() - { - await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); - int partNumber = 0; - - if (_jobParts.Count == 0) - { - // Starting brand new job - if (_isSingleResource) - { - ServiceToServiceJobPart part = default; - try - { - // Single resource transfer, we can skip to chunking the job. - part = await ServiceToServiceJobPart.CreateJobPartAsync( - job: this, - partNumber: partNumber).ConfigureAwait(false); - AppendJobPart(part); - await OnAllResourcesEnumerated().ConfigureAwait(false); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - yield return part; - } - else - { - await foreach (JobPartInternal part in GetStorageResourcesAsync().ConfigureAwait(false)) - { - yield return part; - } - } - } - else - { - // Resuming old job with existing job parts - foreach (JobPartInternal part in _jobParts) - { - if (!part.JobPartStatus.HasCompletedSuccessfully) - { - part.JobPartStatus.SetTransferStateChange(DataTransferState.Queued); - yield return part; - } - } - - if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false)) - { - await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false)) - { - yield return jobPartInternal; - } - } - } - - // Call regardless of the outcome of enumeration so job can pause/finish - await OnEnumerationComplete().ConfigureAwait(false); - } - - private async IAsyncEnumerable GetStorageResourcesAsync() - { - // Start the partNumber based on the last part number. If this is a new job, - // the count will automatically be at 0 (the beginning). - int partNumber = _jobParts.Count; - HashSet existingSources = GetJobPartSourceResourcePaths(); - // Call listing operation on the source container - IAsyncEnumerator enumerator; - - // Obtain enumerator and check for any point of failure before we attempt to list - // and fail gracefully. - try - { - enumerator = _sourceResourceContainer.GetStorageResourcesAsync( - destinationContainer: _destinationResourceContainer, - cancellationToken: _cancellationToken).GetAsyncEnumerator(); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - - // List the container in this specific way because MoveNext needs to be separately wrapped - // in a try/catch as we can't yield return inside a try/catch. - bool enumerationCompleted = false; - while (!enumerationCompleted) - { - try - { - _cancellationToken.ThrowIfCancellationRequested(); - if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) - { - await OnAllResourcesEnumerated().ConfigureAwait(false); - enumerationCompleted = true; - continue; - } - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - - StorageResource current = enumerator.Current; - - if (current.IsContainer) - { - // Create sub-container - string containerUriPath = _sourceResourceContainer.Uri.GetPath(); - string subContainerPath = string.IsNullOrEmpty(containerUriPath) - ? current.Uri.GetPath() - : current.Uri.GetPath().Substring(containerUriPath.Length + 1); - StorageResourceContainer subContainer = - _destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath); - - try - { - await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - } - else - { - if (!existingSources.Contains(current.Uri)) - { - string containerUriPath = _sourceResourceContainer.Uri.GetPath(); - string sourceName = string.IsNullOrEmpty(containerUriPath) - ? current.Uri.GetPath() - : current.Uri.GetPath().Substring(containerUriPath.Length + 1); - - ServiceToServiceJobPart part; - try - { - StorageResourceItem sourceItem = (StorageResourceItem)current; - part = await ServiceToServiceJobPart.CreateJobPartAsync( - job: this, - partNumber: partNumber, - sourceResource: sourceItem, - destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName, sourceItem.ResourceId)) - .ConfigureAwait(false); - AppendJobPart(part); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - yield return part; - partNumber++; - } - } - } - } - } -} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs index 4f82fc571511..8f980dd8db79 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs @@ -21,8 +21,8 @@ internal static StorageResourceItemProperties ToStorageResourceProperties(this F properties: properties); } - public static StreamToUriJobPart ToJobPartAsync( - this StreamToUriTransferJob baseJob, + public static StreamToUriJobPart ToStreamToUriJobPartAsync( + this TransferJobInternal baseJob, Stream planFileStream, StorageResourceItem sourceResource, StorageResourceItem destinationResource) @@ -53,8 +53,8 @@ public static StreamToUriJobPart ToJobPartAsync( return jobPart; } - public static ServiceToServiceJobPart ToJobPartAsync( - this ServiceToServiceTransferJob baseJob, + public static ServiceToServiceJobPart ToServiceToServiceJobPartAsync( + this TransferJobInternal baseJob, Stream planFileStream, StorageResourceItem sourceResource, StorageResourceItem destinationResource) @@ -85,8 +85,8 @@ public static ServiceToServiceJobPart ToJobPartAsync( return jobPart; } - public static UriToStreamJobPart ToJobPartAsync( - this UriToStreamTransferJob baseJob, + public static UriToStreamJobPart ToUriToStreamJobPartAsync( + this TransferJobInternal baseJob, Stream planFileStream, StorageResourceItem sourceResource, StorageResourceItem destinationResource) @@ -117,8 +117,8 @@ public static UriToStreamJobPart ToJobPartAsync( return jobPart; } - public static StreamToUriJobPart ToJobPartAsync( - this StreamToUriTransferJob baseJob, + public static StreamToUriJobPart ToStreamToUriJobPartAsync( + this TransferJobInternal baseJob, Stream planFileStream, StorageResourceContainer sourceResource, StorageResourceContainer destinationResource) @@ -153,8 +153,8 @@ public static StreamToUriJobPart ToJobPartAsync( return jobPart; } - public static ServiceToServiceJobPart ToJobPartAsync( - this ServiceToServiceTransferJob baseJob, + public static ServiceToServiceJobPart ToServiceToServiceJobPartAsync( + this TransferJobInternal baseJob, Stream planFileStream, StorageResourceContainer sourceResource, StorageResourceContainer destinationResource) @@ -187,8 +187,8 @@ public static ServiceToServiceJobPart ToJobPartAsync( return jobPart; } - public static UriToStreamJobPart ToJobPartAsync( - this UriToStreamTransferJob baseJob, + public static UriToStreamJobPart ToUriToStreamJobPartAsync( + this TransferJobInternal baseJob, Stream planFileStream, StorageResourceContainer sourceResource, StorageResourceContainer destinationResource) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index 39197042075f..db106b2d6544 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -24,7 +24,7 @@ internal class StreamToUriJobPart : JobPartInternal, IDisposable /// /// Creating job part based on a single transfer job /// - private StreamToUriJobPart(StreamToUriTransferJob job, int partNumber) + private StreamToUriJobPart(TransferJobInternal job, int partNumber) : base(dataTransfer: job._dataTransfer, partNumber: partNumber, sourceResource: job._sourceResource, @@ -50,7 +50,7 @@ private StreamToUriJobPart(StreamToUriTransferJob job, int partNumber) /// Creating transfer job based on a storage resource created from listing. /// private StreamToUriJobPart( - StreamToUriTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, @@ -82,7 +82,7 @@ private StreamToUriJobPart( /// Creating transfer job based on a checkpoint file. /// private StreamToUriJobPart( - StreamToUriTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, @@ -121,8 +121,8 @@ public void Dispose() /// /// Called when creating a job part from a single transfer. /// - public static async Task CreateJobPartAsync( - StreamToUriTransferJob job, + public static async Task CreateJobPartAsync( + TransferJobInternal job, int partNumber) { // Create Job Part file as we're initializing the job part @@ -134,12 +134,11 @@ public static async Task CreateJobPartAsync( /// /// Called when creating a job part from a container transfer. /// - public static async Task CreateJobPartAsync( - StreamToUriTransferJob job, + public static async Task CreateJobPartAsync( + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, - StorageResourceItem destinationResource, - long? length = default) + StorageResourceItem destinationResource) { Argument.AssertNotNull(sourceResource, nameof(sourceResource)); Argument.AssertNotNull(destinationResource, nameof(destinationResource)); @@ -149,8 +148,7 @@ public static async Task CreateJobPartAsync( job: job, partNumber: partNumber, sourceResource: sourceResource, - destinationResource: destinationResource, - length: length); + destinationResource: destinationResource); await part.AddJobPartToCheckpointerAsync().ConfigureAwait(false); return part; } @@ -159,7 +157,7 @@ public static async Task CreateJobPartAsync( /// Called when creating a job part from a checkpoint file on resume. /// public static StreamToUriJobPart CreateJobPartFromCheckpoint( - StreamToUriTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs deleted file mode 100644 index 4eaf4c62da0d..000000000000 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System.Collections.Generic; -using System.Threading.Tasks; -using System.Buffers; -using System; -using Azure.Core.Pipeline; - -namespace Azure.Storage.DataMovement -{ - internal class StreamToUriTransferJob : TransferJobInternal - { - /// - /// Create Storage Transfer Job. - /// - internal StreamToUriTransferJob( - DataTransfer dataTransfer, - StorageResourceItem sourceResource, - StorageResourceItem destinationResource, - DataTransferOptions transferOptions, - TransferCheckpointer checkpointer, - DataTransferErrorMode errorHandling, - ArrayPool arrayPool, - ClientDiagnostics clientDiagnostics) - : base(dataTransfer, - sourceResource, - destinationResource, - transferOptions, - checkpointer, - errorHandling, - arrayPool, - clientDiagnostics) - { - } - - /// - /// Create Storage Transfer Job. - /// - internal StreamToUriTransferJob( - DataTransfer dataTransfer, - StorageResourceContainer sourceResource, - StorageResourceContainer destinationResource, - DataTransferOptions transferOptions, - TransferCheckpointer checkpointer, - DataTransferErrorMode errorHandling, - ArrayPool arrayPool, - ClientDiagnostics clientDiagnostics) - : base(dataTransfer, - sourceResource, - destinationResource, - transferOptions, - checkpointer, - errorHandling, - arrayPool, - clientDiagnostics) - { - } - - /// - /// Processes the job to job parts - /// - /// An IEnumerable that contains the job parts - public override async IAsyncEnumerable ProcessJobToJobPartAsync() - { - await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); - int partNumber = 0; - - if (_jobParts.Count == 0) - { - // Starting brand new job - if (_isSingleResource) - { - StreamToUriJobPart part = default; - try - { - // Single resource transfer, we can skip to chunking the job. - part = await StreamToUriJobPart.CreateJobPartAsync( - job: this, - partNumber: partNumber).ConfigureAwait(false); - AppendJobPart(part); - await OnAllResourcesEnumerated().ConfigureAwait(false); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - yield return part; - } - else - { - await foreach (JobPartInternal part in GetStorageResourcesAsync().ConfigureAwait(false)) - { - yield return part; - } - } - } - else - { - // Resuming old job with existing job parts - foreach (JobPartInternal part in _jobParts) - { - if (!part.JobPartStatus.HasCompletedSuccessfully) - { - part.JobPartStatus.SetTransferStateChange(DataTransferState.Queued); - yield return part; - } - } - - if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false)) - { - await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false)) - { - yield return jobPartInternal; - } - } - } - - // Call regardless of the outcome of enumeration so job can pause/finish - await OnEnumerationComplete().ConfigureAwait(false); - } - - private async IAsyncEnumerable GetStorageResourcesAsync() - { - // Start the partNumber based on the last part number. If this is a new job, - // the count will automatically be at 0 (the beginning). - int partNumber = _jobParts.Count; - HashSet existingSources = GetJobPartSourceResourcePaths(); - // Call listing operation on the source container - IAsyncEnumerator enumerator; - - // Obtain enumerator and check for any point of failure before we attempt to list - // and fail gracefully. - try - { - enumerator = _sourceResourceContainer.GetStorageResourcesAsync( - cancellationToken: _cancellationToken).GetAsyncEnumerator(); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - - // List the container in this specific way because MoveNext needs to be separately wrapped - // in a try/catch as we can't yield return inside a try/catch. - bool enumerationCompleted = false; - while (!enumerationCompleted) - { - try - { - _cancellationToken.ThrowIfCancellationRequested(); - if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) - { - await OnAllResourcesEnumerated().ConfigureAwait(false); - enumerationCompleted = true; - continue; - } - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - - StorageResource current = enumerator.Current; - - if (current.IsContainer) - { - // Create sub-container - string containerUriPath = _sourceResourceContainer.Uri.GetPath(); - string subContainerPath = string.IsNullOrEmpty(containerUriPath) - ? current.Uri.GetPath() - : current.Uri.GetPath().Substring(containerUriPath.Length + 1); - StorageResourceContainer subContainer = - _destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath); - - try - { - await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - } - else - { - if (!existingSources.Contains(current.Uri)) - { - string containerUriPath = _sourceResourceContainer.Uri.GetPath(); - string sourceName = string.IsNullOrEmpty(containerUriPath) - ? current.Uri.GetPath() - : current.Uri.GetPath().Substring(containerUriPath.Length + 1); - - StreamToUriJobPart part; - try - { - part = await StreamToUriJobPart.CreateJobPartAsync( - job: this, - partNumber: partNumber, - sourceResource: (StorageResourceItem)current, - destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName, default)) - .ConfigureAwait(false); - AppendJobPart(part); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - yield return part; - partNumber++; - } - } - } - } - } -} diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs index f60858399372..00b15ce496b5 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs @@ -12,13 +12,27 @@ namespace Azure.Storage.DataMovement { - internal abstract class TransferJobInternal : IDisposable + internal class TransferJobInternal : IDisposable { + internal delegate Task CreateJobPartSingleAsync( + TransferJobInternal job, + int partNumber); + + internal delegate Task CreateJobPartMultiAsync( + TransferJobInternal job, + int partNumber, + StorageResourceItem sourceResource, + StorageResourceItem destinationResource); + /// /// DataTransfer communicate when the transfer has finished and the progress /// internal DataTransfer _dataTransfer { get; set; } + private readonly CreateJobPartSingleAsync _createJobPartSingleAsync; + + private readonly CreateJobPartMultiAsync _createJobPartMultiAsync; + /// /// Plan file writer for the respective job /// @@ -135,6 +149,8 @@ internal protected TransferJobInternal() private TransferJobInternal( DataTransfer dataTransfer, + CreateJobPartSingleAsync createJobPartSingleAsync, + CreateJobPartMultiAsync createJobPartMultiAsync, TransferCheckpointer checkPointer, DataTransferErrorMode errorHandling, long? initialTransferSize, @@ -151,6 +167,8 @@ private TransferJobInternal( _dataTransfer = dataTransfer ?? throw Errors.ArgumentNull(nameof(dataTransfer)); _dataTransfer.TransferStatus.SetTransferStateChange(DataTransferState.Queued); + _createJobPartSingleAsync = createJobPartSingleAsync; + _createJobPartMultiAsync = createJobPartMultiAsync; _checkpointer = checkPointer; _arrayPool = arrayPool; _jobParts = new List(); @@ -181,12 +199,16 @@ internal TransferJobInternal( DataTransfer dataTransfer, StorageResourceItem sourceResource, StorageResourceItem destinationResource, + CreateJobPartSingleAsync createJobPartSingleAsync, + CreateJobPartMultiAsync createJobPartMultiAsync, DataTransferOptions transferOptions, TransferCheckpointer checkpointer, DataTransferErrorMode errorHandling, ArrayPool arrayPool, ClientDiagnostics clientDiagnostics) : this(dataTransfer, + createJobPartSingleAsync, + createJobPartMultiAsync, checkpointer, errorHandling, transferOptions.InitialTransferSize, @@ -212,12 +234,16 @@ internal TransferJobInternal( DataTransfer dataTransfer, StorageResourceContainer sourceResource, StorageResourceContainer destinationResource, + CreateJobPartSingleAsync createJobPartSingleAsync, + CreateJobPartMultiAsync createJobPartMultiAsync, DataTransferOptions transferOptions, TransferCheckpointer checkpointer, DataTransferErrorMode errorHandling, ArrayPool arrayPool, ClientDiagnostics clientDiagnostics) : this(dataTransfer, + createJobPartSingleAsync, + createJobPartMultiAsync, checkpointer, errorHandling, transferOptions.InitialTransferSize, @@ -253,7 +279,162 @@ public void DisposeHandlers() /// Processes the job to job parts /// /// An IEnumerable that contains the job parts - public abstract IAsyncEnumerable ProcessJobToJobPartAsync(); + public virtual async IAsyncEnumerable ProcessJobToJobPartAsync() + { + await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); + int partNumber = 0; + + if (_jobParts.Count == 0) + { + // Starting brand new job + if (_isSingleResource) + { + JobPartInternal part = default; + try + { + // Single resource transfer, we can skip to chunking the job. + part = await _createJobPartSingleAsync(this, partNumber).ConfigureAwait(false); + AppendJobPart(part); + await OnAllResourcesEnumerated().ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } + yield return part; + } + else + { + await foreach (JobPartInternal part in GetStorageResourcesAsync().ConfigureAwait(false)) + { + yield return part; + } + } + } + else + { + // Resuming old job with existing job parts + foreach (JobPartInternal part in _jobParts) + { + if (!part.JobPartStatus.HasCompletedSuccessfully) + { + part.JobPartStatus.SetTransferStateChange(DataTransferState.Queued); + yield return part; + } + } + + if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false)) + { + await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false)) + { + yield return jobPartInternal; + } + } + } + + // Call regardless of the outcome of enumeration so job can pause/finish + await OnEnumerationComplete().ConfigureAwait(false); + } + + private async IAsyncEnumerable GetStorageResourcesAsync() + { + // Start the partNumber based on the last part number. If this is a new job, + // the count will automatically be at 0 (the beginning). + int partNumber = _jobParts.Count; + HashSet existingSources = GetJobPartSourceResourcePaths(); + // Call listing operation on the source container + IAsyncEnumerator enumerator; + + // Obtain enumerator and check for any point of failure before we attempt to list + // and fail gracefully. + try + { + enumerator = _sourceResourceContainer.GetStorageResourcesAsync( + destinationContainer: _destinationResourceContainer, + cancellationToken: _cancellationToken).GetAsyncEnumerator(); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } + + // List the container in this specific way because MoveNext needs to be separately wrapped + // in a try/catch as we can't yield return inside a try/catch. + bool enumerationCompleted = false; + while (!enumerationCompleted) + { + try + { + _cancellationToken.ThrowIfCancellationRequested(); + if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + await OnAllResourcesEnumerated().ConfigureAwait(false); + enumerationCompleted = true; + continue; + } + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } + + StorageResource current = enumerator.Current; + + if (current.IsContainer) + { + // Create sub-container + string containerUriPath = _sourceResourceContainer.Uri.GetPath(); + string subContainerPath = string.IsNullOrEmpty(containerUriPath) + ? current.Uri.GetPath() + : current.Uri.GetPath().Substring(containerUriPath.Length + 1); + StorageResourceContainer subContainer = + _destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath); + + try + { + await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } + } + else + { + if (!existingSources.Contains(current.Uri)) + { + string containerUriPath = _sourceResourceContainer.Uri.GetPath(); + string sourceName = string.IsNullOrEmpty(containerUriPath) + ? current.Uri.GetPath() + : current.Uri.GetPath().Substring(containerUriPath.Length + 1); + + JobPartInternal part; + try + { + StorageResourceItem sourceItem = (StorageResourceItem)current; + part = await _createJobPartMultiAsync( + this, + partNumber, + sourceItem, + _destinationResourceContainer.GetStorageResourceReference(sourceName, sourceItem.ResourceId)) + .ConfigureAwait(false); + AppendJobPart(part); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } + yield return part; + partNumber++; + } + } + } + } /// /// Triggers the cancellation for the Job Part. diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index e7451b124ac5..55983aa0db1e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -26,7 +26,7 @@ internal class UriToStreamJobPart : JobPartInternal, IDisposable /// Creating job part based on a single transfer job /// private UriToStreamJobPart( - UriToStreamTransferJob job, + TransferJobInternal job, int partNumber) : base(dataTransfer: job._dataTransfer, partNumber: partNumber, @@ -52,7 +52,7 @@ private UriToStreamJobPart( /// Creating transfer job based on a storage resource created from listing. /// private UriToStreamJobPart( - UriToStreamTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, @@ -84,7 +84,7 @@ private UriToStreamJobPart( /// Creating transfer job based on a checkpoint file. /// private UriToStreamJobPart( - UriToStreamTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, @@ -123,8 +123,8 @@ public void Dispose() /// /// Called when creating a job part from a single transfer. /// - public static async Task CreateJobPartAsync( - UriToStreamTransferJob job, + public static async Task CreateJobPartAsync( + TransferJobInternal job, int partNumber) { // Create Job Part file as we're initializing the job part @@ -136,12 +136,11 @@ public static async Task CreateJobPartAsync( /// /// Called when creating a job part from a container transfer. /// - public static async Task CreateJobPartAsync( - UriToStreamTransferJob job, + public static async Task CreateJobPartAsync( + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, - StorageResourceItem destinationResource, - long? length = default) + StorageResourceItem destinationResource) { Argument.AssertNotNull(sourceResource, nameof(sourceResource)); Argument.AssertNotNull(destinationResource, nameof(destinationResource)); @@ -151,8 +150,7 @@ public static async Task CreateJobPartAsync( job: job, partNumber: partNumber, sourceResource: sourceResource, - destinationResource: destinationResource, - length: length); + destinationResource: destinationResource); await part.AddJobPartToCheckpointerAsync().ConfigureAwait(false); return part; } @@ -161,7 +159,7 @@ public static async Task CreateJobPartAsync( /// Called when creating a job part from a checkpoint file on resume. /// public static UriToStreamJobPart CreateJobPartFromCheckpoint( - UriToStreamTransferJob job, + TransferJobInternal job, int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamTransferJob.cs deleted file mode 100644 index d94490bdd9ad..000000000000 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamTransferJob.cs +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -using System.Buffers; -using System.Collections.Generic; -using System.Threading.Tasks; -using System; -using Azure.Core.Pipeline; - -namespace Azure.Storage.DataMovement -{ - internal class UriToStreamTransferJob : TransferJobInternal - { - /// - /// Create Storage Transfer Job for single transfer - /// - internal UriToStreamTransferJob( - DataTransfer dataTransfer, - StorageResourceItem sourceResource, - StorageResourceItem destinationResource, - DataTransferOptions transferOptions, - TransferCheckpointer checkpointer, - DataTransferErrorMode errorHandling, - ArrayPool arrayPool, - ClientDiagnostics clientDiagnostics) - : base(dataTransfer, - sourceResource, - destinationResource, - transferOptions, - checkpointer, - errorHandling, - arrayPool, - clientDiagnostics) - { - } - - /// - /// Create Storage Transfer Job for container transfer - /// - internal UriToStreamTransferJob( - DataTransfer dataTransfer, - StorageResourceContainer sourceResource, - StorageResourceContainer destinationResource, - DataTransferOptions transferOptions, - TransferCheckpointer checkpointer, - DataTransferErrorMode errorHandling, - ArrayPool arrayPool, - ClientDiagnostics clientDiagnostics) - : base(dataTransfer, - sourceResource, - destinationResource, - transferOptions, - checkpointer, - errorHandling, - arrayPool, - clientDiagnostics) - { - } - - /// - /// Processes the job to job parts - /// - /// An IEnumerable that contains the job parts - public override async IAsyncEnumerable ProcessJobToJobPartAsync() - { - await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false); - int partNumber = 0; - - if (_jobParts.Count == 0) - { - // Starting brand new job - if (_isSingleResource) - { - UriToStreamJobPart part = default; - try - { - // Single resource transfer, we can skip to chunking the job. - part = await UriToStreamJobPart.CreateJobPartAsync( - job: this, - partNumber: partNumber).ConfigureAwait(false); - AppendJobPart(part); - await OnAllResourcesEnumerated().ConfigureAwait(false); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - yield return part; - } - else - { - await foreach (JobPartInternal part in GetStorageResourcesAsync().ConfigureAwait(false)) - { - yield return part; - } - } - } - else - { - // Resuming old job with existing job parts - foreach (JobPartInternal part in _jobParts) - { - if (!part.JobPartStatus.HasCompletedSuccessfully) - { - part.JobPartStatus.SetTransferStateChange(DataTransferState.Queued); - yield return part; - } - } - - if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false)) - { - await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false)) - { - yield return jobPartInternal; - } - } - } - - // Call regardless of the outcome of enumeration so job can pause/finish - await OnEnumerationComplete().ConfigureAwait(false); - } - - private async IAsyncEnumerable GetStorageResourcesAsync() - { - // Start the partNumber based on the last part number. If this is a new job, - // the count will automatically be at 0 (the beginning). - int partNumber = _jobParts.Count; - HashSet existingSources = GetJobPartSourceResourcePaths(); - // Call listing operation on the source container - IAsyncEnumerator enumerator; - - // Obtain enumerator and check for any point of failure before we attempt to list - // and fail gracefully. - try - { - enumerator = _sourceResourceContainer.GetStorageResourcesAsync( - cancellationToken: _cancellationToken).GetAsyncEnumerator(); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - - // List the container in this specific way because MoveNext needs to be separately wrapped - // in a try/catch as we can't yield return inside a try/catch. - bool enumerationCompleted = false; - while (!enumerationCompleted) - { - try - { - _cancellationToken.ThrowIfCancellationRequested(); - if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) - { - await OnAllResourcesEnumerated().ConfigureAwait(false); - enumerationCompleted = true; - continue; - } - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - - StorageResource current = enumerator.Current; - if (!current.IsContainer && - !existingSources.Contains(current.Uri)) - { - string containerUriPath = _sourceResourceContainer.Uri.GetPath(); - string sourceName = string.IsNullOrEmpty(containerUriPath) - ? current.Uri.GetPath() - : current.Uri.GetPath().Substring(containerUriPath.Length + 1); - - UriToStreamJobPart part; - try - { - part = await UriToStreamJobPart.CreateJobPartAsync( - job: this, - partNumber: partNumber, - sourceResource: (StorageResourceItem)current, - destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName, default)) - .ConfigureAwait(false); - AppendJobPart(part); - } - catch (Exception ex) - { - await InvokeFailedArgAsync(ex).ConfigureAwait(false); - yield break; - } - yield return part; - partNumber++; - } - } - } - } -} diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/CleanUpTransferTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/CleanUpTransferTests.cs index 9b7628e53ac2..985cc958ba89 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/CleanUpTransferTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/CleanUpTransferTests.cs @@ -81,7 +81,7 @@ private Mock GetRemoteDestinationResource(bool throwOnDelet private void AssertBaseSource(Mock source) { - source.Verify(b => b.Uri, Times.Exactly(6)); + source.Verify(b => b.Uri, Times.Exactly(8)); source.Verify(b => b.ProviderId, Times.Once()); source.Verify(b => b.ResourceId, Times.Once()); source.Verify(b => b.Length, Times.Once()); @@ -111,7 +111,7 @@ public async Task CleanupAfterFailureAsync() // Assert AssertBaseSource(sourceMock); - destMock.Verify(b => b.Uri, Times.Exactly(5)); + destMock.Verify(b => b.Uri, Times.Exactly(6)); destMock.Verify(b => b.ProviderId, Times.Once()); destMock.Verify(b => b.ResourceId, Times.Once()); destMock.Verify(b => b.MaxSupportedChunkSize, Times.Exactly(2)); @@ -154,7 +154,7 @@ public async Task ErrorThrownDuringCleanup() // Assert AssertBaseSource(sourceMock); - destMock.Verify(b => b.Uri, Times.Exactly(5)); + destMock.Verify(b => b.Uri, Times.Exactly(6)); destMock.Verify(b => b.ProviderId, Times.Once()); destMock.Verify(b => b.ResourceId, Times.Once()); destMock.Verify(b => b.MaxSupportedChunkSize, Times.Exactly(2)); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/ServiceToServiceJobPartTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/ServiceToServiceJobPartTests.cs index 7c3355779d0c..ce5ce1d569f8 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/ServiceToServiceJobPartTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/ServiceToServiceJobPartTests.cs @@ -145,10 +145,12 @@ await checkpointer.AddNewJobAsync( Mock mockPartQueueChunkTask = GetPartQueueChunkTask(); - ServiceToServiceTransferJob job = new( + TransferJobInternal job = new( new DataTransfer(id: transferId), mockSource.Object, mockDestination.Object, + ServiceToServiceJobPart.CreateJobPartAsync, + ServiceToServiceJobPart.CreateJobPartAsync, new DataTransferOptions(), checkpointer, DataTransferErrorMode.StopOnAnyFailure, @@ -156,7 +158,7 @@ await checkpointer.AddNewJobAsync( new ClientDiagnostics(ClientOptions.Default)); ServiceToServiceJobPart jobPart = await ServiceToServiceJobPart.CreateJobPartAsync( job, - 1); + 1) as ServiceToServiceJobPart; jobPart.SetQueueChunkDelegate(mockPartQueueChunkTask.Object); // Act @@ -212,10 +214,12 @@ await checkpointer.AddNewJobAsync( Mock mockPartQueueChunkTask = GetPartQueueChunkTask(); - ServiceToServiceTransferJob job = new( + TransferJobInternal job = new( new DataTransfer(id: transferId), mockSource.Object, mockDestination.Object, + ServiceToServiceJobPart.CreateJobPartAsync, + ServiceToServiceJobPart.CreateJobPartAsync, new DataTransferOptions(), checkpointer, DataTransferErrorMode.StopOnAnyFailure, @@ -223,7 +227,7 @@ await checkpointer.AddNewJobAsync( new ClientDiagnostics(ClientOptions.Default)); ServiceToServiceJobPart jobPart = await ServiceToServiceJobPart.CreateJobPartAsync( job, - 1); + 1) as ServiceToServiceJobPart; jobPart.SetQueueChunkDelegate(mockPartQueueChunkTask.Object); await jobPart.ProcessPartToChunkAsync(); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/StreamToUriJobPartTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/StreamToUriJobPartTests.cs index 00631658a617..a308aec49c0f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/StreamToUriJobPartTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/StreamToUriJobPartTests.cs @@ -158,10 +158,12 @@ await checkpointer.AddNewJobAsync( source: mockSource.Object, destination: mockDestination.Object); - StreamToUriTransferJob job = new( + TransferJobInternal job = new( new DataTransfer(id: transferId), mockSource.Object, mockDestination.Object, + StreamToUriJobPart.CreateJobPartAsync, + StreamToUriJobPart.CreateJobPartAsync, new DataTransferOptions(), checkpointer, DataTransferErrorMode.StopOnAnyFailure, @@ -169,7 +171,7 @@ await checkpointer.AddNewJobAsync( new ClientDiagnostics(ClientOptions.Default)); StreamToUriJobPart jobPart = await StreamToUriJobPart.CreateJobPartAsync( job, - 1); + 1) as StreamToUriJobPart; jobPart.SetQueueChunkDelegate(mockPartQueueChunkTask.Object); // Act @@ -261,10 +263,12 @@ await checkpointer.AddNewJobAsync( Mock mockPartQueueChunkTask = MockQueueInternalTasks.GetPartQueueChunkTask(); - StreamToUriTransferJob job = new( + TransferJobInternal job = new( new DataTransfer(id: transferId), mockSource.Object, mockDestination.Object, + StreamToUriJobPart.CreateJobPartAsync, + StreamToUriJobPart.CreateJobPartAsync, new DataTransferOptions(), checkpointer, DataTransferErrorMode.StopOnAnyFailure, @@ -272,7 +276,7 @@ await checkpointer.AddNewJobAsync( new ClientDiagnostics(ClientOptions.Default)); StreamToUriJobPart jobPart = await StreamToUriJobPart.CreateJobPartAsync( job, - 1); + 1) as StreamToUriJobPart; jobPart.SetQueueChunkDelegate(mockPartQueueChunkTask.Object); // Act