Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions sdk/storage/Azure.Storage.Common/tests/Shared/ScopeManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;

namespace Azure.Storage.Test.Shared;

/// <summary>
/// Class for managing IDisposable objects to determine if an
/// arbitrary concept is "in scope." Great for temporarily
/// enabling a behavior.
/// </summary>
public class ScopeManager
{
private class Scope : IDisposable
{
private readonly ScopeManager _parent;

public Scope(ScopeManager parent)
{
_parent = parent;
}

public void Dispose()
{
_parent._scopes.Remove(this);
}
}

private readonly HashSet<Scope> _scopes = new();

public bool InScope => _scopes.Count > 0;

public IDisposable GetScope()
{
Scope scope = new(this);
_scopes.Add(scope);
return scope;
}
}
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ internal JobBuilder(
ClientDiagnostics = clientDiagnostics;
}

public async Task<(DataTransfer Transfer, TransferJobInternal TransferInternal)> BuildJobAsync(
public virtual async Task<(DataTransfer Transfer, TransferJobInternal TransferInternal)> BuildJobAsync(
StorageResource sourceResource,
StorageResource destinationResource,
DataTransferOptions transferOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,11 @@ public override async Task ProcessPartToChunkAsync()
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);

long? fileLength = _sourceResource.Length;
long? fileLength = default;
StorageResourceItemProperties sourceProperties = default;
try
{
fileLength = _sourceResource.Length;
sourceProperties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false);
await _destinationResource.SetPermissionsAsync(
_sourceResource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ await _checkpointer.AddNewJobAsync(
destinationResource,
cancellationToken).ConfigureAwait(false);

// TODO: if the below fails for any reason, this job will still be in the checkpointer.
// That seems not desirable.

DataTransfer dataTransfer = await BuildAndAddTransferJobAsync(
sourceResource,
destinationResource,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Test.Shared;

namespace Azure.Storage.DataMovement.Tests.Shared;

public class InjectedFailureException : Exception { }

/// <summary>
/// Wraps all calls to a <see cref="StorageResourceItem"/> with toggleable throwing.
/// </summary>
public class StorageResourceItemFailureWrapper : StorageResourceItem
{
private readonly ScopeManager _throwScopeManager;
private readonly StorageResourceItem _inner;

public StorageResourceItemFailureWrapper(StorageResourceItem inner, ScopeManager throwScopeManager = default)
{
_inner = inner;
_throwScopeManager = throwScopeManager ?? new();
}

public IDisposable ThrowScope() => _throwScopeManager.GetScope();

private T ThrowOr<T>(T result) => _throwScopeManager.InScope
? throw new InjectedFailureException()
: result;
private T ThrowOrDo<T>(Func<T> func) => _throwScopeManager.InScope
? throw new InjectedFailureException()
: func();

#region Passthru
public override Uri Uri => ThrowOr(_inner.Uri);

public override string ProviderId => ThrowOr(_inner.ProviderId);

protected internal override string ResourceId => ThrowOr(_inner.ResourceId);

protected internal override DataTransferOrder TransferType => ThrowOr(_inner.TransferType);

protected internal override long MaxSupportedChunkSize => ThrowOr(_inner.MaxSupportedChunkSize);

protected internal override long? Length => ThrowOr(_inner.Length);

protected internal override Task CompleteTransferAsync(bool overwrite, StorageResourceCompleteTransferOptions completeTransferOptions = null, CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.CompleteTransferAsync(overwrite, completeTransferOptions, cancellationToken));

protected internal override Task CopyBlockFromUriAsync(StorageResourceItem sourceResource, HttpRange range, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.CopyBlockFromUriAsync(sourceResource, range, overwrite, completeLength, options, cancellationToken));

protected internal override Task CopyFromStreamAsync(Stream stream, long streamLength, bool overwrite, long completeLength, StorageResourceWriteToOffsetOptions options = null, CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.CopyFromStreamAsync(stream, streamLength, overwrite, completeLength, options, cancellationToken));

protected internal override Task CopyFromUriAsync(StorageResourceItem sourceResource, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.CopyFromUriAsync(sourceResource, overwrite, completeLength, options, cancellationToken));

protected internal override Task<bool> DeleteIfExistsAsync(CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.DeleteIfExistsAsync(cancellationToken));

protected internal override Task<HttpAuthorization> GetCopyAuthorizationHeaderAsync(CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.GetCopyAuthorizationHeaderAsync(cancellationToken));

protected internal override StorageResourceCheckpointData GetDestinationCheckpointData()
=> ThrowOrDo(_inner.GetDestinationCheckpointData);

protected internal override Task<string> GetPermissionsAsync(StorageResourceItemProperties properties = null, CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.GetPermissionsAsync(properties, cancellationToken));

protected internal override Task<StorageResourceItemProperties> GetPropertiesAsync(CancellationToken token = default)
=> ThrowOrDo(() => _inner.GetPropertiesAsync(token));

protected internal override StorageResourceCheckpointData GetSourceCheckpointData()
=> ThrowOrDo(_inner.GetSourceCheckpointData);

protected internal override Task<StorageResourceReadStreamResult> ReadStreamAsync(long position = 0, long? length = null, CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.ReadStreamAsync(position, length, cancellationToken));

protected internal override Task SetPermissionsAsync(StorageResourceItem sourceResource, StorageResourceItemProperties sourceProperties, CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.SetPermissionsAsync(sourceResource, sourceProperties, cancellationToken));
#endregion
}

/// <summary>
/// Wraps all calls to a <see cref="StorageResourceContainer"/> with toggleable throwing.
/// </summary>
public class StorageResourceContainerFailureWrapper : StorageResourceContainer
{
private readonly ScopeManager _throwScopeManager;
private readonly StorageResourceContainer _inner;

public StorageResourceContainerFailureWrapper(StorageResourceContainer inner, ScopeManager throwScopeManager = default)
{
_inner = inner;
_throwScopeManager = throwScopeManager ?? new();
}

public IDisposable ThrowScope() => _throwScopeManager.GetScope();

private T ThrowOr<T>(T result) => _throwScopeManager.InScope
? throw new InjectedFailureException()
: result;
private T ThrowOrDo<T>(Func<T> func) => _throwScopeManager.InScope
? throw new InjectedFailureException()
: func();

protected internal override StorageResourceItem GetStorageResourceReference(string path, string resourceId)
=> ThrowOrDo(() => new StorageResourceItemFailureWrapper(_inner.GetStorageResourceReference(path, resourceId), _throwScopeManager));

protected internal override StorageResourceContainer GetChildStorageResourceContainer(string path)
=> ThrowOrDo(() => new StorageResourceContainerFailureWrapper(_inner.GetChildStorageResourceContainer(path), _throwScopeManager));

protected internal override async IAsyncEnumerable<StorageResource> GetStorageResourcesAsync(StorageResourceContainer destinationContainer = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (_throwScopeManager.InScope)
{
throw new InjectedFailureException();
}

await foreach (StorageResource resource in _inner.GetStorageResourcesAsync(destinationContainer, cancellationToken))
{
if (resource is StorageResourceItem item)
{
yield return new StorageResourceItemFailureWrapper(item, _throwScopeManager);
}
else if (resource is StorageResourceContainer container)
{
yield return new StorageResourceContainerFailureWrapper(container, _throwScopeManager);
}
else
{
yield return resource;
}
}
}

public override string ToString()
{
return base.ToString();
}

#region Passthru
public override Uri Uri => ThrowOr(_inner.Uri);

public override string ProviderId => ThrowOr(_inner.ProviderId);

protected internal override Task CreateIfNotExistsAsync(CancellationToken cancellationToken = default)
=> ThrowOrDo(() => _inner.CreateIfNotExistsAsync(cancellationToken));

protected internal override StorageResourceCheckpointData GetDestinationCheckpointData()
=> ThrowOrDo(_inner.GetDestinationCheckpointData);

protected internal override StorageResourceCheckpointData GetSourceCheckpointData()
=> ThrowOrDo(_inner.GetSourceCheckpointData);
#endregion
}
Loading