diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Client.cs deleted file mode 100644 index 4d2c095eda..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Client.cs +++ /dev/null @@ -1,142 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - using static IndexUtilizationHelper; - - internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStageBase - { - private sealed class ClientAggregateQueryPipelineStage : AggregateQueryPipelineStage - { - private ClientAggregateQueryPipelineStage( - IQueryPipelineStage source, - SingleGroupAggregator singleGroupAggregator, - bool isValueAggregateQuery) - : base(source, singleGroupAggregator, isValueAggregateQuery) - { - // all the work is done in the base constructor. - } - - public static new TryCatch MonadicCreate( - IReadOnlyList aggregates, - IReadOnlyDictionary aliasToAggregateType, - IReadOnlyList orderedAliases, - bool hasSelectValue, - CosmosElement continuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) - { - if (monadicCreatePipelineStage == null) - { - throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); - } - - TryCatch tryCreateSingleGroupAggregator = SingleGroupAggregator.TryCreate( - aggregates, - aliasToAggregateType, - orderedAliases, - hasSelectValue, - continuationToken: null); - if (tryCreateSingleGroupAggregator.Failed) - { - return TryCatch.FromException(tryCreateSingleGroupAggregator.Exception); - } - - TryCatch tryCreateSource = monadicCreatePipelineStage(continuationToken); - if (tryCreateSource.Failed) - { - return tryCreateSource; - } - - ClientAggregateQueryPipelineStage stage = new ClientAggregateQueryPipelineStage( - tryCreateSource.Result, - tryCreateSingleGroupAggregator.Result, - hasSelectValue); - - return TryCatch.FromResult(stage); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (this.returnedFinalPage) - { - return false; - } - - // Note-2016-10-25-felixfan: Given what we support now, we should expect to return only 1 document. - // Note-2019-07-11-brchon: We can return empty pages until all the documents are drained, - // but then we will have to design a continuation token. - - double requestCharge = 0; - IReadOnlyDictionary cumulativeAdditionalHeaders = default; - - while (await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - TryCatch tryGetPageFromSource = this.inputStage.Current; - if (tryGetPageFromSource.Failed) - { - this.Current = tryGetPageFromSource; - return true; - } - - QueryPage sourcePage = tryGetPageFromSource.Result; - - requestCharge += sourcePage.RequestCharge; - - cumulativeAdditionalHeaders = AccumulateIndexUtilization( - cumulativeHeaders: cumulativeAdditionalHeaders, - currentHeaders: sourcePage.AdditionalHeaders); - - foreach (CosmosElement element in sourcePage.Documents) - { - cancellationToken.ThrowIfCancellationRequested(); - - RewrittenAggregateProjections rewrittenAggregateProjections = new RewrittenAggregateProjections( - this.isValueQuery, - element); - this.singleGroupAggregator.AddValues(rewrittenAggregateProjections.Payload); - } - } - - List finalResult = new List(); - CosmosElement aggregationResult = this.singleGroupAggregator.GetResult(); - if (aggregationResult != null) - { - finalResult.Add(aggregationResult); - } - - QueryPage queryPage = new QueryPage( - documents: finalResult, - requestCharge: requestCharge, - activityId: default, - cosmosQueryExecutionInfo: default, - distributionPlanSpec: default, - disallowContinuationTokenMessage: default, - additionalHeaders: cumulativeAdditionalHeaders, - state: default, - streaming: default); - - this.Current = TryCatch.FromResult(queryPage); - this.returnedFinalPage = true; - return true; - } - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs index 57eac41990..13c245c4b3 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs @@ -7,22 +7,16 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate using System; using System.Collections.Generic; using System.Threading; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + using Microsoft.Azure.Cosmos.Tracing; + using static IndexUtilizationHelper; - /// - /// Stage that is able to aggregate local aggregates from multiple continuations and partitions. - /// At a high level aggregates queries only return a "partial" aggregate. - /// "partial" means that the result is only valid for that one continuation (and one partition). - /// For example suppose you have the query "SELECT COUNT(1) FROM c" and you have a single partition collection, - /// then you will get one count for each continuation of the query. - /// If you wanted the true result for this query, then you will have to take the sum of all continuations. - /// The reason why we have multiple continuations is because for a long running query we have to break up the results into multiple continuations. - /// Fortunately all the aggregates can be aggregated across continuations and partitions. - /// - internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStageBase - { + internal class AggregateQueryPipelineStage : QueryPipelineStageBase + { /// /// This class does most of the work, since a query like: /// @@ -47,7 +41,7 @@ internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStage /// The single group aggregator that we will feed results into. /// Whether or not the query has the 'VALUE' keyword. /// This constructor is private since there is some async initialization that needs to happen in CreateAsync(). - public AggregateQueryPipelineStage( + private AggregateQueryPipelineStage( IQueryPipelineStage source, SingleGroupAggregator singleGroupAggregator, bool isValueQuery) @@ -55,6 +49,78 @@ public AggregateQueryPipelineStage( { this.singleGroupAggregator = singleGroupAggregator ?? throw new ArgumentNullException(nameof(singleGroupAggregator)); this.isValueQuery = isValueQuery; + } + + public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (trace == null) + { + throw new ArgumentNullException(nameof(trace)); + } + + if (this.returnedFinalPage) + { + return false; + } + + // Note-2016-10-25-felixfan: Given what we support now, we should expect to return only 1 document. + // Note-2019-07-11-brchon: We can return empty pages until all the documents are drained, + // but then we will have to design a continuation token. + + double requestCharge = 0; + IReadOnlyDictionary cumulativeAdditionalHeaders = default; + + while (await this.inputStage.MoveNextAsync(trace, cancellationToken)) + { + TryCatch tryGetPageFromSource = this.inputStage.Current; + if (tryGetPageFromSource.Failed) + { + this.Current = tryGetPageFromSource; + return true; + } + + QueryPage sourcePage = tryGetPageFromSource.Result; + + requestCharge += sourcePage.RequestCharge; + + cumulativeAdditionalHeaders = AccumulateIndexUtilization( + cumulativeHeaders: cumulativeAdditionalHeaders, + currentHeaders: sourcePage.AdditionalHeaders); + + foreach (CosmosElement element in sourcePage.Documents) + { + cancellationToken.ThrowIfCancellationRequested(); + + RewrittenAggregateProjections rewrittenAggregateProjections = new RewrittenAggregateProjections( + this.isValueQuery, + element); + this.singleGroupAggregator.AddValues(rewrittenAggregateProjections.Payload); + } + } + + List finalResult = new List(); + CosmosElement aggregationResult = this.singleGroupAggregator.GetResult(); + if (aggregationResult != null) + { + finalResult.Add(aggregationResult); + } + + QueryPage queryPage = new QueryPage( + documents: finalResult, + requestCharge: requestCharge, + activityId: default, + cosmosQueryExecutionInfo: default, + distributionPlanSpec: default, + disallowContinuationTokenMessage: default, + additionalHeaders: cumulativeAdditionalHeaders, + state: default, + streaming: default); + + this.Current = TryCatch.FromResult(queryPage); + this.returnedFinalPage = true; + return true; } public static TryCatch MonadicCreate( @@ -64,14 +130,35 @@ public static TryCatch MonadicCreate( bool hasSelectValue, CosmosElement continuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage) - { - return ClientAggregateQueryPipelineStage.MonadicCreate( - aggregates, - aliasToAggregateType, - orderedAliases, - hasSelectValue, - continuationToken, - monadicCreatePipelineStage); + { + if (monadicCreatePipelineStage == null) + { + throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); + } + + TryCatch tryCreateSingleGroupAggregator = SingleGroupAggregator.TryCreate( + aggregates, + aliasToAggregateType, + orderedAliases, + hasSelectValue, + continuationToken: null); + if (tryCreateSingleGroupAggregator.Failed) + { + return TryCatch.FromException(tryCreateSingleGroupAggregator.Exception); + } + + TryCatch tryCreateSource = monadicCreatePipelineStage(continuationToken); + if (tryCreateSource.Failed) + { + return tryCreateSource; + } + + AggregateQueryPipelineStage stage = new AggregateQueryPipelineStage( + tryCreateSource.Result, + tryCreateSingleGroupAggregator.Result, + hasSelectValue); + + return TryCatch.FromResult(stage); } /// @@ -113,6 +200,6 @@ public RewrittenAggregateProjections(bool isValueAggregateQuery, CosmosElement r } public CosmosElement Payload { get; } - } + } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/AverageAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/AverageAggregator.cs index 2c47fba74a..5903e7ea7e 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/AverageAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/AverageAggregator.cs @@ -80,11 +80,6 @@ public static TryCatch TryCreate(CosmosElement continuationToken) return TryCatch.FromResult(new AverageAggregator(averageInfo)); } - public CosmosElement GetCosmosElementContinuationToken() - { - return AverageInfo.ToCosmosElement(this.globalAverage); - } - /// /// Struct that stores a weighted average as a sum and count so they that average across different partitions with different numbers of documents can be taken. /// @@ -104,19 +99,6 @@ public AverageInfo(double? sum, long count) this.Count = count; } - public static CosmosElement ToCosmosElement(AverageInfo averageInfo) - { - Dictionary dictionary = new Dictionary(); - if (averageInfo.Sum.HasValue) - { - dictionary.Add(AverageInfo.SumName, CosmosNumber64.Create(averageInfo.Sum.Value)); - } - - dictionary.Add(AverageInfo.CountName, CosmosNumber64.Create(averageInfo.Count)); - - return CosmosObject.Create(dictionary); - } - /// /// Initializes a new instance of the AverageInfo class. /// diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/CountAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/CountAggregator.cs index 696e44a1b5..61137e98d3 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/CountAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/CountAggregator.cs @@ -56,11 +56,6 @@ public CosmosElement GetResult() return CosmosNumber64.Create(this.globalCount); } - public string GetContinuationToken() - { - return this.globalCount.ToString(CultureInfo.InvariantCulture); - } - public static TryCatch TryCreate(CosmosElement continuationToken) { long partialCount; @@ -82,10 +77,5 @@ public static TryCatch TryCreate(CosmosElement continuationToken) return TryCatch.FromResult( new CountAggregator(initialCount: partialCount)); } - - public CosmosElement GetCosmosElementContinuationToken() - { - return CosmosNumber64.Create(this.globalCount); - } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/IAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/IAggregator.cs index c264163845..c6933e7284 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/IAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/IAggregator.cs @@ -22,7 +22,5 @@ internal interface IAggregator /// /// The result of the aggregation. CosmosElement GetResult(); - - CosmosElement GetCosmosElementContinuationToken(); } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeListAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeListAggregator.cs index 7ab664bdcb..299b0e4507 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeListAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeListAggregator.cs @@ -61,10 +61,5 @@ public static TryCatch TryCreate(CosmosElement continuationToken) return TryCatch.FromResult(new MakeListAggregator(initialList: partialList)); } - - public CosmosElement GetCosmosElementContinuationToken() - { - return this.GetResult(); - } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeSetAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeSetAggregator.cs index 79d6a9485e..987cb81e19 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeSetAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MakeSetAggregator.cs @@ -41,11 +41,6 @@ public CosmosElement GetResult() return CosmosArray.Create(this.globalSet); } - public string GetContinuationToken() - { - return this.globalSet.ToString(); - } - public static TryCatch TryCreate(CosmosElement continuationToken) { CosmosArray partialSet; @@ -66,10 +61,5 @@ public static TryCatch TryCreate(CosmosElement continuationToken) return TryCatch.FromResult(new MakeSetAggregator(initialSet: partialSet)); } - - public CosmosElement GetCosmosElementContinuationToken() - { - return this.GetResult(); - } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MinMaxAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MinMaxAggregator.cs index 56dab2cb5f..ccf140e3d6 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MinMaxAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/MinMaxAggregator.cs @@ -185,30 +185,6 @@ private static bool IsPrimitve(CosmosElement cosmosElement) return cosmosElement.Accept(IsPrimitiveCosmosElementVisitor.Singleton); } - public CosmosElement GetCosmosElementContinuationToken() - { - MinMaxContinuationToken minMaxContinuationToken; - if (this.globalMinMax == ItemComparer.MinValue) - { - minMaxContinuationToken = MinMaxContinuationToken.CreateMinValueContinuationToken(); - } - else if (this.globalMinMax == ItemComparer.MaxValue) - { - minMaxContinuationToken = MinMaxContinuationToken.CreateMaxValueContinuationToken(); - } - else if (this.globalMinMax is CosmosUndefined) - { - minMaxContinuationToken = MinMaxContinuationToken.CreateUndefinedValueContinuationToken(); - } - else - { - minMaxContinuationToken = MinMaxContinuationToken.CreateValueContinuationToken(this.globalMinMax); - } - - CosmosElement minMaxContinuationTokenAsCosmosElement = MinMaxContinuationToken.ToCosmosElement(minMaxContinuationToken); - return minMaxContinuationTokenAsCosmosElement; - } - private sealed class IsPrimitiveCosmosElementVisitor : ICosmosElementVisitor { public static readonly IsPrimitiveCosmosElementVisitor Singleton = new IsPrimitiveCosmosElementVisitor(); @@ -304,48 +280,6 @@ private MinMaxContinuationToken( public MinMaxContinuationTokenType Type { get; } public CosmosElement Value { get; } - public static MinMaxContinuationToken CreateMinValueContinuationToken() - { - return new MinMaxContinuationToken(type: MinMaxContinuationTokenType.MinValue, value: null); - } - - public static MinMaxContinuationToken CreateMaxValueContinuationToken() - { - return new MinMaxContinuationToken(type: MinMaxContinuationTokenType.MaxValue, value: null); - } - - public static MinMaxContinuationToken CreateUndefinedValueContinuationToken() - { - return new MinMaxContinuationToken(type: MinMaxContinuationTokenType.Undefined, value: null); - } - - public static MinMaxContinuationToken CreateValueContinuationToken(CosmosElement value) - { - return new MinMaxContinuationToken(type: MinMaxContinuationTokenType.Value, value: value); - } - - public static CosmosElement ToCosmosElement(MinMaxContinuationToken minMaxContinuationToken) - { - if (minMaxContinuationToken == null) - { - throw new ArgumentNullException(nameof(minMaxContinuationToken)); - } - - Dictionary dictionary = new Dictionary - { - { - MinMaxContinuationToken.PropertyNames.Type, - EnumToCosmosString.ConvertEnumToCosmosString(minMaxContinuationToken.Type) - } - }; - if (minMaxContinuationToken.Value != null) - { - dictionary.Add(MinMaxContinuationToken.PropertyNames.Value, minMaxContinuationToken.Value); - } - - return CosmosObject.Create(dictionary); - } - public static TryCatch TryCreateFromCosmosElement(CosmosElement cosmosElement) { if (!(cosmosElement is CosmosObject cosmosObject)) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SingleGroupAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SingleGroupAggregator.cs index a9c47dacfb..3bb44a87d9 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SingleGroupAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SingleGroupAggregator.cs @@ -29,8 +29,6 @@ internal abstract class SingleGroupAggregator /// public abstract CosmosElement GetResult(); - public abstract CosmosElement GetCosmosElementContinuationToken(); - public static TryCatch TryCreate( IReadOnlyList aggregates, IReadOnlyDictionary aggregateAliasToAggregateType, @@ -106,11 +104,6 @@ public override CosmosElement GetResult() return this.aggregateValue.Result; } - public override CosmosElement GetCosmosElementContinuationToken() - { - return this.aggregateValue.GetCosmosElementContinuationToken(); - } - public override string ToString() { return this.aggregateValue.ToString(); @@ -152,17 +145,6 @@ public override CosmosElement GetResult() return new OrderedCosmosObject(dictionary, keys); } - public override CosmosElement GetCosmosElementContinuationToken() - { - Dictionary dictionary = new Dictionary(); - foreach (KeyValuePair kvp in this.aliasToValue) - { - dictionary.Add(kvp.Key, kvp.Value.GetCosmosElementContinuationToken()); - } - - return CosmosObject.Create(dictionary); - } - public static TryCatch TryCreate( IReadOnlyDictionary aggregateAliasToAggregateType, IReadOnlyList orderedAliases, @@ -311,8 +293,6 @@ private abstract class AggregateValue public abstract CosmosElement Result { get; } - public abstract CosmosElement GetCosmosElementContinuationToken(); - public override string ToString() { return this.Result.ToString(); @@ -350,11 +330,6 @@ public override void AddValue(CosmosElement aggregateValue) this.aggregator.Aggregate(aggregateItem.Item); } - public override CosmosElement GetCosmosElementContinuationToken() - { - return this.aggregator.GetCosmosElementContinuationToken(); - } - public static TryCatch TryCreate( AggregateOperator aggregateOperator, CosmosElement continuationToken) @@ -422,21 +397,6 @@ public override CosmosElement Result } } - public override CosmosElement GetCosmosElementContinuationToken() - { - Dictionary dictionary = new Dictionary - { - { nameof(this.initialized), CosmosBoolean.Create(this.initialized) } - }; - - if (this.value != null) - { - dictionary.Add(nameof(this.value), this.value); - } - - return CosmosObject.Create(dictionary); - } - public static TryCatch TryCreate(CosmosElement continuationToken) { CosmosElement value; diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SumAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SumAggregator.cs index 8a26d04ab9..c0f6ef2f00 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SumAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/Aggregators/SumAggregator.cs @@ -69,11 +69,6 @@ public CosmosElement GetResult() return CosmosNumber64.Create(this.globalSum); } - public CosmosElement GetCosmosElementContinuationToken() - { - return CosmosNumber64.Create(this.globalSum); - } - public static TryCatch TryCreate(CosmosElement requestContinuationToken) { double partialSum; diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Client.cs deleted file mode 100644 index afdd35c0b4..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Client.cs +++ /dev/null @@ -1,116 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.DCount -{ - using System; - using System.Collections.Generic; - using System.Collections.Immutable; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.CosmosElements.Numbers; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; - using Microsoft.Azure.Cosmos.Tracing; - - internal abstract partial class DCountQueryPipelineStage : QueryPipelineStageBase - { - private sealed class ClientDCountQueryPipelineStage : DCountQueryPipelineStage - { - private ClientDCountQueryPipelineStage( - IQueryPipelineStage source, - long count, - DCountInfo info) - : base(source, count, info) - { - // all the work is done in the base constructor. - } - - public static new TryCatch MonadicCreate( - DCountInfo info, - CosmosElement continuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) - { - if (monadicCreatePipelineStage == null) - { - throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); - } - - TryCatch tryCreateSource = monadicCreatePipelineStage(continuationToken); - if (tryCreateSource.Failed) - { - return tryCreateSource; - } - - ClientDCountQueryPipelineStage stage = new ClientDCountQueryPipelineStage( - source: tryCreateSource.Result, - count: 0, - info: info); - - return TryCatch.FromResult(stage); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (this.returnedFinalPage) - { - return false; - } - - double requestCharge = 0; - IReadOnlyDictionary additionalHeaders = null; - while (await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - TryCatch tryGetPageFromSource = this.inputStage.Current; - if (tryGetPageFromSource.Failed) - { - this.Current = tryGetPageFromSource; - return true; - } - - QueryPage sourcePage = tryGetPageFromSource.Result; - - requestCharge += sourcePage.RequestCharge; - additionalHeaders = sourcePage.AdditionalHeaders; - - cancellationToken.ThrowIfCancellationRequested(); - this.count += sourcePage.Documents.Count; - } - - List finalResult = new List(); - CosmosElement aggregationResult = this.GetFinalResult(); - if (aggregationResult != null) - { - finalResult.Add(aggregationResult); - } - - QueryPage queryPage = new QueryPage( - documents: finalResult, - requestCharge: requestCharge, - activityId: default, - cosmosQueryExecutionInfo: default, - distributionPlanSpec: default, - disallowContinuationTokenMessage: default, - additionalHeaders: additionalHeaders, - state: default, - streaming: default); - - this.Current = TryCatch.FromResult(queryPage); - this.returnedFinalPage = true; - return true; - } - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs index aa43a55ee6..ebe87484ee 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs @@ -7,16 +7,19 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.DCount using System; using System.Collections.Generic; using System.Threading; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.CosmosElements.Numbers; using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; + using Microsoft.Azure.Cosmos.Tracing; /// /// Stage that is able to aggregate COUNT(DISTINCT) from multiple continuations and partitions. /// - internal abstract partial class DCountQueryPipelineStage : QueryPipelineStageBase - { + internal class DCountQueryPipelineStage : QueryPipelineStageBase + { /// /// We need to keep track of whether the projection has the 'VALUE' keyword or an alias. /// @@ -44,17 +47,87 @@ public DCountQueryPipelineStage( { this.count = count; this.info = info; + } + + public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (trace == null) + { + throw new ArgumentNullException(nameof(trace)); + } + + if (this.returnedFinalPage) + { + return false; + } + + double requestCharge = 0; + IReadOnlyDictionary additionalHeaders = null; + while (await this.inputStage.MoveNextAsync(trace, cancellationToken)) + { + TryCatch tryGetPageFromSource = this.inputStage.Current; + if (tryGetPageFromSource.Failed) + { + this.Current = tryGetPageFromSource; + return true; + } + + QueryPage sourcePage = tryGetPageFromSource.Result; + + requestCharge += sourcePage.RequestCharge; + additionalHeaders = sourcePage.AdditionalHeaders; + + cancellationToken.ThrowIfCancellationRequested(); + this.count += sourcePage.Documents.Count; + } + + List finalResult = new List(); + CosmosElement aggregationResult = this.GetFinalResult(); + if (aggregationResult != null) + { + finalResult.Add(aggregationResult); + } + + QueryPage queryPage = new QueryPage( + documents: finalResult, + requestCharge: requestCharge, + activityId: default, + cosmosQueryExecutionInfo: default, + distributionPlanSpec: default, + disallowContinuationTokenMessage: default, + additionalHeaders: additionalHeaders, + state: default, + streaming: default); + + this.Current = TryCatch.FromResult(queryPage); + this.returnedFinalPage = true; + return true; } public static TryCatch MonadicCreate( DCountInfo info, CosmosElement continuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage) - { - return ClientDCountQueryPipelineStage.MonadicCreate( - info, - continuationToken, - monadicCreatePipelineStage); + { + if (monadicCreatePipelineStage == null) + { + throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); + } + + TryCatch tryCreateSource = monadicCreatePipelineStage(continuationToken); + if (tryCreateSource.Failed) + { + return tryCreateSource; + } + + DCountQueryPipelineStage stage = new DCountQueryPipelineStage( + source: tryCreateSource.Result, + count: 0, + info: info); + + return TryCatch.FromResult(stage); } protected CosmosElement GetFinalResult() diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Client.cs deleted file mode 100644 index 4da81b6d7d..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Client.cs +++ /dev/null @@ -1,264 +0,0 @@ -//------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -//------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - using Newtonsoft.Json; - - internal abstract partial class DistinctQueryPipelineStage : QueryPipelineStageBase - { - /// - /// Client implementaiton of Distinct. Here we only serialize the continuation token if there is a matching DISTINCT. - /// - private sealed class ClientDistinctQueryPipelineStage : DistinctQueryPipelineStage - { - private static readonly string DisallowContinuationTokenMessage = "DISTINCT queries only return continuation tokens when there is a matching ORDER BY clause." + - "For example if your query is 'SELECT DISTINCT VALUE c.name FROM c', then rewrite it as 'SELECT DISTINCT VALUE c.name FROM c ORDER BY c.name'."; - - private readonly DistinctQueryType distinctQueryType; - - private ClientDistinctQueryPipelineStage( - DistinctQueryType distinctQueryType, - DistinctMap distinctMap, - IQueryPipelineStage source) - : base(distinctMap, source) - { - if ((distinctQueryType != DistinctQueryType.Unordered) && (distinctQueryType != DistinctQueryType.Ordered)) - { - throw new ArgumentException($"Unknown {nameof(DistinctQueryType)}: {distinctQueryType}."); - } - - this.distinctQueryType = distinctQueryType; - } - - public static new TryCatch MonadicCreate( - CosmosElement requestContinuation, - MonadicCreatePipelineStage monadicCreatePipelineStage, - DistinctQueryType distinctQueryType) - { - if (monadicCreatePipelineStage == null) - { - throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); - } - - DistinctContinuationToken distinctContinuationToken; - if (requestContinuation != null) - { - if (!DistinctContinuationToken.TryParse(requestContinuation, out distinctContinuationToken)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Invalid {nameof(DistinctContinuationToken)}: {requestContinuation}")); - } - } - else - { - distinctContinuationToken = new DistinctContinuationToken( - sourceToken: null, - distinctMapToken: null); - } - - CosmosElement distinctMapToken = distinctContinuationToken.DistinctMapToken != null - ? CosmosString.Create(distinctContinuationToken.DistinctMapToken) - : null; - TryCatch tryCreateDistinctMap = DistinctMap.TryCreate( - distinctQueryType, - distinctMapToken); - if (!tryCreateDistinctMap.Succeeded) - { - return TryCatch.FromException(tryCreateDistinctMap.Exception); - } - - CosmosElement sourceToken; - if (distinctContinuationToken.SourceToken != null) - { - TryCatch tryParse = CosmosElement.Monadic.Parse(distinctContinuationToken.SourceToken); - if (tryParse.Failed) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - message: $"Invalid Source Token: {distinctContinuationToken.SourceToken}", - innerException: tryParse.Exception)); - } - - sourceToken = tryParse.Result; - } - else - { - sourceToken = null; - } - - TryCatch tryCreateSource = monadicCreatePipelineStage(sourceToken); - if (!tryCreateSource.Succeeded) - { - return TryCatch.FromException(tryCreateSource.Exception); - } - - return TryCatch.FromResult( - new ClientDistinctQueryPipelineStage( - distinctQueryType, - tryCreateDistinctMap.Result, - tryCreateSource.Result)); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (!await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - this.Current = default; - return false; - } - - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - - List distinctResults = new List(); - foreach (CosmosElement document in sourcePage.Documents) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (this.distinctMap.Add(document, out UInt128 _)) - { - distinctResults.Add(document); - } - } - - // For clients we write out the continuation token if it's a streaming query. - QueryPage queryPage; - if (this.distinctQueryType == DistinctQueryType.Ordered) - { - QueryState state; - if (sourcePage.State != null) - { - string updatedContinuationToken = new DistinctContinuationToken( - sourceToken: sourcePage.State.Value.ToString(), - distinctMapToken: this.distinctMap.GetContinuationToken()).ToString(); - state = new QueryState(CosmosElement.Parse(updatedContinuationToken)); - } - else - { - state = null; - } - - queryPage = new QueryPage( - documents: distinctResults, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage, - additionalHeaders: sourcePage.AdditionalHeaders, - state: state, - streaming: sourcePage.Streaming); - } - else - { - queryPage = new QueryPage( - documents: distinctResults, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: ClientDistinctQueryPipelineStage.DisallowContinuationTokenMessage, - additionalHeaders: sourcePage.AdditionalHeaders, - state: null, - streaming: sourcePage.Streaming); - } - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - - /// - /// Continuation token for distinct queries. - /// - private sealed class DistinctContinuationToken - { - private static class PropertyNames - { - public const string SourceToken = "SourceToken"; - public const string DistinctMapToken = "DistinctMapToken"; - } - - public DistinctContinuationToken(string sourceToken, string distinctMapToken) - { - this.SourceToken = sourceToken; - this.DistinctMapToken = distinctMapToken; - } - - public string SourceToken { get; } - - public string DistinctMapToken { get; } - - /// - /// Tries to parse a DistinctContinuationToken from a string. - /// - /// The value to parse. - /// The output DistinctContinuationToken. - /// True if we successfully parsed the DistinctContinuationToken, else false. - public static bool TryParse( - CosmosElement cosmosElement, - out DistinctContinuationToken distinctContinuationToken) - { - if (!(cosmosElement is CosmosObject cosmosObject)) - { - distinctContinuationToken = default; - return false; - } - - if (!cosmosObject.TryGetValue( - DistinctContinuationToken.PropertyNames.SourceToken, - out CosmosString sourceToken)) - { - distinctContinuationToken = default; - return false; - } - - if (!cosmosObject.TryGetValue( - DistinctContinuationToken.PropertyNames.DistinctMapToken, - out CosmosString distinctMapToken)) - { - distinctContinuationToken = default; - return false; - } - - distinctContinuationToken = new DistinctContinuationToken(sourceToken.Value, distinctMapToken.Value); - return true; - } - - /// - /// Gets the serialized form of DistinctContinuationToken - /// - /// The serialized form of DistinctContinuationToken - public override string ToString() - { - return JsonConvert.SerializeObject(this); - } - } - } - } -} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs index 9bf8b601da..6e50bf0727 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs @@ -1,48 +1,264 @@ -// ------------------------------------------------------------ +//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ +//------------------------------------------------------------ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct { using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.Exceptions; using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + using Microsoft.Azure.Cosmos.Tracing; + using Newtonsoft.Json; - /// - /// Distinct queries return documents that are distinct with a page. - /// This means that documents are not guaranteed to be distinct across continuations and partitions. - /// The reasoning for this is because the backend treats each continuation of a query as a separate request - /// and partitions are not aware of each other. - /// The solution is that the client keeps a running hash set of all the documents it has already seen, - /// so that when it encounters a duplicate document from another continuation it will not be emitted to the user. - /// The only problem is that if the user chooses to go through the continuation token API for DocumentQuery instead - /// of while(HasMoreResults) ExecuteNextAsync, then will see duplicates across continuations. - /// There is no workaround for that use case, since the continuation token will have to include all the documents seen. - /// - internal abstract partial class DistinctQueryPipelineStage : QueryPipelineStageBase - { + internal class DistinctQueryPipelineStage : QueryPipelineStageBase + { + private static readonly string DisallowContinuationTokenMessage = "DISTINCT queries only return continuation tokens when there is a matching ORDER BY clause." + + "For example if your query is 'SELECT DISTINCT VALUE c.name FROM c', then rewrite it as 'SELECT DISTINCT VALUE c.name FROM c ORDER BY c.name'."; + + private readonly DistinctQueryType distinctQueryType; + /// /// An DistinctMap that efficiently stores the documents that we have already seen. /// private readonly DistinctMap distinctMap; - protected DistinctQueryPipelineStage( + private DistinctQueryPipelineStage( + DistinctQueryType distinctQueryType, DistinctMap distinctMap, IQueryPipelineStage source) : base(source) { + if ((distinctQueryType != DistinctQueryType.Unordered) && (distinctQueryType != DistinctQueryType.Ordered)) + { + throw new ArgumentException($"Unknown {nameof(DistinctQueryType)}: {distinctQueryType}."); + } + + this.distinctQueryType = distinctQueryType; this.distinctMap = distinctMap ?? throw new ArgumentNullException(nameof(distinctMap)); } + public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (trace == null) + { + throw new ArgumentNullException(nameof(trace)); + } + + if (!await this.inputStage.MoveNextAsync(trace, cancellationToken)) + { + this.Current = default; + return false; + } + + TryCatch tryGetSourcePage = this.inputStage.Current; + if (tryGetSourcePage.Failed) + { + this.Current = tryGetSourcePage; + return true; + } + + QueryPage sourcePage = tryGetSourcePage.Result; + + List distinctResults = new List(); + foreach (CosmosElement document in sourcePage.Documents) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (this.distinctMap.Add(document, out UInt128 _)) + { + distinctResults.Add(document); + } + } + + // For clients we write out the continuation token if it's a streaming query. + QueryPage queryPage; + if (this.distinctQueryType == DistinctQueryType.Ordered) + { + QueryState state; + if (sourcePage.State != null) + { + string updatedContinuationToken = new DistinctContinuationToken( + sourceToken: sourcePage.State.Value.ToString(), + distinctMapToken: this.distinctMap.GetContinuationToken()).ToString(); + state = new QueryState(CosmosElement.Parse(updatedContinuationToken)); + } + else + { + state = null; + } + + queryPage = new QueryPage( + documents: distinctResults, + requestCharge: sourcePage.RequestCharge, + activityId: sourcePage.ActivityId, + cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, + distributionPlanSpec: default, + disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage, + additionalHeaders: sourcePage.AdditionalHeaders, + state: state, + streaming: sourcePage.Streaming); + } + else + { + queryPage = new QueryPage( + documents: distinctResults, + requestCharge: sourcePage.RequestCharge, + activityId: sourcePage.ActivityId, + cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, + distributionPlanSpec: default, + disallowContinuationTokenMessage: DistinctQueryPipelineStage.DisallowContinuationTokenMessage, + additionalHeaders: sourcePage.AdditionalHeaders, + state: null, + streaming: sourcePage.Streaming); + } + + this.Current = TryCatch.FromResult(queryPage); + return true; + } + public static TryCatch MonadicCreate( CosmosElement requestContinuation, MonadicCreatePipelineStage monadicCreatePipelineStage, DistinctQueryType distinctQueryType) - { - return ClientDistinctQueryPipelineStage.MonadicCreate( - requestContinuation, - monadicCreatePipelineStage, - distinctQueryType); + { + if (monadicCreatePipelineStage == null) + { + throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); + } + + DistinctContinuationToken distinctContinuationToken; + if (requestContinuation != null) + { + if (!DistinctContinuationToken.TryParse(requestContinuation, out distinctContinuationToken)) + { + return TryCatch.FromException( + new MalformedContinuationTokenException( + $"Invalid {nameof(DistinctContinuationToken)}: {requestContinuation}")); + } + } + else + { + distinctContinuationToken = new DistinctContinuationToken( + sourceToken: null, + distinctMapToken: null); + } + + CosmosElement distinctMapToken = distinctContinuationToken.DistinctMapToken != null + ? CosmosString.Create(distinctContinuationToken.DistinctMapToken) + : null; + TryCatch tryCreateDistinctMap = DistinctMap.TryCreate( + distinctQueryType, + distinctMapToken); + if (!tryCreateDistinctMap.Succeeded) + { + return TryCatch.FromException(tryCreateDistinctMap.Exception); + } + + CosmosElement sourceToken; + if (distinctContinuationToken.SourceToken != null) + { + TryCatch tryParse = CosmosElement.Monadic.Parse(distinctContinuationToken.SourceToken); + if (tryParse.Failed) + { + return TryCatch.FromException( + new MalformedContinuationTokenException( + message: $"Invalid Source Token: {distinctContinuationToken.SourceToken}", + innerException: tryParse.Exception)); + } + + sourceToken = tryParse.Result; + } + else + { + sourceToken = null; + } + + TryCatch tryCreateSource = monadicCreatePipelineStage(sourceToken); + if (!tryCreateSource.Succeeded) + { + return TryCatch.FromException(tryCreateSource.Exception); + } + + return TryCatch.FromResult( + new DistinctQueryPipelineStage( + distinctQueryType, + tryCreateDistinctMap.Result, + tryCreateSource.Result)); + } + + /// + /// Continuation token for distinct queries. + /// + private sealed class DistinctContinuationToken + { + private static class PropertyNames + { + public const string SourceToken = "SourceToken"; + public const string DistinctMapToken = "DistinctMapToken"; + } + + public DistinctContinuationToken(string sourceToken, string distinctMapToken) + { + this.SourceToken = sourceToken; + this.DistinctMapToken = distinctMapToken; + } + + public string SourceToken { get; } + + public string DistinctMapToken { get; } + + /// + /// Tries to parse a DistinctContinuationToken from a string. + /// + /// The value to parse. + /// The output DistinctContinuationToken. + /// True if we successfully parsed the DistinctContinuationToken, else false. + public static bool TryParse( + CosmosElement cosmosElement, + out DistinctContinuationToken distinctContinuationToken) + { + if (!(cosmosElement is CosmosObject cosmosObject)) + { + distinctContinuationToken = default; + return false; + } + + if (!cosmosObject.TryGetValue( + DistinctContinuationToken.PropertyNames.SourceToken, + out CosmosString sourceToken)) + { + distinctContinuationToken = default; + return false; + } + + if (!cosmosObject.TryGetValue( + DistinctContinuationToken.PropertyNames.DistinctMapToken, + out CosmosString distinctMapToken)) + { + distinctContinuationToken = default; + return false; + } + + distinctContinuationToken = new DistinctContinuationToken(sourceToken.Value, distinctMapToken.Value); + return true; + } + + /// + /// Gets the serialized form of DistinctContinuationToken + /// + /// The serialized form of DistinctContinuationToken + public override string ToString() + { + return JsonConvert.SerializeObject(this); + } } } -} +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Client.cs deleted file mode 100644 index f469472a3f..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Client.cs +++ /dev/null @@ -1,129 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.GroupBy -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - - internal abstract partial class GroupByQueryPipelineStage : QueryPipelineStageBase - { - private sealed class ClientGroupByQueryPipelineStage : GroupByQueryPipelineStage - { - public const string ContinuationTokenNotSupportedWithGroupBy = "Continuation token is not supported for queries with GROUP BY. Do not use FeedResponse.ResponseContinuation or remove the GROUP BY from the query."; - private ClientGroupByQueryPipelineStage( - IQueryPipelineStage source, - GroupingTable groupingTable, - int pageSize) - : base(source, groupingTable, pageSize) - { - } - - public static new TryCatch MonadicCreate( - CosmosElement requestContinuation, - MonadicCreatePipelineStage monadicCreatePipelineStage, - IReadOnlyList aggregates, - IReadOnlyDictionary groupByAliasToAggregateType, - IReadOnlyList orderedAliases, - bool hasSelectValue, - int pageSize) - { - TryCatch tryCreateGroupingTable = GroupingTable.TryCreateFromContinuationToken( - aggregates, - groupByAliasToAggregateType, - orderedAliases, - hasSelectValue, - continuationToken: null); - - if (tryCreateGroupingTable.Failed) - { - return TryCatch.FromException(tryCreateGroupingTable.Exception); - } - - TryCatch tryCreateSource = monadicCreatePipelineStage(requestContinuation); - if (tryCreateSource.Failed) - { - return tryCreateSource; - } - - IQueryPipelineStage stage = new ClientGroupByQueryPipelineStage( - tryCreateSource.Result, - tryCreateGroupingTable.Result, - pageSize); - - return TryCatch.FromResult(stage); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (this.returnedLastPage) - { - this.Current = default; - return false; - } - - // Draining GROUP BY is broken down into two stages: - - double requestCharge = 0.0; - IReadOnlyDictionary addtionalHeaders = null; - - while (await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - cancellationToken.ThrowIfCancellationRequested(); - - // Stage 1: - // Drain the groupings fully from all continuation and all partitions - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - - requestCharge += sourcePage.RequestCharge; - addtionalHeaders = sourcePage.AdditionalHeaders; - this.AggregateGroupings(sourcePage.Documents); - } - - // Stage 2: - // Emit the results from the grouping table page by page - IReadOnlyList results = this.groupingTable.Drain(this.pageSize); - if (this.groupingTable.Count == 0) - { - this.returnedLastPage = true; - } - - QueryPage queryPage = new QueryPage( - documents: results, - requestCharge: requestCharge, - activityId: default, - cosmosQueryExecutionInfo: default, - distributionPlanSpec: default, - disallowContinuationTokenMessage: ClientGroupByQueryPipelineStage.ContinuationTokenNotSupportedWithGroupBy, - additionalHeaders: addtionalHeaders, - state: default, - streaming: null); - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs index 7368239429..98ca73215f 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs @@ -12,12 +12,12 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.GroupBy using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct; - + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + /// /// Query execution component that groups groupings across continuations and pages. /// The general idea is a query gets rewritten from this: @@ -45,8 +45,10 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.GroupBy /// So we know how to aggregate each column. /// At the end the columns are stitched together to make the grouped document. /// - internal abstract partial class GroupByQueryPipelineStage : QueryPipelineStageBase - { + internal class GroupByQueryPipelineStage : QueryPipelineStageBase + { + private const string ContinuationTokenNotSupportedWithGroupBy = "Continuation token is not supported for queries with GROUP BY. Do not use FeedResponse.ResponseContinuation or remove the GROUP BY from the query."; + private readonly GroupingTable groupingTable; protected readonly int pageSize; protected bool returnedLastPage; @@ -62,22 +64,38 @@ protected GroupByQueryPipelineStage( } public static TryCatch MonadicCreate( - CosmosElement continuationToken, + CosmosElement requestContinuation, MonadicCreatePipelineStage monadicCreatePipelineStage, IReadOnlyList aggregates, IReadOnlyDictionary groupByAliasToAggregateType, IReadOnlyList orderedAliases, bool hasSelectValue, - int pageSize) - { - return ClientGroupByQueryPipelineStage.MonadicCreate( - continuationToken, - monadicCreatePipelineStage, + int pageSize) + { + TryCatch tryCreateGroupingTable = GroupingTable.TryCreateFromContinuationToken( aggregates, groupByAliasToAggregateType, orderedAliases, hasSelectValue, - pageSize); + continuationToken: null); + + if (tryCreateGroupingTable.Failed) + { + return TryCatch.FromException(tryCreateGroupingTable.Exception); + } + + TryCatch tryCreateSource = monadicCreatePipelineStage(requestContinuation); + if (tryCreateSource.Failed) + { + return tryCreateSource; + } + + IQueryPipelineStage stage = new GroupByQueryPipelineStage( + tryCreateSource.Result, + tryCreateGroupingTable.Result, + pageSize); + + return TryCatch.FromResult(stage); } protected void AggregateGroupings(IReadOnlyList cosmosElements) @@ -90,6 +108,69 @@ protected void AggregateGroupings(IReadOnlyList cosmosElements) } } + public override async ValueTask MoveNextAsync(Tracing.ITrace trace, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (trace == null) + { + throw new ArgumentNullException(nameof(trace)); + } + + if (this.returnedLastPage) + { + this.Current = default; + return false; + } + + // Draining GROUP BY is broken down into two stages: + + double requestCharge = 0.0; + IReadOnlyDictionary addtionalHeaders = null; + + while (await this.inputStage.MoveNextAsync(trace, cancellationToken)) + { + cancellationToken.ThrowIfCancellationRequested(); + + // Stage 1: + // Drain the groupings fully from all continuation and all partitions + TryCatch tryGetSourcePage = this.inputStage.Current; + if (tryGetSourcePage.Failed) + { + this.Current = tryGetSourcePage; + return true; + } + + QueryPage sourcePage = tryGetSourcePage.Result; + + requestCharge += sourcePage.RequestCharge; + addtionalHeaders = sourcePage.AdditionalHeaders; + this.AggregateGroupings(sourcePage.Documents); + } + + // Stage 2: + // Emit the results from the grouping table page by page + IReadOnlyList results = this.groupingTable.Drain(this.pageSize); + if (this.groupingTable.Count == 0) + { + this.returnedLastPage = true; + } + + QueryPage queryPage = new QueryPage( + documents: results, + requestCharge: requestCharge, + activityId: default, + cosmosQueryExecutionInfo: default, + distributionPlanSpec: default, + disallowContinuationTokenMessage: GroupByQueryPipelineStage.ContinuationTokenNotSupportedWithGroupBy, + additionalHeaders: addtionalHeaders, + state: default, + streaming: null); + + this.Current = TryCatch.FromResult(queryPage); + return true; + } + /// /// When a group by query gets rewritten the projection looks like: /// @@ -228,17 +309,6 @@ public IReadOnlyList Drain(int maxItemCount) return results; } - public CosmosElement GetCosmosElementContinuationToken() - { - Dictionary dictionary = new Dictionary(); - foreach (KeyValuePair kvp in this.table) - { - dictionary.Add(kvp.Key.ToString(), kvp.Value.GetCosmosElementContinuationToken()); - } - - return CosmosObject.Create(dictionary); - } - public IEnumerator> GetEnumerator => this.table.GetEnumerator(); public static TryCatch TryCreateFromContinuationToken( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs index c521e2d28e..1cf322bd6c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs @@ -54,7 +54,7 @@ private static async Task> CreateAndDrainAsync( IQueryPipelineStage source = new MockQueryPipelineStage(pages); TryCatch tryCreateGroupByStage = GroupByQueryPipelineStage.MonadicCreate( - continuationToken: continuationToken, + requestContinuation: continuationToken, monadicCreatePipelineStage: (CosmosElement continuationToken) => TryCatch.FromResult(source), aggregates: new AggregateOperator[] { }, groupByAliasToAggregateType: groupByAliasToAggregateType,