Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2d90f0f
Initial prep for adding a non streaming order by query pipline stage
neildsh Mar 9, 2024
92a0625
Change TracingAsyncEnumerator to be an adapter class between ITracing…
neildsh Mar 11, 2024
0eff840
Move TracingAsyncEnumerator to product code
neildsh Mar 11, 2024
3ae0b62
Change ITracingAsyncEnumerator.MoveNext to take a CancellationToken, …
neildsh Mar 11, 2024
0ff0687
draft implementation of non streaming order by pipeline stage
neildsh Mar 14, 2024
b252123
Fix bug that drops first record on an initialized page
neildsh Mar 14, 2024
163f31f
Add a test class for non streaming order by unit tests
neildsh Mar 14, 2024
83e1726
remove unnecessary usings
neildsh Mar 14, 2024
bf2e7dd
Add an emulator test for non streaming order by. Also lay groundwork …
neildsh Mar 18, 2024
275b063
Remove the ResponseLengthInBytes property from QueryPage. This was be…
neildsh Mar 18, 2024
94e3125
Add an ItemCount property to the Page class
neildsh Mar 18, 2024
9eb7809
Add a multi level heap implementation for non streaming order by
neildsh Mar 20, 2024
ce69f9d
Add infrastructure for writing parity tests
neildsh Mar 20, 2024
702c7ad
Fix a bug that caused the page enumerator to be dropped when we reach…
neildsh Mar 20, 2024
cb874da
When cloning OrderByQueryPartitionRangePageAsyncEnumerator as a fully…
neildsh Mar 20, 2024
97d93d9
Simplify the non streaming pipeline stage, and add performance test
neildsh Mar 22, 2024
075cb46
Revert the changes for Headers.ItemCount Keep it as a string
neildsh Mar 22, 2024
1d02ee8
Avoid an allocation each time the OrderByItems property of OrderByQue…
neildsh Mar 22, 2024
5f464d0
Fix up the OrderByPipelineSatgeBenchmark to use fully materialized Co…
neildsh Mar 22, 2024
8089fdf
Add a few more unit tests for non streaming order by
neildsh Mar 22, 2024
6e5a807
Add emulator tests for non streaming order by
neildsh Mar 25, 2024
82bde9b
Add a few more integration test cases
neildsh Mar 26, 2024
be3fb81
Fix up broken unit test
neildsh Mar 26, 2024
c54e89b
Add more test coverage for the non streaming order by
neildsh Mar 27, 2024
8babdb7
If there is no continuationtoken, assume that the response is streaming
neildsh Mar 27, 2024
784295a
Add stronger validation to the non streaming order by unit tests
neildsh Mar 27, 2024
98218bb
Fix up broken unit tests to account for ItemCount
neildsh Mar 27, 2024
6b66ad2
fix up plumbing for index utilization, and incorporate code review fe…
neildsh Mar 27, 2024
6abe501
Minor clean up
neildsh Mar 27, 2024
cf6f04b
revert bug introduced in pursuit of more elegant code :)
neildsh Mar 28, 2024
3065ed1
fix up broken unit test
neildsh Mar 28, 2024
b966a8a
Fix up broken perf test
neildsh Mar 28, 2024
3661c36
Fix up broken IndexMetricsParserBaselineTest
neildsh Mar 29, 2024
74f3eff
Minor bug fixes for OrderByCrossPartitionEnumerator
neildsh Mar 29, 2024
05befa6
Merge branch 'master' into users/ndeshpan/nonStreamingOrderBy
neildsh Mar 29, 2024
2942dd1
Merge branch 'master' into users/ndeshpan/nonStreamingOrderBy
neildsh Mar 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add an ItemCount property to the Page class
  • Loading branch information
neildsh committed Mar 28, 2024
commit 94e3125edc4c70910ef13e80a8b8caab309fb194
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ internal sealed class ChangeFeedNotModifiedPage : ChangeFeedPage
private static readonly ImmutableHashSet<string> bannedHeaders = new HashSet<string>().ToImmutableHashSet();

public ChangeFeedNotModifiedPage(
double requestCharge,
double requestCharge,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, additionalHeaders, state)
{
}

}

public override int ItemCount => 0;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ internal sealed class ChangeFeedSuccessPage : ChangeFeedPage

public ChangeFeedSuccessPage(
Stream content,
double requestCharge,
double requestCharge,
int itemCount,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, additionalHeaders, state)
{
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.ItemCount = itemCount;
}

public Stream Content { get; }

public Stream Content { get; }

public override int ItemCount { get; }

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,16 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
{
backendPage = new ChangeFeedSuccessPage(
changeFeedSuccessPage.Content,
totalRequestCharge,
totalRequestCharge,
changeFeedSuccessPage.ItemCount,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
Expand Down
9 changes: 8 additions & 1 deletion Microsoft.Azure.Cosmos/src/Headers/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,14 @@ internal virtual string EndEpk
set => this.CosmosMessageHeaders.EndEpk = value;
}

internal virtual string ItemCount => this.CosmosMessageHeaders.Get(HttpConstants.HttpHeaders.ItemCount);
internal virtual int ItemCount
{
get
{
string itemCountText = this.CosmosMessageHeaders.Get(HttpConstants.HttpHeaders.ItemCount);
return int.Parse(itemCountText, NumberStyles.Integer, CultureInfo.InvariantCulture);
}
}

/// <summary>
/// Creates a new instance of <see cref="Headers"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ public override Exception BufferedException

return null;
}
}
}

