Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
incorporate code review feedback
  • Loading branch information
neildsh committed May 17, 2024
commit 9faa445802356aaba8001d76a6c12e6dfb8c03cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public InitializationParameters(
private enum ExecutionState
{
Uninitialized,
Initialized
Initialized,
Done
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
Expand Down Expand Up @@ -1673,15 +1674,12 @@ private sealed class NonStreamingOrderByPipelineStage : IQueryPipelineStage

private BufferedOrderByResults bufferedResults;

private bool firstPage;

public TryCatch<QueryPage> Current { get; private set; }

private NonStreamingOrderByPipelineStage(InitializationParameters parameters, int pageSize)
{
this.parameters = parameters ?? throw new ArgumentNullException(nameof(parameters));
this.pageSize = pageSize;
this.firstPage = true;
this.executionState = ExecutionState.Uninitialized;
}

Expand All @@ -1693,11 +1691,19 @@ public ValueTask DisposeAsync()

public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
if (this.executionState == ExecutionState.Done)
{
return false;
}

cancellationToken.ThrowIfCancellationRequested();

bool firstPage = false;
if (this.executionState == ExecutionState.Uninitialized)
{
await this.MoveNextAsync_InitializeAsync(trace, cancellationToken);
firstPage = true;
this.bufferedResults = await this.MoveNextAsync_InitializeAsync(trace, cancellationToken);
this.executionState = ExecutionState.Initialized;
}

List<CosmosElement> documents = new List<CosmosElement>(this.pageSize);
Expand All @@ -1706,9 +1712,9 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
documents.Add(this.bufferedResults.Enumerator.Current.Payload);
}

if (this.firstPage || documents.Count > 0)
if (firstPage || documents.Count > 0)
{
double requestCharge = this.firstPage ? this.bufferedResults.TotalRequestCharge : 0;
double requestCharge = firstPage ? this.bufferedResults.TotalRequestCharge : 0;
QueryPage queryPage = new QueryPage(
documents: documents,
requestCharge: requestCharge,
Expand All @@ -1720,17 +1726,17 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
state: documents.Count > 0 ? NonStreamingOrderByInProgress : null,
streaming: false);

this.firstPage = false;
this.Current = TryCatch<QueryPage>.FromResult(queryPage);
return true;
}
else
{
this.executionState = ExecutionState.Done;
return false;
}
}

private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken cancellationToken)
private async Task<BufferedOrderByResults> MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken cancellationToken)
{
ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>> enumerator = await OrderByCrossPartitionRangePageEnumerator.CreateAsync(
this.parameters.DocumentContainer,
Expand All @@ -1752,8 +1758,7 @@ private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken
trace,
cancellationToken);

this.bufferedResults = bufferedResults;
this.executionState = ExecutionState.Initialized;
return bufferedResults;
}

public static IQueryPipelineStage Create(
Expand Down Expand Up @@ -1817,7 +1822,7 @@ public static async Task<ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>>> Cr
OrderByQueryPartitionRangePageAsyncEnumerator enumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
sqlQuerySpec,
new FeedRangeState<QueryState>(range, null),
new FeedRangeState<QueryState>(range, state: null),
partitionKey,
queryPaginationOptions,
filter: null,
Expand Down Expand Up @@ -1853,6 +1858,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
{
while (this.enumeratorsAndTokens.Count > 0)
{
cancellationToken.ThrowIfCancellationRequested();
(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token) = this.enumeratorsAndTokens.Dequeue();
if (await enumerator.MoveNextAsync(trace, cancellationToken))
{
Expand All @@ -1861,7 +1867,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
OrderByContinuationToken continuationToken;
if (enumerator.Current.Result.Page.Documents.Count > 0)
{
// Use the token for the next page, since we fully drained the enumerator.
// Use the token for the next page, since we fully drained the page.
continuationToken = enumerator.FeedRangeState.State?.Value != null ?
CreateOrderByContinuationToken(
new ParallelContinuationToken(
Expand Down