Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add more test coverage for splits/merge and request charge
  • Loading branch information
neildsh committed May 16, 2024
commit 7698978f8514d07b718d0f9709fba6a2a6c0e80c
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,15 @@ private static async Task RunParityTests(
{
foreach (int pageSize in testCase.PageSizes)
{
IReadOnlyList<CosmosElement> nonStreamingResult = await CreateAndRunPipelineStage(
(IReadOnlyList<CosmosElement> nonStreamingResult, double nonStreamingCharge) = await CreateAndRunPipelineStage(
documentContainer: nonStreamingDocumentContainer,
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize,
nonStreamingOrderBy: true);

IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
(IReadOnlyList<CosmosElement> streamingResult, double streamingCharge) = await CreateAndRunPipelineStage(
documentContainer: documentContainer,
ranges: ranges,
queryText: testCase.QueryText,
Expand All @@ -248,11 +248,17 @@ private static async Task RunParityTests(
{
Assert.Fail($"Could not validate result for query:\n{testCase.QueryText}\npageSize: {pageSize}");
}

if (Math.Abs(streamingCharge - nonStreamingCharge) > 0.0001)
{
Assert.Fail($"Request charge mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}" +
$"\nStreaming request charge: {streamingCharge} NonStreaming request charge: {nonStreamingCharge}");
}
}
}
}

private static async Task<IReadOnlyList<CosmosElement>> CreateAndRunPipelineStage(
private static async Task<(IReadOnlyList<CosmosElement>, double)> CreateAndRunPipelineStage(
IDocumentContainer documentContainer,
IReadOnlyList<FeedRangeEpk> ranges,
string queryText,
Expand All @@ -273,6 +279,7 @@ private static async Task<IReadOnlyList<CosmosElement>> CreateAndRunPipelineStag

Assert.IsTrue(pipelineStage.Succeeded);

double totalRequestCharge = 0;
IQueryPipelineStage stage = pipelineStage.Result;
List<CosmosElement> documents = new List<CosmosElement>();
while (await stage.MoveNextAsync(NoOpTrace.Singleton, default))
Expand All @@ -281,9 +288,10 @@ private static async Task<IReadOnlyList<CosmosElement>> CreateAndRunPipelineStag
Assert.IsTrue(stage.Current.Result.Documents.Count <= pageSize);
DebugTraceHelpers.TracePipelineStagePage(stage.Current.Result);
documents.AddRange(stage.Current.Result.Documents);
totalRequestCharge += stage.Current.Result.RequestCharge;
}

return documents;
return (documents, totalRequestCharge);
}

private static async Task RunParityTests(IReadOnlyList<ParityTestCase> testCases)
Expand All @@ -300,17 +308,17 @@ private static async Task RunParityTests(IReadOnlyList<ParityTestCase> testCases
new FeedRangeEpk(new Documents.Routing.Range<string>("EE", "FF", true, false)),
};

IDocumentContainer nonStreamingDocumentContainer = MockDocumentContainer.Create(ranges, testCase.FeedMode, testCase.DocumentCreationMode);
MockDocumentContainer nonStreamingDocumentContainer = MockDocumentContainer.Create(ranges, testCase.FeedMode, testCase.DocumentCreationMode);

IDocumentContainer streamingDocumentContainer = MockDocumentContainer.Create(
MockDocumentContainer streamingDocumentContainer = MockDocumentContainer.Create(
ranges,
testCase.FeedMode & PartitionedFeedMode.StreamingReversed,
testCase.DocumentCreationMode);

foreach (int pageSize in testCase.PageSizes)
{
DebugTraceHelpers.TraceNonStreamingPipelineStarting();
IReadOnlyList<CosmosElement> nonStreamingResult = await CreateAndRunPipelineStage(
(IReadOnlyList<CosmosElement> nonStreamingResult, double nonStreamingCharge) = await CreateAndRunPipelineStage(
documentContainer: nonStreamingDocumentContainer,
ranges: ranges,
queryText: testCase.QueryText,
Expand All @@ -319,7 +327,7 @@ private static async Task RunParityTests(IReadOnlyList<ParityTestCase> testCases
nonStreamingOrderBy: true);

DebugTraceHelpers.TraceStreamingPipelineStarting();
IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
(IReadOnlyList<CosmosElement> streamingResult, double streamingCharge) = await CreateAndRunPipelineStage(
documentContainer: streamingDocumentContainer,
ranges: ranges,
queryText: testCase.QueryText,
Expand All @@ -331,6 +339,18 @@ private static async Task RunParityTests(IReadOnlyList<ParityTestCase> testCases
{
Assert.Fail($"Results mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}");
}

if (Math.Abs(streamingCharge - nonStreamingCharge) > 0.0001)
{
Assert.Fail($"Request charge mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}" +
$"\nStreaming request charge: {streamingCharge} NonStreaming request charge: {nonStreamingCharge}");
}

if (Math.Abs(nonStreamingCharge - nonStreamingDocumentContainer.TotalRequestCharge) > 0.0001)
{
Assert.Fail($"Request charge mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}" +
$"\nExpected: {nonStreamingDocumentContainer.TotalRequestCharge} Actual NonStreaming request charge: {nonStreamingCharge}");
}
}
}
}
Expand Down Expand Up @@ -693,21 +713,28 @@ private class MockDocumentContainer : IDocumentContainer

private readonly bool streaming;

public static IDocumentContainer Create(IReadOnlyList<FeedRangeEpk> feedRanges, PartitionedFeedMode feedMode, DocumentCreationMode documentCreationMode)
public double TotalRequestCharge { get; }

public static MockDocumentContainer Create(IReadOnlyList<FeedRangeEpk> feedRanges, PartitionedFeedMode feedMode, DocumentCreationMode documentCreationMode)
{
IReadOnlyDictionary<FeedRange, IReadOnlyList<IReadOnlyList<CosmosElement>>> pages = CreatePartitionedFeed(
feedRanges,
LeafPageCount,
PageSize,
feedMode,
(index) => CreateDocument(index, documentCreationMode));
return new MockDocumentContainer(pages, !feedMode.HasFlag(PartitionedFeedMode.NonStreaming));
double totalRequestCharge = feedRanges.Count * LeafPageCount * QueryCharge;
return new MockDocumentContainer(pages, !feedMode.HasFlag(PartitionedFeedMode.NonStreaming), totalRequestCharge);
}

private MockDocumentContainer(IReadOnlyDictionary<FeedRange, IReadOnlyList<IReadOnlyList<CosmosElement>>> pages, bool streaming)
private MockDocumentContainer(
IReadOnlyDictionary<FeedRange, IReadOnlyList<IReadOnlyList<CosmosElement>>> pages,
bool streaming,
double totalRequestCharge)
{
this.pages = pages ?? throw new ArgumentNullException(nameof(pages));
this.streaming = streaming;
this.streaming = streaming;
this.TotalRequestCharge = totalRequestCharge;
}

public Task<ChangeFeedPage> ChangeFeedAsync(FeedRangeState<ChangeFeedState> feedRangeState, ChangeFeedPaginationOptions changeFeedPaginationOptions, ITrace trace, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline
{
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void MonadicCreate_NullContinuationToken()
new OrderByColumn("_ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
Expand Down Expand Up @@ -369,7 +369,7 @@ public void MonadicCreate_OrderByWithResumeValues()
new OrderByColumn("item2", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
Expand Down Expand Up @@ -425,7 +425,7 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 1),
maxConcurrency: 0,
maxConcurrency: 0,
nonStreamingOrderBy: false,
continuationToken: CosmosElement.Parse(continuationToken));
Assert.IsTrue(monadicCreate.Succeeded);
Expand All @@ -440,7 +440,9 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy
}

[TestMethod]
public async Task TestDrainFully_StartFromBeginingAsync_NoDocuments()
[DataRow(false, DisplayName = "NonStreaming: false")]
[DataRow(true, DisplayName = "NonStreaming: true")]
public async Task TestDrainFully_StartFromBeginingAsync_NoDocuments(bool nonStreamingOrderBy)
{
int numItems = 0;
IDocumentContainer documentContainer = await CreateDocumentContainerAsync(numItems);
Expand All @@ -461,8 +463,8 @@ FROM c
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
maxConcurrency: 10,
nonStreamingOrderBy: nonStreamingOrderBy,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
Expand All @@ -479,9 +481,8 @@ FROM c
QueryPage queryPage = tryGetQueryPage.Result;
documents.AddRange(queryPage.Documents);

if (queryPage.RequestCharge > 0)
if (!nonStreamingOrderBy)
{
// some empty pages may be emitted
Assert.AreEqual(42, queryPage.RequestCharge);
}
}
Expand Down Expand Up @@ -511,12 +512,12 @@ FROM c
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;

int countAdditionalHeadersReceived = 0;
while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton, cancellationToken: default))
{
Expand All @@ -526,32 +527,36 @@ FROM c
Assert.Fail(tryGetQueryPage.Exception.ToString());
}

QueryPage queryPage = tryGetQueryPage.Result;
if (queryPage.AdditionalHeaders.Count > 0)
{
++countAdditionalHeadersReceived;
QueryPage queryPage = tryGetQueryPage.Result;
if (queryPage.AdditionalHeaders.Count > 0)
{
++countAdditionalHeadersReceived;
}
}
int countFeedRanges = (await documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default))
.Count;
}

int countFeedRanges = (await documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default))
.Count;
Assert.IsTrue(countAdditionalHeadersReceived >= countFeedRanges);
}
}

