diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs index cfe911966d..c5f0883089 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs @@ -55,6 +55,8 @@ private sealed class InitializationParameters public int MaxConcurrency { get; } + public bool NonStreamingOrderBy { get; } + public InitializationParameters( IDocumentContainer documentContainer, SqlQuerySpec sqlQuerySpec, @@ -62,7 +64,8 @@ public InitializationParameters( PartitionKey? partitionKey, IReadOnlyList orderByColumns, QueryPaginationOptions queryPaginationOptions, - int maxConcurrency) + int maxConcurrency, + bool nonStreamingOrderBy) { this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer)); this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec)); @@ -71,6 +74,7 @@ public InitializationParameters( this.OrderByColumns = orderByColumns ?? throw new ArgumentNullException(nameof(orderByColumns)); this.QueryPaginationOptions = queryPaginationOptions ?? throw new ArgumentNullException(nameof(queryPaginationOptions)); this.MaxConcurrency = maxConcurrency; + this.NonStreamingOrderBy = nonStreamingOrderBy; } } @@ -181,6 +185,7 @@ public static TryCatch MonadicCreate( IReadOnlyList orderByColumns, QueryPaginationOptions queryPaginationOptions, int maxConcurrency, + bool nonStreamingOrderBy, CosmosElement continuationToken) { if (documentContainer == null) @@ -233,24 +238,28 @@ public static TryCatch MonadicCreate( partitionKey, orderByColumns, queryPaginationOptions, - maxConcurrency); + maxConcurrency, + nonStreamingOrderBy); return TryCatch.FromResult(new OrderByCrossPartitionQueryPipelineStage(init)); } - private static async ValueTask<(TryCatch, Queue)> MoveNextAsync_InitializeAsync(InitializationParameters init, ITrace trace, CancellationToken cancellationToken) + private static async ValueTask<(TryCatch, Queue)> MoveNextAsync_InitializeAsync( + InitializationParameters parameters, + ITrace trace, + CancellationToken cancellationToken) { SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec( - init.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter), - init.SqlQuerySpec.Parameters); + parameters.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter), + parameters.SqlQuerySpec.Parameters); - List uninitializedEnumerators = init.TargetRanges + List uninitializedEnumerators = parameters.TargetRanges .Select(range => OrderByQueryPartitionRangePageAsyncEnumerator.Create( - init.DocumentContainer, + parameters.DocumentContainer, rewrittenQueryForOrderBy, new FeedRangeState(range, state: default), - init.PartitionKey, - init.QueryPaginationOptions, + parameters.PartitionKey, + parameters.QueryPaginationOptions, TrueFilter, PrefetchPolicy.PrefetchSinglePage)) .ToList(); @@ -259,13 +268,12 @@ public static TryCatch MonadicCreate( uninitializedEnumerators .Select(x => (x, (OrderByContinuationToken)null))); - await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, init.MaxConcurrency, trace, cancellationToken); + await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, parameters.MaxConcurrency, trace, cancellationToken); - IReadOnlyList sortOrders = init.OrderByColumns.Select(column => column.SortOrder).ToList(); + IReadOnlyList sortOrders = parameters.OrderByColumns.Select(column => column.SortOrder).ToList(); PriorityQueue initializedEnumerators = new PriorityQueue(new OrderByEnumeratorComparer(sortOrders)); Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens = new Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)>(); - bool nonStreaming = false; Queue bufferedPages = new Queue(); QueryPageParameters queryPageParameters = null; while (uninitializedEnumeratorsAndTokens.Count != 0) @@ -278,7 +286,7 @@ public static TryCatch MonadicCreate( if (IsSplitException(enumerator.Current.Exception)) { await MoveNextAsync_InitializeAsync_HandleSplitAsync( - init.DocumentContainer, + parameters.DocumentContainer, uninitializedEnumeratorsAndTokens, enumerator, token, @@ -307,9 +315,6 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync( additionalHeaders: page.AdditionalHeaders); } - // For backwards compatibility the default value of streaming for ORDER BY is _true_ - nonStreaming = nonStreaming || (!page.Streaming.GetValueOrDefault(true) && (page.State != null)); - if (enumerator.Current.Result.Enumerator.MoveNext()) { // the page is non-empty then we need to enqueue the enumerator in the PriorityQueue @@ -335,7 +340,7 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync( } IQueryPipelineStage pipelineStage; - if (nonStreaming) + if (parameters.NonStreamingOrderBy) { Queue orderbyEnumerators = new Queue(); foreach ((OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken _) in enumeratorsAndTokens) @@ -350,10 +355,10 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync( orderbyEnumerators.Enqueue(bufferedEnumerator); } - await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, init.MaxConcurrency, trace, cancellationToken); + await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, parameters.MaxConcurrency, trace, cancellationToken); pipelineStage = await NonStreamingOrderByPipelineStage.CreateAsync( - init.QueryPaginationOptions, + parameters.QueryPaginationOptions, sortOrders, orderbyEnumerators, queryPageParameters, @@ -363,12 +368,12 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync( else { pipelineStage = StreamingOrderByCrossPartitionQueryPipelineStage.Create( - init.DocumentContainer, + parameters.DocumentContainer, sortOrders, initializedEnumerators, enumeratorsAndTokens, - init.QueryPaginationOptions, - init.MaxConcurrency); + parameters.QueryPaginationOptions, + parameters.MaxConcurrency); } return (TryCatch.FromResult(pipelineStage), bufferedPages); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs index 814682ba72..93008125fb 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs @@ -78,7 +78,8 @@ public static TryCatch MonadicCreate( .OrderByExpressions .Zip(queryInfo.OrderBy, (expression, sortOrder) => new OrderByColumn(expression, sortOrder)).ToList(), queryPaginationOptions: queryPaginationOptions, - maxConcurrency: maxConcurrency, + maxConcurrency: maxConcurrency, + nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy, continuationToken: continuationToken); } else diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs index 4ec2009758..226c587cee 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs @@ -44,16 +44,16 @@ FROM c [Benchmark(Baseline = true)] public Task StreamingOrderByPipelineStage() { - return CreateAndRunPipeline(StreamingContainer); + return CreateAndRunPipeline(StreamingContainer, nonStreamingOrderBy: false); } [Benchmark] public Task NonStreamingOrderByPipelineStage() { - return CreateAndRunPipeline(NonStreamingContainer); + return CreateAndRunPipeline(NonStreamingContainer, nonStreamingOrderBy: true); } - private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer) + private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer, bool nonStreamingOrderBy) { IReadOnlyList ranges = await documentContainer.GetFeedRangesAsync( trace: NoOpTrace.Singleton, @@ -67,6 +67,7 @@ private static async Task CreateAndRunPipeline(IDocumentContainer documentContai orderByColumns: OrderByColumns, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: EndUserPageSize), maxConcurrency: MaxConcurrency, + nonStreamingOrderBy: nonStreamingOrderBy, continuationToken: null); IQueryPipelineStage pipeline = pipelineStage.Result; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs index daac63b9b1..60d01dc69a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs @@ -228,14 +228,16 @@ private static async Task RunParityTests( ranges: ranges, queryText: testCase.QueryText, orderByColumns: testCase.OrderByColumns, - pageSize: pageSize); + pageSize: pageSize, + nonStreamingOrderBy: true); IReadOnlyList streamingResult = await CreateAndRunPipelineStage( documentContainer: documentContainer, ranges: ranges, queryText: testCase.QueryText, orderByColumns: testCase.OrderByColumns, - pageSize: pageSize); + pageSize: pageSize, + nonStreamingOrderBy: false); if (!streamingResult.SequenceEqual(nonStreamingResult)) { @@ -255,7 +257,8 @@ private static async Task> CreateAndRunPipelineStag IReadOnlyList ranges, string queryText, IReadOnlyList orderByColumns, - int pageSize) + int pageSize, + bool nonStreamingOrderBy) { TryCatch pipelineStage = OrderByCrossPartitionQueryPipelineStage.MonadicCreate( documentContainer: documentContainer, @@ -265,6 +268,7 @@ private static async Task> CreateAndRunPipelineStag orderByColumns: orderByColumns, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: pageSize), maxConcurrency: MaxConcurrency, + nonStreamingOrderBy: nonStreamingOrderBy, continuationToken: null); Assert.IsTrue(pipelineStage.Succeeded); @@ -311,7 +315,8 @@ private static async Task RunParityTests(IReadOnlyList testCases ranges: ranges, queryText: testCase.QueryText, orderByColumns: testCase.OrderByColumns, - pageSize: pageSize); + pageSize: pageSize, + nonStreamingOrderBy: true); DebugTraceHelpers.TraceStreamingPipelineStarting(); IReadOnlyList streamingResult = await CreateAndRunPipelineStage( @@ -319,7 +324,8 @@ private static async Task RunParityTests(IReadOnlyList testCases ranges: ranges, queryText: testCase.QueryText, orderByColumns: testCase.OrderByColumns, - pageSize: pageSize); + pageSize: pageSize, + nonStreamingOrderBy: false); if (!streamingResult.SequenceEqual(nonStreamingResult)) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs index eebd3b1b83..af77a51996 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs @@ -77,7 +77,8 @@ public void MonadicCreate_NullContinuationToken() new OrderByColumn("_ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: null); Assert.IsTrue(monadicCreate.Succeeded); } @@ -98,6 +99,7 @@ public void MonadicCreate_NonCosmosArrayContinuationToken() }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosObject.Create(new Dictionary())); Assert.IsTrue(monadicCreate.Failed); Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException); @@ -119,6 +121,7 @@ public void MonadicCreate_EmptyArrayContinuationToken() }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosArray.Create(new List())); Assert.IsTrue(monadicCreate.Failed); Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException); @@ -140,6 +143,7 @@ public void MonadicCreate_NonParallelContinuationToken() }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosArray.Create(new List() { CosmosString.Create("asdf") })); Assert.IsTrue(monadicCreate.Failed); Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException); @@ -176,6 +180,7 @@ public void MonadicCreate_SingleOrderByContinuationToken() }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosArray.Create( new List() { @@ -220,6 +225,7 @@ public void MonadicCreate_SingleOrderByContinuationToken() }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosArray.Create( new List() { @@ -279,6 +285,7 @@ public void MonadicCreate_MultipleOrderByContinuationToken() }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosArray.Create( new List() { @@ -321,6 +328,7 @@ public void MonadicCreate_OrderByWithResumeValues() }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosArray.Create( new List() { @@ -361,7 +369,8 @@ public void MonadicCreate_OrderByWithResumeValues() new OrderByColumn("item2", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: CosmosArray.Create( new List() { @@ -416,7 +425,8 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy new OrderByColumn("c._ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 1), - maxConcurrency: 0, + maxConcurrency: 0, + nonStreamingOrderBy: false, continuationToken: CosmosElement.Parse(continuationToken)); Assert.IsTrue(monadicCreate.Succeeded); @@ -451,7 +461,8 @@ FROM c new OrderByColumn("c._ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: null); Assert.IsTrue(monadicCreate.Succeeded); IQueryPipelineStage queryPipelineStage = monadicCreate.Result; @@ -500,7 +511,8 @@ FROM c new OrderByColumn("c._ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: null); Assert.IsTrue(monadicCreate.Succeeded); IQueryPipelineStage queryPipelineStage = monadicCreate.Result; @@ -557,7 +569,8 @@ FROM c new OrderByColumn("c.pk", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, + nonStreamingOrderBy: false, continuationToken: continuationToken); monadicQueryPipelineStage.ThrowIfFailed(); IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result;