Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private static async Task<IQueue<PartitionRangePageAsyncEnumerator<TPage, TState
rangeAndStates,
token);

if (maxConcurrency.HasValue)
if (maxConcurrency.HasValue && maxConcurrency.Value > 1)
{
await ParallelPrefetch.PrefetchInParallelAsync(bufferedEnumerators, maxConcurrency.Value, trace, token);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1629,10 +1629,8 @@ public async Task ItemMultiplePartitionQuery()

QueryRequestOptions requestOptions = new QueryRequestOptions()
{
MaxBufferedItemCount = 10,
ResponseContinuationTokenLimitInKb = 500,
MaxItemCount = 1,
MaxConcurrency = 1,
MaxConcurrency = -1,
};

FeedIterator<ToDoActivity> feedIterator = this.Container.GetItemQueryIterator<ToDoActivity>(
Expand All @@ -1654,7 +1652,8 @@ public async Task ItemMultiplePartitionQuery()
ServerSideCumulativeMetrics metrics = iter.Diagnostics.GetQueryMetrics();

if (metrics != null)
{
{
// This assumes that we are using parallel prefetch to hit multiple partitions concurrently
Assert.IsTrue(metrics.PartitionedMetrics.Count == 3);
Assert.IsTrue(metrics.CumulativeMetrics.TotalTime > TimeSpan.Zero);
Assert.IsTrue(metrics.CumulativeMetrics.QueryPreparationTime > TimeSpan.Zero);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Pagination
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
Expand Down Expand Up @@ -161,6 +162,104 @@ PartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState> createEnumerator(
Assert.AreEqual(1, createdEnumerators[2].GetNextPageAsyncCounter, "Second enumerator should not be used");
}

[TestMethod]
public async Task TestParallelPrefetch()
{
List<FeedRangeEpk> feedRanges = new List<FeedRangeEpk>()
{
new FeedRangeEpk(new Documents.Routing.Range<string>( "", "AA", isMinInclusive: true, isMaxInclusive: false)),
new FeedRangeEpk(new Documents.Routing.Range<string>("AA", "BB", isMinInclusive: true, isMaxInclusive: false)),
new FeedRangeEpk(new Documents.Routing.Range<string>("BB", "CC", isMinInclusive: true, isMaxInclusive: false)),
new FeedRangeEpk(new Documents.Routing.Range<string>("CC", "DD", isMinInclusive: true, isMaxInclusive: false)),
new FeedRangeEpk(new Documents.Routing.Range<string>("DD", "EE", isMinInclusive: true, isMaxInclusive: false)),
new FeedRangeEpk(new Documents.Routing.Range<string>("EE", "FF", isMinInclusive: true, isMaxInclusive: false)),
};

Mock<IFeedRangeProvider> mockFeedRangeProvider = new Mock<IFeedRangeProvider>();
mockFeedRangeProvider.Setup(p => p.GetFeedRangesAsync(
It.IsAny<ITrace>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(feedRanges);

foreach (int maxConcurrency in new[] { 0, 1, 2, 10, 100 })
{
List<MockEnumerator> rangeEnumerators = feedRanges
.Select(feedRange => new MockEnumerator(new FeedRangeState<ReadFeedState>(feedRange, null), 1))
.ToList();

IEnumerator<MockEnumerator> enumerator = rangeEnumerators.GetEnumerator();

MockEnumerator CreateMockEnumerator(FeedRangeState<ReadFeedState> feedRangeState)
{
Assert.IsTrue(enumerator.MoveNext());
return enumerator.Current;
};

CrossPartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState> crossPartitionEnumerator = new CrossPartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState>(
feedRangeProvider: mockFeedRangeProvider.Object,
createPartitionRangeEnumerator: CreateMockEnumerator,
comparer: null,
prefetchPolicy: PrefetchPolicy.PrefetchSinglePage,
maxConcurrency: maxConcurrency,
state: null);

await crossPartitionEnumerator.MoveNextAsync(NoOpTrace.Singleton, cancellationToken: default);

if (maxConcurrency <= 1)
{
Assert.AreEqual(1, rangeEnumerators.First().InvocationCount);
Assert.IsTrue(rangeEnumerators.Skip(1).All(x => x.InvocationCount == 0));
}
else
{
Assert.IsTrue(rangeEnumerators.All(x => x.InvocationCount == 1));
}
}
}

private class MockEnumerator : PartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState>
{
private static readonly IReadOnlyDictionary<string, string> EmptyHeaders = new Dictionary<string, string>();

private static readonly Stream EmptyStream = new MemoryStream(Encoding.UTF8.GetBytes("{\"Documents\": [], \"_count\": 0, \"_rid\": \"asdf\"}"));

private readonly int pageCount;

public int InvocationCount { get; private set; }

public MockEnumerator(FeedRangeState<ReadFeedState> feedRangeState, int pageCount)
: base(feedRangeState)
{
this.pageCount = pageCount;
}

public override ValueTask DisposeAsync()
{
return default;
}

protected override Task<TryCatch<ReadFeedPage>> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken)
{
if (this.InvocationCount >= this.pageCount)
{
return Task.FromResult(TryCatch<ReadFeedPage>.FromException(new InvalidOperationException(
"Trying to move next on an enumerator that is finished")));
}

++this.InvocationCount;

ReadFeedState state = null;
if (this.InvocationCount < this.pageCount)
{
CosmosElement continuationToken = CosmosString.Create("asdf");
state = new ReadFeedContinuationState(continuationToken);
}

return Task.FromResult(TryCatch<ReadFeedPage>.FromResult(
new ReadFeedPage(EmptyStream, 2.8, 0, "activityId", EmptyHeaders, state)));
}
}

private class EnumeratorThatSplits : PartitionRangePageAsyncEnumerator<ReadFeedPage,ReadFeedState>
{
private readonly bool throwError;
Expand Down