public override int BufferedItemCount => this.bufferedPage.HasValue && this.bufferedPage.Value.Succeeded ?
this.bufferedPage.Value.Result.ItemCount :
0;

public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerator<TPage, TState> enumerator)
: base(enumerator.FeedRangeState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ protected BufferedPartitionRangePageAsyncEnumeratorBase(FeedRangeState<TState> f
{
}

public abstract Exception BufferedException { get; }
public abstract Exception BufferedException { get; }

public abstract int BufferedItemCount { get; }

public abstract ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken);
}
Expand Down
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/Pagination/CrossFeedRangePage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ public CrossFeedRangePage(TBackendPage backendEndPage, CrossFeedRangeState<TBack
this.Page = backendEndPage;
}

public TBackendPage Page { get; }

public TBackendPage Page { get; }

public override int ItemCount => this.Page.ItemCount;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ internal sealed class FullyBufferedPartitionRangeAsyncEnumerator<TPage, TState>
{
private readonly PartitionRangePageAsyncEnumerator<TPage, TState> enumerator;
private readonly List<TPage> bufferedPages;
private int currentIndex;
private int currentIndex;
private int bufferedItemCount;
private Exception exception;

private bool hasPrefetched;

public override Exception BufferedException => this.exception;
public override Exception BufferedException => this.exception;

public override int BufferedItemCount => this.bufferedItemCount;

public FullyBufferedPartitionRangeAsyncEnumerator(PartitionRangePageAsyncEnumerator<TPage, TState> enumerator)
: this(enumerator, null)
Expand Down Expand Up @@ -67,7 +70,8 @@ public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken ca
TryCatch<TPage> current = this.enumerator.Current;
if (current.Succeeded)
{
this.bufferedPages.Add(current.Result);
this.bufferedPages.Add(current.Result);
this.bufferedItemCount += current.Result.ItemCount;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,17 @@ public async Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
if (responseMessage.StatusCode == HttpStatusCode.OK)
{
double requestCharge = responseMessage.Headers.RequestCharge;
string activityId = responseMessage.Headers.ActivityId;
string activityId = responseMessage.Headers.ActivityId;
int itemCount = responseMessage.Headers.ItemCount;
ReadFeedState state = responseMessage.Headers.ContinuationToken != null ? ReadFeedState.Continuation(CosmosString.Create(responseMessage.Headers.ContinuationToken)) : null;
Dictionary<string, string> additionalHeaders = GetAdditionalHeaders(
responseMessage.Headers.CosmosMessageHeaders,
ReadFeedPage.BannedHeaders);

ReadFeedPage readFeedPage = new ReadFeedPage(
responseMessage.Content,
requestCharge,
requestCharge,
itemCount,
activityId,
additionalHeaders,
state);
Expand Down Expand Up @@ -327,7 +329,8 @@ public async Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
if (pageHasResult)
{
double requestCharge = responseMessage.Headers.RequestCharge;
string activityId = responseMessage.Headers.ActivityId;
string activityId = responseMessage.Headers.ActivityId;
int itemCount = responseMessage.Headers.ItemCount;
ChangeFeedState state = ChangeFeedState.Continuation(CosmosString.Create(responseMessage.Headers.ETag));
Dictionary<string, string> additionalHeaders = GetAdditionalHeaders(
responseMessage.Headers.CosmosMessageHeaders,
Expand All @@ -338,7 +341,8 @@ public async Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
{
changeFeedPage = new ChangeFeedSuccessPage(
responseMessage.Content,
requestCharge,
requestCharge,
itemCount,
activityId,
additionalHeaders,
state);
Expand Down
8 changes: 5 additions & 3 deletions Microsoft.Azure.Cosmos/src/Pagination/Page.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ internal abstract class Page<TState>
private static readonly IReadOnlyDictionary<string, string> EmptyDictionary = new Dictionary<string, string>();

protected Page(
double requestCharge,
double requestCharge,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
TState state)
{
this.RequestCharge = requestCharge < 0 ? throw new ArgumentOutOfRangeException(nameof(requestCharge)) : requestCharge;
this.RequestCharge = requestCharge < 0 ? throw new ArgumentOutOfRangeException(nameof(requestCharge)) : requestCharge;
this.ActivityId = activityId;
this.State = state;

Expand All @@ -47,7 +47,9 @@ protected Page(
this.AdditionalHeaders = additionalHeaders ?? EmptyDictionary;
}

public double RequestCharge { get; }
public double RequestCharge { get; }

public abstract int ItemCount { get; }

public string ActivityId { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ public OrderByQueryPage(QueryPage queryPage)

public QueryPage Page { get; }

public IEnumerator<CosmosElement> Enumerator { get; }

public IEnumerator<CosmosElement> Enumerator { get; }

public override int ItemCount => this.Page.ItemCount;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
{
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Pagination;
Expand Down Expand Up @@ -63,7 +62,9 @@ private OrderByQueryPartitionRangePageAsyncEnumerator(

public string Filter => this.innerEnumerator.Filter;

public QueryState StartOfPageState { get; private set; }
public QueryState StartOfPageState { get; private set; }

public int BufferedResultCount => this.bufferedEnumerator.BufferedItemCount;

public override ValueTask DisposeAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public QueryPage(

public bool? Streaming { get; }

public override int ItemCount => this.Documents.Count;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => QueryPage.BannedHeaders;
}
}
12 changes: 8 additions & 4 deletions Microsoft.Azure.Cosmos/src/ReadFeed/Pagination/ReadFeedPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ internal sealed class ReadFeedPage : Page<ReadFeedState>

public ReadFeedPage(
Stream content,
double requestCharge,
double requestCharge,
int itemCount,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ReadFeedState state)
: base(requestCharge, activityId, additionalHeaders, state)
{
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.ItemCount = itemCount;
}

public Stream Content { get; }

public Stream Content { get; }

public override int ItemCount { get; }

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => BannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal OpenTelemetryResponse(TransactionalBatchResponse responseMessage)
requestCharge: OpenTelemetryResponse.GetHeader(responseMessage)?.RequestCharge,
responseContentLength: null,
diagnostics: responseMessage.Diagnostics,
itemCount: OpenTelemetryResponse.GetHeader(responseMessage)?.ItemCount,
itemCount: OpenTelemetryResponse.GetHeader(responseMessage)?.ItemCount.ToString(),
requestMessage: null,
subStatusCode: OpenTelemetryResponse.GetHeader(responseMessage)?.SubStatusCode,
activityId: OpenTelemetryResponse.GetHeader(responseMessage)?.ActivityId,
Expand All @@ -32,7 +32,7 @@ internal OpenTelemetryResponse(ResponseMessage responseMessage)
requestCharge: OpenTelemetryResponse.GetHeader(responseMessage)?.RequestCharge,
responseContentLength: OpenTelemetryResponse.GetPayloadSize(responseMessage),
diagnostics: responseMessage.Diagnostics,
itemCount: OpenTelemetryResponse.GetHeader(responseMessage)?.ItemCount,
itemCount: OpenTelemetryResponse.GetHeader(responseMessage)?.ItemCount.ToString(),
requestMessage: responseMessage.RequestMessage,
subStatusCode: OpenTelemetryResponse.GetHeader(responseMessage)?.SubStatusCode,
activityId: OpenTelemetryResponse.GetHeader(responseMessage)?.ActivityId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal OpenTelemetryResponse(FeedResponse<T> responseMessage)
requestCharge: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.RequestCharge,
responseContentLength: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ContentLength,
diagnostics: responseMessage.Diagnostics,
itemCount: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ItemCount,
itemCount: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ItemCount.ToString(),
requestMessage: responseMessage.RequestMessage,
subStatusCode: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.SubStatusCode,
activityId: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ActivityId,
Expand All @@ -32,7 +32,7 @@ internal OpenTelemetryResponse(Response<T> responseMessage)
requestCharge: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.RequestCharge,
responseContentLength: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ContentLength,
diagnostics: responseMessage.Diagnostics,
itemCount: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ItemCount,
itemCount: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ItemCount.ToString(),
requestMessage: responseMessage.RequestMessage,
subStatusCode: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.SubStatusCode,
activityId: OpenTelemetryResponse<T>.GetHeader(responseMessage)?.ActivityId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,13 @@ public async Task ShouldSkipNotModifiedAndReturnResults()
It.IsAny<ITrace>(),
It.IsAny<CancellationToken>())).ReturnsAsync(
(FeedRangeState<ChangeFeedState> state, ChangeFeedPaginationOptions options, ITrace trace, CancellationToken token)
=> TryCatch<ChangeFeedPage>.FromResult(new ChangeFeedSuccessPage(content: new MemoryStream(Encoding.UTF8.GetBytes("{\"Documents\": [], \"_count\": 0, \"_rid\": \"asdf\"}")), requestCharge: 5, activityId: string.Empty, additionalHeaders: default, state.State)));
=> TryCatch<ChangeFeedPage>.FromResult(new ChangeFeedSuccessPage(
content: new MemoryStream(Encoding.UTF8.GetBytes("{\"Documents\": [], \"_count\": 0, \"_rid\": \"asdf\"}")),
requestCharge: 5,
itemCount: 0,
activityId: string.Empty,
additionalHeaders: default,
state.State)));

// Returns a 304 with 1RU charge on CC-FF
documentContainer.Setup(c => c.MonadicChangeFeedAsync(
Expand Down
Loading