Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ public static TryCatch<AggregateValue> TryCreate(
tryCreateAggregator = AverageAggregator.TryCreate(continuationToken);
break;

case AggregateOperator.Count:
case AggregateOperator.Count:
case AggregateOperator.CountIf:
tryCreateAggregator = CountAggregator.TryCreate(continuationToken);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ internal static class CosmosQueryExecutionContextFactory
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
private const int PageSizeFactorForTop = 5;
private static readonly Regex QueryInspectionRegex = new Regex(QueryInspectionPattern, RegexOptions.IgnoreCase | RegexOptions.Compiled);

public static IQueryPipelineStage Create(
Expand Down Expand Up @@ -293,21 +292,31 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
}
else
{
bool singleLogicalPartitionKeyQuery = (inputParameters.PartitionKey.HasValue && targetRanges.Count == 1)
|| ((partitionedQueryExecutionInfo.QueryRanges.Count == 1)
&& partitionedQueryExecutionInfo.QueryRanges[0].IsSingleValue);
bool serverStreamingQuery = !partitionedQueryExecutionInfo.QueryInfo.HasAggregates
&& !partitionedQueryExecutionInfo.QueryInfo.HasDistinct
&& !partitionedQueryExecutionInfo.QueryInfo.HasGroupBy;
bool streamingSinglePartitionQuery = singleLogicalPartitionKeyQuery && serverStreamingQuery;

bool clientStreamingQuery = serverStreamingQuery
&& !partitionedQueryExecutionInfo.QueryInfo.HasOrderBy
&& !partitionedQueryExecutionInfo.QueryInfo.HasTop
&& !partitionedQueryExecutionInfo.QueryInfo.HasLimit
&& !partitionedQueryExecutionInfo.QueryInfo.HasOffset;
bool streamingCrossContinuationQuery = !singleLogicalPartitionKeyQuery && clientStreamingQuery;
bool createPassthroughQuery = streamingSinglePartitionQuery || streamingCrossContinuationQuery;
bool hybridSearchQuery = partitionedQueryExecutionInfo.HybridSearchQueryInfo != null;

bool createPassthroughQuery;
if (hybridSearchQuery)
{
createPassthroughQuery = false;
}
else
{
bool singleLogicalPartitionKeyQuery = (inputParameters.PartitionKey.HasValue && targetRanges.Count == 1)
|| ((partitionedQueryExecutionInfo.QueryRanges.Count == 1)
&& partitionedQueryExecutionInfo.QueryRanges[0].IsSingleValue);
bool serverStreamingQuery = !partitionedQueryExecutionInfo.QueryInfo.HasAggregates
&& !partitionedQueryExecutionInfo.QueryInfo.HasDistinct
&& !partitionedQueryExecutionInfo.QueryInfo.HasGroupBy;
bool streamingSinglePartitionQuery = singleLogicalPartitionKeyQuery && serverStreamingQuery;

bool clientStreamingQuery = serverStreamingQuery
&& !partitionedQueryExecutionInfo.QueryInfo.HasOrderBy
&& !partitionedQueryExecutionInfo.QueryInfo.HasTop
&& !partitionedQueryExecutionInfo.QueryInfo.HasLimit
&& !partitionedQueryExecutionInfo.QueryInfo.HasOffset;
bool streamingCrossContinuationQuery = !singleLogicalPartitionKeyQuery && clientStreamingQuery;
createPassthroughQuery = streamingSinglePartitionQuery || streamingCrossContinuationQuery;
}

if (createPassthroughQuery)
{
Expand All @@ -321,7 +330,21 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
}
else
{
tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, containerQueryProperties, partitionedQueryExecutionInfo);
List<Documents.PartitionKeyRange> allRanges = await cosmosQueryContext.QueryClient.GetTargetPartitionKeyRangesAsync(
cosmosQueryContext.ResourceLink,
containerQueryProperties.ResourceId,
new List<Documents.Routing.Range<string>> { FeedRangeEpk.FullRange.Range },
forceRefresh: false,
trace);

tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(
documentContainer,
cosmosQueryContext,
inputParameters,
targetRanges,
containerQueryProperties,
partitionedQueryExecutionInfo,
allRanges);
}
}

Expand Down Expand Up @@ -376,13 +399,21 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSinglePartitio
targetRange
};

List<Documents.PartitionKeyRange> allRanges = await cosmosQueryContext.QueryClient.GetTargetPartitionKeyRangesAsync(
cosmosQueryContext.ResourceLink,
containerQueryProperties.ResourceId,
new List<Documents.Routing.Range<string>> { FeedRangeEpk.FullRange.Range },
forceRefresh: false,
trace);

tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(
documentContainer,
cosmosQueryContext,
inputParameters,
targetRanges,
containerQueryProperties,
partitionedQueryExecutionInfo);
partitionedQueryExecutionInfo,
allRanges);
}
else
{
Expand All @@ -403,45 +434,21 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
DocumentContainer documentContainer,
CosmosQueryContext cosmosQueryContext,
InputParameters inputParameters,
List<Documents.PartitionKeyRange> targetRanges,
IReadOnlyList<Documents.PartitionKeyRange> targetRanges,
ContainerQueryProperties containerQueryProperties,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo)
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
IReadOnlyList<Documents.PartitionKeyRange> allRanges)
{
SetTestInjectionPipelineType(inputParameters, Specialized);

if (!string.IsNullOrEmpty(partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery))
{
// We need pass down the rewritten query.
SqlQuerySpec rewrittenQuerySpec = new SqlQuerySpec()
{
QueryText = partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery,
Parameters = inputParameters.SqlQuerySpec.Parameters
};

inputParameters = InputParameters.Create(
rewrittenQuerySpec,
inputParameters.InitialUserContinuationToken,
inputParameters.InitialFeedRange,
inputParameters.MaxConcurrency,
inputParameters.MaxItemCount,
inputParameters.MaxBufferedItemCount,
inputParameters.PartitionKey,
inputParameters.Properties,
inputParameters.PartitionedQueryExecutionInfo,
inputParameters.ReturnResultsInDeterministicOrder,
inputParameters.EnableOptimisticDirectExecution,
inputParameters.IsNonStreamingOrderByQueryFeatureDisabled,
inputParameters.EnableDistributedQueryGatewayMode,
inputParameters.TestInjections);
}

return TryCreateSpecializedDocumentQueryExecutionContext(
documentContainer,
cosmosQueryContext,
inputParameters,
partitionedQueryExecutionInfo,
targetRanges,
containerQueryProperties);
containerQueryProperties,
allRanges);
}

private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSpecializedDocumentQueryExecutionContextAsync(
Expand All @@ -468,13 +475,21 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSpecializedDoc
inputParameters.InitialFeedRange,
trace);

List<Documents.PartitionKeyRange> allRanges = await cosmosQueryContext.QueryClient.GetTargetPartitionKeyRangesAsync(
cosmosQueryContext.ResourceLink,
containerQueryProperties.ResourceId,
new List<Documents.Routing.Range<string>> { FeedRangeEpk.FullRange.Range },
forceRefresh: false,
trace);

return TryCreateSpecializedDocumentQueryExecutionContext(
documentContainer,
cosmosQueryContext,
inputParameters,
targetRanges,
containerQueryProperties,
partitionedQueryExecutionInfo);
partitionedQueryExecutionInfo,
allRanges);
}

