Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Consolidate the pipeline stage classes
  • Loading branch information
neildsh committed Jul 24, 2024
commit dc59f895db378aea4a476d668c99472aa8f2c984
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,99 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate
using static IndexUtilizationHelper;

internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStageBase
{
{
/// <summary>
/// This class does most of the work, since a query like:
///
/// SELECT VALUE AVG(c.age)
/// FROM c
///
/// is really just an aggregation on a single grouping (the whole collection).
/// </summary>
private readonly SingleGroupAggregator singleGroupAggregator;

/// <summary>
/// We need to keep track of whether the projection has the 'VALUE' keyword.
/// </summary>
private readonly bool isValueQuery;

protected bool returnedFinalPage;

/// <summary>
/// Initializes a new instance of the AggregateDocumentQueryExecutionComponent class.
/// </summary>
/// <param name="source">The source component that will supply the local aggregates from multiple continuations and partitions.</param>
/// <param name="singleGroupAggregator">The single group aggregator that we will feed results into.</param>
/// <param name="isValueQuery">Whether or not the query has the 'VALUE' keyword.</param>
/// <remarks>This constructor is private since there is some async initialization that needs to happen in CreateAsync().</remarks>
public AggregateQueryPipelineStage(
IQueryPipelineStage source,
SingleGroupAggregator singleGroupAggregator,
bool isValueQuery)
: base(source)
{
this.singleGroupAggregator = singleGroupAggregator ?? throw new ArgumentNullException(nameof(singleGroupAggregator));
this.isValueQuery = isValueQuery;
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
IReadOnlyList<AggregateOperator> aggregates,
IReadOnlyDictionary<string, AggregateOperator?> aliasToAggregateType,
IReadOnlyList<string> orderedAliases,
bool hasSelectValue,
CosmosElement continuationToken,
MonadicCreatePipelineStage monadicCreatePipelineStage)
{
return ClientAggregateQueryPipelineStage.MonadicCreate(
aggregates,
aliasToAggregateType,
orderedAliases,
hasSelectValue,
continuationToken,
monadicCreatePipelineStage);
}

/// <summary>
/// Struct for getting the payload out of the rewritten projection.
/// </summary>
private readonly struct RewrittenAggregateProjections
{
public RewrittenAggregateProjections(bool isValueAggregateQuery, CosmosElement raw)
{
if (raw == null)
{
throw new ArgumentNullException(nameof(raw));
}

if (isValueAggregateQuery)
{
// SELECT VALUE [{"item": {"sum": SUM(c.blah), "count": COUNT(c.blah)}}]
if (!(raw is CosmosArray aggregates))
{
throw new ArgumentException($"{nameof(RewrittenAggregateProjections)} was not an array for a value aggregate query. Type is: {raw.GetType()}");
}

this.Payload = aggregates[0];
}
else
{
if (!(raw is CosmosObject cosmosObject))
{
throw new ArgumentException($"{nameof(raw)} must not be an object.");
}

if (!cosmosObject.TryGetValue("payload", out CosmosElement cosmosPayload))
{
throw new InvalidOperationException($"Underlying object does not have an 'payload' field.");
}

this.Payload = cosmosPayload ?? throw new ArgumentException($"{nameof(RewrittenAggregateProjections)} does not have a 'payload' property.");
}
}

public CosmosElement Payload { get; }
}

private sealed class ClientAggregateQueryPipelineStage : AggregateQueryPipelineStage
{
private ClientAggregateQueryPipelineStage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,96 +23,5 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate
/// </summary>
internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStageBase
{
/// <summary>
/// This class does most of the work, since a query like:
///
/// SELECT VALUE AVG(c.age)
/// FROM c
///
/// is really just an aggregation on a single grouping (the whole collection).
/// </summary>
private readonly SingleGroupAggregator singleGroupAggregator;

/// <summary>
/// We need to keep track of whether the projection has the 'VALUE' keyword.
/// </summary>
private readonly bool isValueQuery;

protected bool returnedFinalPage;

/// <summary>
/// Initializes a new instance of the AggregateDocumentQueryExecutionComponent class.
/// </summary>
/// <param name="source">The source component that will supply the local aggregates from multiple continuations and partitions.</param>
/// <param name="singleGroupAggregator">The single group aggregator that we will feed results into.</param>
/// <param name="isValueQuery">Whether or not the query has the 'VALUE' keyword.</param>
/// <remarks>This constructor is private since there is some async initialization that needs to happen in CreateAsync().</remarks>
public AggregateQueryPipelineStage(
IQueryPipelineStage source,
SingleGroupAggregator singleGroupAggregator,
bool isValueQuery)
: base(source)
{
this.singleGroupAggregator = singleGroupAggregator ?? throw new ArgumentNullException(nameof(singleGroupAggregator));
this.isValueQuery = isValueQuery;
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
IReadOnlyList<AggregateOperator> aggregates,
IReadOnlyDictionary<string, AggregateOperator?> aliasToAggregateType,
IReadOnlyList<string> orderedAliases,
bool hasSelectValue,
CosmosElement continuationToken,
MonadicCreatePipelineStage monadicCreatePipelineStage)
{
return ClientAggregateQueryPipelineStage.MonadicCreate(
aggregates,
aliasToAggregateType,
orderedAliases,
hasSelectValue,
continuationToken,
monadicCreatePipelineStage);
}

/// <summary>
/// Struct for getting the payload out of the rewritten projection.
/// </summary>
private readonly struct RewrittenAggregateProjections
{
public RewrittenAggregateProjections(bool isValueAggregateQuery, CosmosElement raw)
{
if (raw == null)
{
throw new ArgumentNullException(nameof(raw));
}

if (isValueAggregateQuery)
{
// SELECT VALUE [{"item": {"sum": SUM(c.blah), "count": COUNT(c.blah)}}]
if (!(raw is CosmosArray aggregates))
{
throw new ArgumentException($"{nameof(RewrittenAggregateProjections)} was not an array for a value aggregate query. Type is: {raw.GetType()}");
}

this.Payload = aggregates[0];
}
else
{
if (!(raw is CosmosObject cosmosObject))
{
throw new ArgumentException($"{nameof(raw)} must not be an object.");
}

if (!cosmosObject.TryGetValue("payload", out CosmosElement cosmosPayload))
{
throw new InvalidOperationException($"Underlying object does not have an 'payload' field.");
}

this.Payload = cosmosPayload ?? throw new ArgumentException($"{nameof(RewrittenAggregateProjections)} does not have a 'payload' property.");
}
}

public CosmosElement Payload { get; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,70 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.DCount
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.CosmosElements.Numbers;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Tracing;

/// <summary>
/// Stage that is able to aggregate COUNT(DISTINCT) from multiple continuations and partitions.
/// </summary>
internal abstract partial class DCountQueryPipelineStage : QueryPipelineStageBase
{
{
/// <summary>
/// We need to keep track of whether the projection has the 'VALUE' keyword or an alias.
/// </summary>
private readonly DCountInfo info;

/// <summary>
/// This job of this class is to just keep a count.
/// </summary>
private long count;

protected bool returnedFinalPage;

/// <summary>
/// Initializes a new instance of the DCountQueryPipelineStage class.
/// </summary>
/// <param name="source">The source component that will supply the local aggregates from multiple continuations and partitions.</param>
/// <param name="count">The actual dcount that will be reported.</param>
/// <param name="info">Metadata about the original dcount query that is elided in the rewritten query</param>
/// <remarks>This constructor is private since there is some async initialization that needs to happen in CreateAsync().</remarks>
public DCountQueryPipelineStage(
IQueryPipelineStage source,
long count,
DCountInfo info)
: base(source)
{
this.count = count;
this.info = info;
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
DCountInfo info,
CosmosElement continuationToken,
MonadicCreatePipelineStage monadicCreatePipelineStage)
{
return ClientDCountQueryPipelineStage.MonadicCreate(
info,
continuationToken,
monadicCreatePipelineStage);
}

protected CosmosElement GetFinalResult()
{
return this.info.IsValueAggregate ?
CosmosNumber64.Create(this.count) as CosmosElement :
CosmosObject.Create(new Dictionary<string, CosmosElement>
{
{ this.info.DCountAlias, CosmosNumber64.Create(this.count) }
});
}

private sealed class ClientDCountQueryPipelineStage : DCountQueryPipelineStage
{
private ClientDCountQueryPipelineStage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,59 +12,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.DCount
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;

/// <summary>
/// Stage that is able to aggregate COUNT(DISTINCT) from multiple continuations and partitions.
/// </summary>
internal abstract partial class DCountQueryPipelineStage : QueryPipelineStageBase
{
/// <summary>
/// We need to keep track of whether the projection has the 'VALUE' keyword or an alias.
/// </summary>
private readonly DCountInfo info;

/// <summary>
/// This job of this class is to just keep a count.
/// </summary>
private long count;

protected bool returnedFinalPage;

/// <summary>
/// Initializes a new instance of the DCountQueryPipelineStage class.
/// </summary>
/// <param name="source">The source component that will supply the local aggregates from multiple continuations and partitions.</param>
/// <param name="count">The actual dcount that will be reported.</param>
/// <param name="info">Metadata about the original dcount query that is elided in the rewritten query</param>
/// <remarks>This constructor is private since there is some async initialization that needs to happen in CreateAsync().</remarks>
public DCountQueryPipelineStage(
IQueryPipelineStage source,
long count,
DCountInfo info)
: base(source)
{
this.count = count;
this.info = info;
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
DCountInfo info,
CosmosElement continuationToken,
MonadicCreatePipelineStage monadicCreatePipelineStage)
{
return ClientDCountQueryPipelineStage.MonadicCreate(
info,
continuationToken,
monadicCreatePipelineStage);
}

protected CosmosElement GetFinalResult()
{
return this.info.IsValueAggregate ?
CosmosNumber64.Create(this.count) as CosmosElement :
CosmosObject.Create(new Dictionary<string, CosmosElement>
{
{ this.info.DCountAlias, CosmosNumber64.Create(this.count) }
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,31 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct
using Newtonsoft.Json;

internal abstract partial class DistinctQueryPipelineStage : QueryPipelineStageBase
{
{
/// <summary>
/// An DistinctMap that efficiently stores the documents that we have already seen.
/// </summary>
private readonly DistinctMap distinctMap;

protected DistinctQueryPipelineStage(
DistinctMap distinctMap,
IQueryPipelineStage source)
: base(source)
{
this.distinctMap = distinctMap ?? throw new ArgumentNullException(nameof(distinctMap));
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
CosmosElement requestContinuation,
MonadicCreatePipelineStage monadicCreatePipelineStage,
DistinctQueryType distinctQueryType)
{
return ClientDistinctQueryPipelineStage.MonadicCreate(
requestContinuation,
monadicCreatePipelineStage,
distinctQueryType);
}

/// <summary>
/// Client implementaiton of Distinct. Here we only serialize the continuation token if there is a matching DISTINCT.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,5 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct
/// </summary>
internal abstract partial class DistinctQueryPipelineStage : QueryPipelineStageBase
{
/// <summary>
/// An DistinctMap that efficiently stores the documents that we have already seen.
/// </summary>
private readonly DistinctMap distinctMap;

protected DistinctQueryPipelineStage(
DistinctMap distinctMap,
IQueryPipelineStage source)
: base(source)
{
this.distinctMap = distinctMap ?? throw new ArgumentNullException(nameof(distinctMap));
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
CosmosElement requestContinuation,
MonadicCreatePipelineStage monadicCreatePipelineStage,
DistinctQueryType distinctQueryType)
{
return ClientDistinctQueryPipelineStage.MonadicCreate(
requestContinuation,
monadicCreatePipelineStage,
distinctQueryType);
}
}
}
Loading