diff --git a/sdk/storage/Azure.Storage.Common/tests/Shared/ScopeManager.cs b/sdk/storage/Azure.Storage.Common/tests/Shared/ScopeManager.cs
new file mode 100644
index 000000000000..43d48f99458e
--- /dev/null
+++ b/sdk/storage/Azure.Storage.Common/tests/Shared/ScopeManager.cs
@@ -0,0 +1,41 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+
+namespace Azure.Storage.Test.Shared;
+
+///
+/// Class for managing IDisposable objects to determine if an
+/// arbitrary concept is "in scope." Great for temporarily
+/// enabling a behavior.
+///
+public class ScopeManager
+{
+ private class Scope : IDisposable
+ {
+ private readonly ScopeManager _parent;
+
+ public Scope(ScopeManager parent)
+ {
+ _parent = parent;
+ }
+
+ public void Dispose()
+ {
+ _parent._scopes.Remove(this);
+ }
+ }
+
+ private readonly HashSet _scopes = new();
+
+ public bool InScope => _scopes.Count > 0;
+
+ public IDisposable GetScope()
+ {
+ Scope scope = new(this);
+ _scopes.Add(scope);
+ return scope;
+ }
+}
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs
index ade8ed94c82e..bd98dc5a1976 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs
@@ -39,7 +39,7 @@ internal JobBuilder(
ClientDiagnostics = clientDiagnostics;
}
- public async Task<(DataTransfer Transfer, TransferJobInternal TransferInternal)> BuildJobAsync(
+ public virtual async Task<(DataTransfer Transfer, TransferJobInternal TransferInternal)> BuildJobAsync(
StorageResource sourceResource,
StorageResource destinationResource,
DataTransferOptions transferOptions,
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs
index 0f9eb1b7f429..4d9463f6ccfa 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs
@@ -179,10 +179,11 @@ public override async Task ProcessPartToChunkAsync()
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
- long? fileLength = _sourceResource.Length;
+ long? fileLength = default;
StorageResourceItemProperties sourceProperties = default;
try
{
+ fileLength = _sourceResource.Length;
sourceProperties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false);
await _destinationResource.SetPermissionsAsync(
_sourceResource,
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs
index d7c1aafd0e77..5d845ee23273 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs
@@ -354,6 +354,9 @@ await _checkpointer.AddNewJobAsync(
destinationResource,
cancellationToken).ConfigureAwait(false);
+ // TODO: if the below fails for any reason, this job will still be in the checkpointer.
+ // That seems not desirable.
+
DataTransfer dataTransfer = await BuildAndAddTransferJobAsync(
sourceResource,
destinationResource,
diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/StorageResourceTestWrappers.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/StorageResourceTestWrappers.cs
new file mode 100644
index 000000000000..732697d8a409
--- /dev/null
+++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/StorageResourceTestWrappers.cs
@@ -0,0 +1,162 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Storage.Test.Shared;
+
+namespace Azure.Storage.DataMovement.Tests.Shared;
+
+public class InjectedFailureException : Exception { }
+
+///
+/// Wraps all calls to a with toggleable throwing.
+///
+public class StorageResourceItemFailureWrapper : StorageResourceItem
+{
+ private readonly ScopeManager _throwScopeManager;
+ private readonly StorageResourceItem _inner;
+
+ public StorageResourceItemFailureWrapper(StorageResourceItem inner, ScopeManager throwScopeManager = default)
+ {
+ _inner = inner;
+ _throwScopeManager = throwScopeManager ?? new();
+ }
+
+ public IDisposable ThrowScope() => _throwScopeManager.GetScope();
+
+ private T ThrowOr(T result) => _throwScopeManager.InScope
+ ? throw new InjectedFailureException()
+ : result;
+ private T ThrowOrDo(Func func) => _throwScopeManager.InScope
+ ? throw new InjectedFailureException()
+ : func();
+
+ #region Passthru
+ public override Uri Uri => ThrowOr(_inner.Uri);
+
+ public override string ProviderId => ThrowOr(_inner.ProviderId);
+
+ protected internal override string ResourceId => ThrowOr(_inner.ResourceId);
+
+ protected internal override DataTransferOrder TransferType => ThrowOr(_inner.TransferType);
+
+ protected internal override long MaxSupportedChunkSize => ThrowOr(_inner.MaxSupportedChunkSize);
+
+ protected internal override long? Length => ThrowOr(_inner.Length);
+
+ protected internal override Task CompleteTransferAsync(bool overwrite, StorageResourceCompleteTransferOptions completeTransferOptions = null, CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.CompleteTransferAsync(overwrite, completeTransferOptions, cancellationToken));
+
+ protected internal override Task CopyBlockFromUriAsync(StorageResourceItem sourceResource, HttpRange range, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.CopyBlockFromUriAsync(sourceResource, range, overwrite, completeLength, options, cancellationToken));
+
+ protected internal override Task CopyFromStreamAsync(Stream stream, long streamLength, bool overwrite, long completeLength, StorageResourceWriteToOffsetOptions options = null, CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.CopyFromStreamAsync(stream, streamLength, overwrite, completeLength, options, cancellationToken));
+
+ protected internal override Task CopyFromUriAsync(StorageResourceItem sourceResource, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.CopyFromUriAsync(sourceResource, overwrite, completeLength, options, cancellationToken));
+
+ protected internal override Task DeleteIfExistsAsync(CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.DeleteIfExistsAsync(cancellationToken));
+
+ protected internal override Task GetCopyAuthorizationHeaderAsync(CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.GetCopyAuthorizationHeaderAsync(cancellationToken));
+
+ protected internal override StorageResourceCheckpointData GetDestinationCheckpointData()
+ => ThrowOrDo(_inner.GetDestinationCheckpointData);
+
+ protected internal override Task GetPermissionsAsync(StorageResourceItemProperties properties = null, CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.GetPermissionsAsync(properties, cancellationToken));
+
+ protected internal override Task GetPropertiesAsync(CancellationToken token = default)
+ => ThrowOrDo(() => _inner.GetPropertiesAsync(token));
+
+ protected internal override StorageResourceCheckpointData GetSourceCheckpointData()
+ => ThrowOrDo(_inner.GetSourceCheckpointData);
+
+ protected internal override Task ReadStreamAsync(long position = 0, long? length = null, CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.ReadStreamAsync(position, length, cancellationToken));
+
+ protected internal override Task SetPermissionsAsync(StorageResourceItem sourceResource, StorageResourceItemProperties sourceProperties, CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.SetPermissionsAsync(sourceResource, sourceProperties, cancellationToken));
+ #endregion
+}
+
+///
+/// Wraps all calls to a with toggleable throwing.
+///
+public class StorageResourceContainerFailureWrapper : StorageResourceContainer
+{
+ private readonly ScopeManager _throwScopeManager;
+ private readonly StorageResourceContainer _inner;
+
+ public StorageResourceContainerFailureWrapper(StorageResourceContainer inner, ScopeManager throwScopeManager = default)
+ {
+ _inner = inner;
+ _throwScopeManager = throwScopeManager ?? new();
+ }
+
+ public IDisposable ThrowScope() => _throwScopeManager.GetScope();
+
+ private T ThrowOr(T result) => _throwScopeManager.InScope
+ ? throw new InjectedFailureException()
+ : result;
+ private T ThrowOrDo(Func func) => _throwScopeManager.InScope
+ ? throw new InjectedFailureException()
+ : func();
+
+ protected internal override StorageResourceItem GetStorageResourceReference(string path, string resourceId)
+ => ThrowOrDo(() => new StorageResourceItemFailureWrapper(_inner.GetStorageResourceReference(path, resourceId), _throwScopeManager));
+
+ protected internal override StorageResourceContainer GetChildStorageResourceContainer(string path)
+ => ThrowOrDo(() => new StorageResourceContainerFailureWrapper(_inner.GetChildStorageResourceContainer(path), _throwScopeManager));
+
+ protected internal override async IAsyncEnumerable GetStorageResourcesAsync(StorageResourceContainer destinationContainer = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ if (_throwScopeManager.InScope)
+ {
+ throw new InjectedFailureException();
+ }
+
+ await foreach (StorageResource resource in _inner.GetStorageResourcesAsync(destinationContainer, cancellationToken))
+ {
+ if (resource is StorageResourceItem item)
+ {
+ yield return new StorageResourceItemFailureWrapper(item, _throwScopeManager);
+ }
+ else if (resource is StorageResourceContainer container)
+ {
+ yield return new StorageResourceContainerFailureWrapper(container, _throwScopeManager);
+ }
+ else
+ {
+ yield return resource;
+ }
+ }
+ }
+
+ public override string ToString()
+ {
+ return base.ToString();
+ }
+
+ #region Passthru
+ public override Uri Uri => ThrowOr(_inner.Uri);
+
+ public override string ProviderId => ThrowOr(_inner.ProviderId);
+
+ protected internal override Task CreateIfNotExistsAsync(CancellationToken cancellationToken = default)
+ => ThrowOrDo(() => _inner.CreateIfNotExistsAsync(cancellationToken));
+
+ protected internal override StorageResourceCheckpointData GetDestinationCheckpointData()
+ => ThrowOrDo(_inner.GetDestinationCheckpointData);
+
+ protected internal override StorageResourceCheckpointData GetSourceCheckpointData()
+ => ThrowOrDo(_inner.GetSourceCheckpointData);
+ #endregion
+}
diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs
index 3a9cf04c919d..c60d2ed4c5c8 100644
--- a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs
@@ -11,6 +11,7 @@
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Pipeline;
+using Azure.Storage.DataMovement.Tests.Shared;
using Moq;
using NUnit.Framework;
@@ -18,6 +19,35 @@ namespace Azure.Storage.DataMovement.Tests;
public class TransferManagerTests
{
+ public static IEnumerable AllTransferDirections()
+ => Enum.GetValues(typeof(TransferDirection)).Cast();
+
+ private static (StepProcessor JobProcessor, StepProcessor PartProcessor, StepProcessor> ChunkProcessor) StepProcessors()
+ => (new(), new(), new());
+
+ private static (StorageResource Source, StorageResource Destination, Func SrcThrowScope, Func DstThrowScope)
+ GetBasicSetupResources(bool isContainer, Uri srcUri, Uri dstUri)
+ {
+ if (isContainer)
+ {
+ Mock srcContainer = new(MockBehavior.Strict);
+ Mock dstContainer = new(MockBehavior.Strict);
+ (srcContainer, dstContainer).BasicSetup(srcUri, dstUri);
+ StorageResourceContainerFailureWrapper srcWrapper = new(srcContainer.Object);
+ StorageResourceContainerFailureWrapper dstWrapper = new(dstContainer.Object);
+ return (srcWrapper, dstWrapper, srcWrapper.ThrowScope, dstWrapper.ThrowScope);
+ }
+ else
+ {
+ Mock srcItem = new(MockBehavior.Strict);
+ Mock dstItem = new(MockBehavior.Strict);
+ (srcItem, dstItem).BasicSetup(srcUri, dstUri);
+ StorageResourceItemFailureWrapper srcWrapper = new(srcItem.Object);
+ StorageResourceItemFailureWrapper dstWrapper = new(dstItem.Object);
+ return (srcWrapper, dstWrapper, srcWrapper.ThrowScope, dstWrapper.ThrowScope);
+ }
+ }
+
[Test]
public async Task BasicProcessorLifetime()
{
@@ -63,10 +93,8 @@ public async Task BasicItemTransfer(
Uri srcUri = new("file:///foo/bar");
Uri dstUri = new("https://example.com/fizz/buzz");
- StepProcessor jobsProcessor = new();
- StepProcessor partsProcessor = new();
- StepProcessor> chunksProcessor = new();
- Mock jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default));
+ (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors();
+ JobBuilder jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default));
Mock checkpointer = new();
var resources = Enumerable.Range(0, items).Select(_ =>
@@ -83,7 +111,7 @@ public async Task BasicItemTransfer(
jobsProcessor,
partsProcessor,
chunksProcessor,
- jobBuilder.Object,
+ jobBuilder,
checkpointer.Object,
default);
@@ -171,10 +199,8 @@ public async Task BasicContainerTransfer(
Uri srcUri = new("file:///foo/bar");
Uri dstUri = new("https://example.com/fizz/buzz");
- StepProcessor jobsProcessor = new();
- StepProcessor partsProcessor = new();
- StepProcessor> chunksProcessor = new();
- Mock jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default));
+ (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors();
+ JobBuilder jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default));
Mock checkpointer = new();
var resources = Enumerable.Range(1, numJobs).Select(i =>
@@ -189,7 +215,7 @@ public async Task BasicContainerTransfer(
jobsProcessor,
partsProcessor,
chunksProcessor,
- jobBuilder.Object,
+ jobBuilder,
checkpointer.Object,
default);
@@ -254,6 +280,156 @@ public async Task BasicContainerTransfer(
Assert.That(transfer.HasCompleted);
}
}
+
+ [Test]
+ [Combinatorial]
+ public async Task TransferFailAtQueue(
+ [Values(0, 1)] int failAt,
+ [Values(true, false)] bool isContainer)
+ {
+ Uri srcUri = new("file:///foo/bar");
+ Uri dstUri = new("https://example.com/fizz/buzz");
+
+ (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors();
+ Mock jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default))
+ {
+ CallBase = true,
+ };
+ Mock checkpointer = new();
+
+ (StorageResource srcResource, StorageResource dstResource, Func srcThrowScope, Func dstThrowScope)
+ = GetBasicSetupResources(isContainer, srcUri, dstUri);
+
+ Exception expectedException = new();
+ switch (failAt)
+ {
+ case 0:
+ jobBuilder.Setup(b => b.BuildJobAsync(It.IsAny(), It.IsAny(),
+ It.IsAny(), It.IsAny(), It.IsAny(),
+ It.IsAny(), It.IsAny())
+ ).Throws(expectedException);
+ break;
+ case 1:
+ checkpointer.Setup(c => c.AddNewJobAsync(It.IsAny(), It.IsAny(),
+ It.IsAny(), It.IsAny())
+ ).Throws(expectedException);
+ break;
+ }
+
+ await using TransferManager transferManager = new(
+ jobsProcessor,
+ partsProcessor,
+ chunksProcessor,
+ jobBuilder.Object,
+ checkpointer.Object,
+ default);
+
+ DataTransfer transfer = null;
+
+ Assert.That(async () => transfer = await transferManager.StartTransferAsync(
+ srcResource,
+ dstResource), Throws.Exception.EqualTo(expectedException));
+
+ Assert.That(transfer, Is.Null);
+
+ // TODO determine if checkpointer still has the job tracked even though it failed to queue (it shouldn't)
+ // need checkpointer API refactor for this
+ }
+
+ [Test]
+ public async Task TransferFailAtJobProcess(
+ [Values(true, false)] bool isContainer,
+ [ValueSource(nameof(AllTransferDirections))] TransferDirection direction)
+ {
+ Uri srcUri = new(direction == TransferDirection.Upload ? "file:///foo/bar" : "https://example.com/foo/bar");
+ Uri dstUri = new(direction == TransferDirection.Download ? "file:///fizz/buzz" : "https://example.com/fizz/buzz");
+
+ (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors();
+ JobBuilder jobBuilder = new(ArrayPool.Shared, default, new(ClientOptions.Default));
+ Mock checkpointer = new(MockBehavior.Loose);
+
+ (StorageResource srcResource, StorageResource dstResource, Func srcThrowScope, Func dstThrowScope)
+ = GetBasicSetupResources(isContainer, srcUri, dstUri);
+
+ await using TransferManager transferManager = new(
+ jobsProcessor,
+ partsProcessor,
+ chunksProcessor,
+ jobBuilder,
+ checkpointer.Object,
+ default);
+
+ //Exception expectedException = new();
+ //checkpointer.Setup(c => c.AddNewJobPartAsync(It.IsAny(), It.IsAny(), It.IsAny(),
+ // It.IsAny())
+ //).Throws(expectedException);
+
+ // need to listen to events to get exception that takes place in processing
+ List failures = new();
+ DataTransferOptions options = new();
+ options.ItemTransferFailed += e => { failures.Add(e); return Task.CompletedTask; };
+
+ DataTransfer transfer = await transferManager.StartTransferAsync(srcResource, dstResource);
+
+ using (srcThrowScope())
+ {
+ Assert.That(await jobsProcessor.TryStepAsync(), Is.True);
+ }
+ Assert.That(jobsProcessor.ItemsInQueue, Is.Zero);
+ Assert.That(partsProcessor.ItemsInQueue, Is.Zero); // because of failure
+ // TODO Failures in processing job into job part(s) should surface errors (currently doesn't)
+ // Assert.That(transfer.TransferStatus.HasFailedItems);
+ // Assert.That(failures, Is.Not.Empty);
+ // TODO determine checkpointer status of job parts
+ // need checkpointer API refactor for this
+ }
+
+ [Test]
+ public async Task TransferFailAtPartProcess(
+ [Values(true, false)] bool isContainer,
+ [ValueSource(nameof(AllTransferDirections))] TransferDirection direction)
+ {
+ Uri srcUri = new(direction == TransferDirection.Upload ? "file:///foo/bar" : "https://example.com/foo/bar");
+ Uri dstUri = new(direction == TransferDirection.Download ? "file:///fizz/buzz" : "https://example.com/fizz/buzz");
+
+ (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors();
+ JobBuilder jobBuilder = new(ArrayPool.Shared, default, new(ClientOptions.Default));
+ Mock checkpointer = new(MockBehavior.Loose);
+
+ (StorageResource srcResource, StorageResource dstResource, Func srcThrowScope, Func dstThrowScope)
+ = GetBasicSetupResources(isContainer, srcUri, dstUri);
+
+ await using TransferManager transferManager = new(
+ jobsProcessor,
+ partsProcessor,
+ chunksProcessor,
+ jobBuilder,
+ checkpointer.Object,
+ default);
+
+ // need to listen to events to get exception that takes place in processing
+ List failures = new();
+ DataTransferOptions options = new();
+ options.ItemTransferFailed += e => { failures.Add(e); return Task.CompletedTask; };
+
+ DataTransfer transfer = await transferManager.StartTransferAsync(srcResource, dstResource, options);
+
+ Assert.That(await jobsProcessor.TryStepAsync(), Is.True);
+ Assert.That(jobsProcessor.ItemsInQueue, Is.Zero);
+ Assert.That(partsProcessor.ItemsInQueue, Is.AtLeast(1));
+
+ using (srcThrowScope())
+ {
+ Assert.That(await partsProcessor.StepAll(), Is.AtLeast(1));
+ }
+ Assert.That(partsProcessor.ItemsInQueue, Is.Zero);
+ Assert.That(chunksProcessor.ItemsInQueue, Is.Zero); // because of failure
+
+ Assert.That(transfer.TransferStatus.HasFailedItems);
+ Assert.That(failures, Is.Not.Empty);
+ // TODO determine checkpointer status of job chunks
+ // need checkpointer API refactor for this
+ }
}
internal static partial class MockExtensions
@@ -281,6 +457,8 @@ public static void BasicSetup(
items.Source.SetupGet(r => r.ResourceId).Returns("Mock");
items.Destination.SetupGet(r => r.ResourceId).Returns("Mock");
+ items.Source.SetupGet(r => r.Length).Returns(itemSize);
+
items.Destination.SetupGet(r => r.TransferType).Returns(default(DataTransferOrder));
items.Destination.SetupGet(r => r.MaxSupportedChunkSize).Returns(Constants.GB);