Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ await QueueChunk(
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
Interlocked.Increment(ref _completedChunkCount);
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
Expand All @@ -234,9 +234,9 @@ await QueueChunk(
}

/// <summary>
/// Processes the job to job parts
/// Processes the job part to chunks
/// </summary>
/// <returns>An IEnumerable that contains the job chunks</returns>
/// <returns>The task that's queueing up the chunks</returns>
public abstract Task ProcessPartToChunkAsync();

/// <summary>
Expand Down Expand Up @@ -285,11 +285,11 @@ internal async Task OnTransferStateChangedAsync(DataTransferState transferState)
else if (JobPartStatus.HasCompletedSuccessfully)
{
_progressTracker.IncrementCompletedFiles();
await InvokeSingleCompletedArg().ConfigureAwait(false);
await InvokeSingleCompletedArgAsync().ConfigureAwait(false);
}

// Set the status in the checkpointer
await SetCheckpointerStatus().ConfigureAwait(false);
await SetCheckpointerStatusAsync().ConfigureAwait(false);

await PartTransferStatusEventHandler.RaiseAsync(
new TransferStatusEventArgs(
Expand All @@ -313,7 +313,7 @@ internal void ReportBytesWritten(long bytesTransferred)
_progressTracker.IncrementBytesTransferred(bytesTransferred);
}

public async virtual Task InvokeSingleCompletedArg()
public async virtual Task InvokeSingleCompletedArgAsync()
{
if (SingleTransferCompletedEventHandler != null)
{
Expand All @@ -334,7 +334,7 @@ await SingleTransferCompletedEventHandler.RaiseAsync(
/// <summary>
/// Invokes Skipped Argument Event.
/// </summary>
public async virtual Task InvokeSkippedArg()
public async virtual Task InvokeSkippedArgAsync()
{
if (TransferSkippedEventHandler != null)
{
Expand Down Expand Up @@ -375,7 +375,7 @@ await PartTransferStatusEventHandler.RaiseAsync(
/// <summary>
/// Invokes Failed Argument Event.
/// </summary>
public async virtual Task InvokeFailedArg(Exception ex)
public async virtual Task InvokeFailedArgAsync(Exception ex)
{
if (ex is not OperationCanceledException &&
ex is not TaskCanceledException &&
Expand Down Expand Up @@ -485,7 +485,7 @@ await _checkpointer.AddNewJobPartAsync(
}
}

internal async virtual Task SetCheckpointerStatus()
internal async virtual Task SetCheckpointerStatusAsync()
{
await _checkpointer.SetJobPartStatusAsync(
transferId: _dataTransfer.Id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint(
createPreference: createPreference);
}

/// <summary>
/// Processes the job part to chunks
/// </summary>
/// <returns>The task that's queueing up the chunks</returns>
public override async Task ProcessPartToChunkAsync()
{
try
Expand All @@ -195,7 +199,7 @@ await _destinationResource.SetPermissionsAsync(
fileLength = sourceProperties.ResourceLength;
if (!fileLength.HasValue)
{
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
await InvokeFailedArgAsync(Errors.UnableToGetLength()).ConfigureAwait(false);
return;
}
long length = fileLength.Value;
Expand Down Expand Up @@ -244,7 +248,7 @@ await QueueStageBlockRequest(
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand All @@ -269,17 +273,17 @@ await _destinationResource.CopyFromUriAsync(
when (_createMode == StorageResourceCreationPreference.SkipIfExists
&& exception.ErrorCode == "BlobAlreadyExists")
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (InvalidOperationException ex)
when (_createMode == StorageResourceCreationPreference.SkipIfExists
&& ex.Message.Contains("Cannot overwrite file."))
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -316,11 +320,11 @@ await _destinationResource.CopyBlockFromUriAsync(
when (_createMode == StorageResourceCreationPreference.SkipIfExists
&& exception.ErrorCode == "BlobAlreadyExists")
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
return false;
}
Expand Down Expand Up @@ -349,7 +353,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
QueueCommitBlockTask = jobPart.CompleteTransferAsync,
ReportProgressInBytes = jobPart.ReportBytesWritten,
InvokeFailedHandler = jobPart.InvokeFailedArg,
InvokeFailedHandler = jobPart.InvokeFailedArgAsync,
};
}
#endregion
Expand All @@ -372,7 +376,7 @@ await _destinationResource.CompleteTransferAsync(
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -456,11 +460,11 @@ await _commitBlockHandler.InvokeEvent(
// before uploading to it.
if (_createMode == StorageResourceCreationPreference.FailIfExists)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
else // (_createMode == StorageResourceCreateMode.Skip)
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
Expand All @@ -481,21 +485,21 @@ await _commitBlockHandler.InvokeEvent(
{
// If the _commitBlockHandler has been disposed before we call to it
// we should at least filter the exception to error handling just in case.
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}
}

public override async Task InvokeSkippedArg()
public override async Task InvokeSkippedArgAsync()
{
DisposeHandlers();
await base.InvokeSkippedArg().ConfigureAwait(false);
await base.InvokeSkippedArgAsync().ConfigureAwait(false);
}

public override async Task InvokeFailedArg(Exception ex)
public override async Task InvokeFailedArgAsync(Exception ex)
{
DisposeHandlers();
await base.InvokeFailedArg(ex).ConfigureAwait(false);
await base.InvokeFailedArgAsync(ex).ConfigureAwait(false);
}

internal void DisposeHandlers()
Expand Down
24 changes: 12 additions & 12 deletions sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public static StreamToUriJobPart CreateJobPartFromCheckpoint(
}

/// <summary>
/// Processes the job to job parts
/// Processes the job part to chunks
/// </summary>
/// <returns>The task that's queueing up the chunks</returns>
public override async Task ProcessPartToChunkAsync()
Expand Down Expand Up @@ -244,12 +244,12 @@ await QueueStageBlockRequest(
else
{
// TODO: logging when given the event handler
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
await InvokeFailedArgAsync(Errors.UnableToGetLength()).ConfigureAwait(false);
}
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand All @@ -275,16 +275,16 @@ await InitialUploadCall(
catch (RequestFailedException r)
when (r.ErrorCode == "BlobAlreadyExists" && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (InvalidOperationException i)
when (i.Message.Contains("Cannot overwrite file.") && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}

// Do not continue if we need to skip or there was an error.
Expand Down Expand Up @@ -381,7 +381,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
QueueCommitBlockTask = jobPart.CompleteTransferAsync,
ReportProgressInBytes = jobPart.ReportBytesWritten,
InvokeFailedHandler = jobPart.InvokeFailedArg,
InvokeFailedHandler = jobPart.InvokeFailedArgAsync,
};
}
#endregion
Expand Down Expand Up @@ -453,7 +453,7 @@ await _commitBlockHandler.InvokeEvent(
{
// If the _commitBlockHandler has been disposed before we call to it
// we should at least filter the exception to error handling just in case.
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}
}
Expand Down Expand Up @@ -555,16 +555,16 @@ private static async Task<Stream> GetOffsetPartitionInternal(
cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override async Task InvokeSkippedArg()
public override async Task InvokeSkippedArgAsync()
{
DisposeHandlers();
await base.InvokeSkippedArg().ConfigureAwait(false);
await base.InvokeSkippedArgAsync().ConfigureAwait(false);
}

public override async Task InvokeFailedArg(Exception ex)
public override async Task InvokeFailedArgAsync(Exception ex)
{
DisposeHandlers();
await base.InvokeFailedArg(ex).ConfigureAwait(false);
await base.InvokeFailedArgAsync(ex).ConfigureAwait(false);
}

internal void DisposeHandlers()
Expand Down
Loading