[TestMethod]
[DataRow(false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")]
[DataRow(false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")]
[DataRow(false, true, false, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: false")]
[DataRow(false, true, true, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: true")]
[DataRow(true, false, false, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: false")]
[DataRow(true, false, true, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: true")]
[DataRow(true, true, false, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: false")]
[DataRow(true, true, true, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: true")]
public async Task TestDrainWithStateSplitsAndMergeAsync(bool useState, bool allowSplits, bool allowMerges)
[DataRow(false, false, false, false, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: false, Allow Merges: false")]
[DataRow(false, false, false, true, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: false, Allow Merges: true")]
[DataRow(false, false, true, false, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: true, Allow Merges: false")]
[DataRow(false, false, true, true, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: true, Allow Merges: true")]
[DataRow(false, true, false, false, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: false, Allow Merges: false")]
[DataRow(false, true, false, true, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: false, Allow Merges: true")]
[DataRow(false, true, true, false, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: true, Allow Merges: false")]
[DataRow(false, true, true, true, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: true, Allow Merges: true")]
[DataRow(true, false, false, false, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: false, Allow Merges: false")]
[DataRow(true, false, false, true, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: false, Allow Merges: true")]
[DataRow(true, false, true, false, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: true, Allow Merges: false")]
[DataRow(true, false, true, true, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: true, Allow Merges: true")]
public async Task TestDrainWithStateSplitsAndMergeAsync(bool nonStreamingOrderBy, bool useState, bool allowSplits, bool allowMerges)
{
static async Task<IQueryPipelineStage> CreatePipelineStateAsync(IDocumentContainer documentContainer, CosmosElement continuationToken)
static async Task<IQueryPipelineStage> CreatePipelineStateAsync(IDocumentContainer documentContainer, CosmosElement continuationToken, bool nonStreamingOrderBy)
{
TryCatch<IQueryPipelineStage> monadicQueryPipelineStage = OrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
Expand All @@ -569,18 +574,19 @@ FROM c
new OrderByColumn("c.pk", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
maxConcurrency: 10,
nonStreamingOrderBy: nonStreamingOrderBy,
continuationToken: continuationToken);
monadicQueryPipelineStage.ThrowIfFailed();
IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result;

return queryPipelineStage;
}

bool verbose = false;
int numItems = 1000;
IDocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems);
IQueryPipelineStage queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, continuationToken: null);
IQueryPipelineStage queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, continuationToken: null, nonStreamingOrderBy);
List<CosmosElement> documents = new List<CosmosElement>();
Random random = new Random();
while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton, cancellationToken: default))
Expand Down Expand Up @@ -615,7 +621,7 @@ FROM c
break;
}

queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, queryState.Value);
queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, queryState.Value, nonStreamingOrderBy);
}

if (random.Next() % 2 == 0)
Expand All @@ -629,6 +635,11 @@ FROM c
cancellationToken: default);
FeedRangeInternal randomRangeToSplit = ranges[random.Next(0, ranges.Count)];
await inMemoryCollection.SplitAsync(randomRangeToSplit, cancellationToken: default);

if (verbose)
{
System.Diagnostics.Trace.WriteLine($"Split range: {randomRangeToSplit.ToJsonString()}");
}
}

if (allowMerges && (random.Next() % 2 == 0))
Expand All @@ -645,6 +656,12 @@ FROM c
int adjacentIndex = indexToMerge == (ranges.Count - 1) ? indexToMerge - 1 : indexToMerge + 1;
await inMemoryCollection.MergeAsync(ranges[indexToMerge], ranges[adjacentIndex], cancellationToken: default);
}

if (verbose)
{
string mergedRanges = string.Join(", ", ranges.Select(range => range.ToJsonString()));
System.Diagnostics.Trace.WriteLine($"Merged ranges: {mergedRanges}");
}
}
}
}
Expand Down