private static TryCatch<IQueryPipelineStage> TryCreateOptimisticDirectExecutionContext(
Expand Down Expand Up @@ -535,66 +550,39 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
CosmosQueryContext cosmosQueryContext,
InputParameters inputParameters,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
List<Documents.PartitionKeyRange> targetRanges,
ContainerQueryProperties containerQueryProperties)
IReadOnlyList<Documents.PartitionKeyRange> targetRanges,
ContainerQueryProperties containerQueryProperties,
IReadOnlyList<Documents.PartitionKeyRange> allRanges)
{
QueryInfo queryInfo = partitionedQueryExecutionInfo.QueryInfo;

// We need to compute the optimal initial page size for order-by queries
long optimalPageSize = inputParameters.MaxItemCount;
if (queryInfo.HasOrderBy)
{
int top;
if (queryInfo.HasTop && (partitionedQueryExecutionInfo.QueryInfo.Top.Value > 0))
{
top = partitionedQueryExecutionInfo.QueryInfo.Top.Value;
}
else if (queryInfo.HasLimit && (partitionedQueryExecutionInfo.QueryInfo.Limit.Value > 0))
{
top = (partitionedQueryExecutionInfo.QueryInfo.Offset ?? 0) + partitionedQueryExecutionInfo.QueryInfo.Limit.Value;
}
else
{
top = 0;
}

if (top > 0)
{
// All partitions should initially fetch about 1/nth of the top value.
long pageSizeWithTop = (long)Math.Min(
Math.Ceiling(top / (double)targetRanges.Count) * CosmosQueryExecutionContextFactory.PageSizeFactorForTop,
top);

optimalPageSize = Math.Min(pageSizeWithTop, optimalPageSize);
}
else if (cosmosQueryContext.IsContinuationExpected)
{
optimalPageSize = (long)Math.Min(
Math.Ceiling(optimalPageSize / (double)targetRanges.Count) * CosmosQueryExecutionContextFactory.PageSizeFactorForTop,
optimalPageSize);
}
}

Debug.Assert(
(optimalPageSize > 0) && (optimalPageSize <= int.MaxValue),
$"Invalid MaxItemCount {optimalPageSize}");
IReadOnlyList<FeedRangeEpk> targetFeedRanges = targetRanges
.Select(range => new FeedRangeEpk(
new Documents.Routing.Range<string>(
min: range.MinInclusive,
max: range.MaxExclusive,
isMinInclusive: true,
isMaxInclusive: false)))
.ToList();

return PipelineFactory.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: inputParameters.SqlQuerySpec,
targetRanges: targetRanges
IReadOnlyList<FeedRangeEpk> allFeedRanges = allRanges
.Select(range => new FeedRangeEpk(
new Documents.Routing.Range<string>(
min: range.MinInclusive,
max: range.MaxExclusive,
isMinInclusive: true,
isMaxInclusive: false)))
.ToList(),
.ToList();

return PipelineFactory.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: inputParameters.SqlQuerySpec,
targetRanges: targetFeedRanges,
partitionKey: inputParameters.PartitionKey,
queryInfo: partitionedQueryExecutionInfo.QueryInfo,
queryPaginationOptions: new QueryExecutionOptions(
pageSizeHint: (int)optimalPageSize),
hybridSearchQueryInfo: partitionedQueryExecutionInfo.HybridSearchQueryInfo,
maxItemCount: inputParameters.MaxItemCount,
containerQueryProperties: containerQueryProperties,
allRanges: allFeedRanges,
isContinuationExpected: cosmosQueryContext.IsContinuationExpected,
maxConcurrency: inputParameters.MaxConcurrency,
requestContinuationToken: inputParameters.InitialUserContinuationToken);
}
Expand Down Expand Up @@ -695,12 +683,12 @@ private static async Task<PartitionedQueryExecutionInfo> GetPartitionedQueryExec
}
else
{
targetRanges = await queryClient.GetTargetPartitionKeyRangesAsync(
resourceLink,
containerQueryProperties.ResourceId,
partitionedQueryExecutionInfo.QueryRanges,
forceRefresh: false,
trace);
targetRanges = await queryClient.GetTargetPartitionKeyRangesAsync(
resourceLink,
containerQueryProperties.ResourceId,
partitionedQueryExecutionInfo.QueryRanges,
forceRefresh: false,
trace);
}

return targetRanges;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.HybridSearch
{
using System;
using System.Collections.Generic;
using Microsoft.Azure.Cosmos.CosmosElements;

internal sealed class FullTextStatistics
{
public long TotalWordCount { get; }

public ReadOnlyMemory<long> HitCounts => this.hitCounts;

private readonly long[] hitCounts;

public FullTextStatistics(long totalWordCount, long[] hitCounts)
{
this.TotalWordCount = totalWordCount;
this.hitCounts = hitCounts;
}

public FullTextStatistics(CosmosObject cosmosObject)
{
if (cosmosObject == null)
{
throw new System.ArgumentNullException($"{nameof(cosmosObject)} must not be null.");
}

if (!cosmosObject.TryGetValue(FieldNames.TotalWordCount, out CosmosNumber totalWordCount))
{
throw new System.ArgumentException($"{FieldNames.TotalWordCount} must exist and be a number");
}

if (!cosmosObject.TryGetValue(FieldNames.HitCounts, out CosmosArray hitCountsArray))
{
throw new System.ArgumentException($"{FieldNames.HitCounts} must exist and be an array");
}

long[] hitCounts = new long[hitCountsArray.Count];
for (int index = 0; index < hitCountsArray.Count; ++index)
{
if (!(hitCountsArray[index] is CosmosNumber cosmosNumber))
{
throw new System.ArgumentException($"{FieldNames.HitCounts} must be an array of numbers");
}

hitCounts[index] = Number64.ToLong(cosmosNumber.Value);
}

this.TotalWordCount = Number64.ToLong(totalWordCount.Value);
this.hitCounts = hitCounts;
}

private static class FieldNames
{
public const string TotalWordCount = "totalWordCount";

public const string HitCounts = "hitCounts";
}
}
}
Loading