Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ internal static class CosmosQueryExecutionContextFactory
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string QueryInspectionPattern = @"\s+(GROUP\s+BY\s+|COUNT\s*\(|MIN\s*\(|MAX\s*\(|AVG\s*\(|SUM\s*\(|DISTINCT\s+)";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
private const int PageSizeFactorForTop = 5;
private static readonly Regex QueryInspectionRegex = new Regex(QueryInspectionPattern, RegexOptions.IgnoreCase | RegexOptions.Compiled);
Expand Down Expand Up @@ -161,76 +160,12 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
}

PartitionedQueryExecutionInfo partitionedQueryExecutionInfo;
if (inputParameters.ForcePassthrough)
{
partitionedQueryExecutionInfo = new PartitionedQueryExecutionInfo()
{
QueryInfo = new QueryInfo()
{
Aggregates = null,
DistinctType = DistinctQueryType.None,
GroupByAliases = null,
GroupByAliasToAggregateType = null,
GroupByExpressions = null,
HasSelectValue = false,
Limit = null,
Offset = null,
OrderBy = null,
OrderByExpressions = null,
RewrittenQuery = null,
Top = null,
},
QueryRanges = new List<Documents.Routing.Range<string>>(),
};
}
else if (queryPlanFromContinuationToken != null)
if (queryPlanFromContinuationToken != null)
{
partitionedQueryExecutionInfo = queryPlanFromContinuationToken;
}
else
{
// If the query would go to gateway, but we have a partition key,
// then try seeing if we can execute as a passthrough using client side only logic.
// This is to short circuit the need to go to the gateway to get the query plan.
if (cosmosQueryContext.QueryClient.BypassQueryParsing()
&& inputParameters.PartitionKey.HasValue)
{
bool parsed;
SqlQuery sqlQuery;
using (ITrace queryParseTrace = createQueryPipelineTrace.StartChild("Parse Query", TraceComponent.Query, Tracing.TraceLevel.Info))
{
parsed = SqlQueryParser.TryParse(inputParameters.SqlQuerySpec.QueryText, out sqlQuery);
}

if (parsed)
{
bool hasDistinct = sqlQuery.SelectClause.HasDistinct;
bool hasGroupBy = sqlQuery.GroupByClause != default;
bool hasAggregates = AggregateProjectionDetector.HasAggregate(sqlQuery.SelectClause.SelectSpec);
bool createPassthroughQuery = !hasAggregates && !hasDistinct && !hasGroupBy;

if (createPassthroughQuery)
{
SetTestInjectionPipelineType(inputParameters, Passthrough);

// Only thing that matters is that we target the correct range.
Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties);
List<Documents.PartitionKeyRange> targetRanges = await cosmosQueryContext.QueryClient.GetTargetPartitionKeyRangesAsync(
cosmosQueryContext.ResourceLink,
containerQueryProperties.ResourceId,
containerQueryProperties.EffectiveRangesForPartitionKey,
forceRefresh: false,
createQueryPipelineTrace);

return CosmosQueryExecutionContextFactory.TryCreatePassthroughQueryExecutionContext(
documentContainer,
inputParameters,
targetRanges,
cancellationToken);
}
}
}

partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoAsync(
cosmosQueryContext,
inputParameters,
Expand Down Expand Up @@ -270,24 +205,6 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
inputParameters.InitialFeedRange,
trace);

bool singleLogicalPartitionKeyQuery = inputParameters.PartitionKey.HasValue
|| ((partitionedQueryExecutionInfo.QueryRanges.Count == 1)
&& partitionedQueryExecutionInfo.QueryRanges[0].IsSingleValue);
bool serverStreamingQuery = !partitionedQueryExecutionInfo.QueryInfo.HasAggregates
&& !partitionedQueryExecutionInfo.QueryInfo.HasDistinct
&& !partitionedQueryExecutionInfo.QueryInfo.HasGroupBy;
bool streamingSinglePartitionQuery = singleLogicalPartitionKeyQuery && serverStreamingQuery;

bool clientStreamingQuery =
serverStreamingQuery
&& !partitionedQueryExecutionInfo.QueryInfo.HasOrderBy
&& !partitionedQueryExecutionInfo.QueryInfo.HasTop
&& !partitionedQueryExecutionInfo.QueryInfo.HasLimit
&& !partitionedQueryExecutionInfo.QueryInfo.HasOffset;
bool streamingCrossContinuationQuery = !singleLogicalPartitionKeyQuery && clientStreamingQuery;

bool createPassthroughQuery = streamingSinglePartitionQuery || streamingCrossContinuationQuery;

TryCatch<IQueryPipelineStage> tryCreatePipelineStage;

Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync(
Expand All @@ -311,20 +228,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
}
else
{
if (createPassthroughQuery)
{
SetTestInjectionPipelineType(inputParameters, Passthrough);

tryCreatePipelineStage = CosmosQueryExecutionContextFactory.TryCreatePassthroughQueryExecutionContext(
documentContainer,
inputParameters,
targetRanges,
cancellationToken);
}
else
{
tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, partitionedQueryExecutionInfo, cancellationToken);
}
tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, partitionedQueryExecutionInfo, cancellationToken);
}

return tryCreatePipelineStage;
Expand Down Expand Up @@ -432,7 +336,6 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
inputParameters.PartitionedQueryExecutionInfo,
inputParameters.ExecutionEnvironment,
inputParameters.ReturnResultsInDeterministicOrder,
inputParameters.ForcePassthrough,
inputParameters.EnableOptimisticDirectExecution,
inputParameters.TestInjections);
}
Expand Down Expand Up @@ -509,33 +412,6 @@ private static TryCatch<IQueryPipelineStage> TryCreateOptimisticDirectExecutionC
cancellationToken: cancellationToken);
}

private static TryCatch<IQueryPipelineStage> TryCreatePassthroughQueryExecutionContext(
DocumentContainer documentContainer,
InputParameters inputParameters,
List<Documents.PartitionKeyRange> targetRanges,
CancellationToken cancellationToken)
{
// Return a parallel context, since we still want to be able to handle splits and concurrency / buffering.
return ParallelCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
sqlQuerySpec: inputParameters.SqlQuerySpec,
targetRanges: targetRanges
.Select(range => new FeedRangeEpk(
new Documents.Routing.Range<string>(
min: range.MinInclusive,
max: range.MaxExclusive,
isMinInclusive: true,
isMaxInclusive: false)))
.ToList(),
queryPaginationOptions: new QueryPaginationOptions(
pageSizeHint: inputParameters.MaxItemCount),
partitionKey: inputParameters.PartitionKey,
prefetchPolicy: PrefetchPolicy.PrefetchSinglePage,
maxConcurrency: inputParameters.MaxConcurrency,
cancellationToken: cancellationToken,
continuationToken: inputParameters.InitialUserContinuationToken);
}

private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryExecutionContext(
DocumentContainer documentContainer,
CosmosQueryContext cosmosQueryContext,
Expand Down Expand Up @@ -742,14 +618,10 @@ private static void SetTestInjectionPipelineType(InputParameters inputParameters
{
responseStats.PipelineType = TestInjections.PipelineType.OptimisticDirectExecution;
}
else if (pipelineType == Specialized)
else
{
responseStats.PipelineType = TestInjections.PipelineType.Specialized;
}
else
{
responseStats.PipelineType = TestInjections.PipelineType.Passthrough;
}
}
}

Expand Down Expand Up @@ -865,7 +737,6 @@ public InputParameters(
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
ExecutionEnvironment? executionEnvironment,
bool? returnResultsInDeterministicOrder,
bool forcePassthrough,
bool enableOptimisticDirectExecution,
TestInjections testInjections)
{
Expand Down Expand Up @@ -899,7 +770,6 @@ public InputParameters(
this.PartitionedQueryExecutionInfo = partitionedQueryExecutionInfo;
this.ExecutionEnvironment = executionEnvironment.GetValueOrDefault(InputParameters.DefaultExecutionEnvironment);
this.ReturnResultsInDeterministicOrder = returnResultsInDeterministicOrder.GetValueOrDefault(InputParameters.DefaultReturnResultsInDeterministicOrder);
this.ForcePassthrough = forcePassthrough;
this.EnableOptimisticDirectExecution = enableOptimisticDirectExecution;
this.TestInjections = testInjections;
}
Expand All @@ -916,7 +786,6 @@ public InputParameters(
public ExecutionEnvironment ExecutionEnvironment { get; }
public bool ReturnResultsInDeterministicOrder { get; }
public TestInjections TestInjections { get; }
public bool ForcePassthrough { get; }
public bool EnableOptimisticDirectExecution { get; }

public InputParameters WithContinuationToken(CosmosElement token)
Expand All @@ -933,7 +802,6 @@ public InputParameters WithContinuationToken(CosmosElement token)
this.PartitionedQueryExecutionInfo,
this.ExecutionEnvironment,
this.ReturnResultsInDeterministicOrder,
this.ForcePassthrough,
this.EnableOptimisticDirectExecution,
this.TestInjections);
}
Expand Down
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/Query/Core/TestInjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ internal sealed class TestInjections
{
public enum PipelineType
{
Passthrough,
Specialized,
OptimisticDirectExecution,
}
Expand Down
2 changes: 0 additions & 2 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public static QueryIterator Create(
string resourceLink,
bool isContinuationExpected,
bool allowNonValueAggregateQuery,
bool forcePassthrough,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
Documents.ResourceType resourceType)
{
Expand Down Expand Up @@ -145,7 +144,6 @@ public static QueryIterator Create(
partitionedQueryExecutionInfo: partitionedQueryExecutionInfo,
executionEnvironment: queryRequestOptions.ExecutionEnvironment,
returnResultsInDeterministicOrder: queryRequestOptions.ReturnResultsInDeterministicOrder,
forcePassthrough: forcePassthrough,
enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution,
testInjections: queryRequestOptions.TestSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public override async Task<TryExecuteQueryResult> TryExecuteQueryAsync(
if (feedRangeInternal != null)
{
// The user has scoped down to a physical partition or logical partition.
// In either case let the query execute as a passthrough.
// In either case, Ode pipeline should be able to execute this query if Ode has been enabled by the user.
QueryIterator passthroughQueryIterator = QueryIterator.Create(
containerCore: this,
client: this.queryClient,
Expand All @@ -374,7 +374,6 @@ public override async Task<TryExecuteQueryResult> TryExecuteQueryAsync(
resourceLink: this.LinkUri,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
forcePassthrough: true, // Forcing a passthrough, since we don't want to get the query plan nor try to rewrite it.
partitionedQueryExecutionInfo: null,
resourceType: ResourceType.Document);

Expand Down Expand Up @@ -438,7 +437,6 @@ public override async Task<TryExecuteQueryResult> TryExecuteQueryAsync(
resourceLink: this.LinkUri,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
forcePassthrough: false,
partitionedQueryExecutionInfo: queryPlan,
resourceType: ResourceType.Document);

Expand Down Expand Up @@ -835,7 +833,6 @@ public override FeedIteratorInternal GetItemQueryStreamIteratorInternal(
resourceLink: this.LinkUri,
isContinuationExpected: isContinuationExcpected,
allowNonValueAggregateQuery: true,
forcePassthrough: false,
partitionedQueryExecutionInfo: null,
resourceType: ResourceType.Document);
}
Expand Down Expand Up @@ -874,7 +871,6 @@ public override FeedIteratorInternal GetReadFeedIterator(
resourceLink: resourceLink,
isContinuationExpected: false,
allowNonValueAggregateQuery: true,
forcePassthrough: false,
partitionedQueryExecutionInfo: null,
resourceType: resourceType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public async Task TestPassingOptimisticDirectExecutionQueries()
partitionKey: partitionKeyValue,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.NonGroupByAndNoContinuationTokenPageSizeOptions,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),

// Simple query (requiresDist = false)
CreateInput(
Expand Down Expand Up @@ -277,14 +277,14 @@ public async Task TestPassingOptimisticDirectExecutionQueries()
partitionKey: null,
enableOptimisticDirectExecution: true,
pageSizeOptions: PageSizeOptions.NonGroupByAndNoContinuationTokenPageSizeOptions,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.numberField FROM r",
expectedResult: first7Integers,
partitionKey: null,
enableOptimisticDirectExecution: true,
pageSizeOptions: PageSizeOptions.NonGroupByWithContinuationTokenPageSizeOptions,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),

// DISTINCT with ORDER BY (requiresDist = true)
CreateInput(
Expand Down Expand Up @@ -445,28 +445,28 @@ public async Task TestQueriesWithPartitionKeyNone()
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.{NumberField} FROM r ORDER BY r.{NumberField} ASC",
expectedResult: first400Integers,
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.{NumberField} FROM r ORDER BY r.{NumberField} DESC",
expectedResult: first400IntegersReversed,
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),
CreateInput(
query: $"SELECT VALUE r.numberField FROM r WHERE r.{NumberField} BETWEEN 0 AND {NumberOfDocuments} OFFSET 1 LIMIT 1",
expectedResult: new List<int> { 1 },
partitionKey: PartitionKey.None,
enableOptimisticDirectExecution: false,
pageSizeOptions: PageSizeOptions.PageSize100,
expectedPipelineType: TestInjections.PipelineType.Passthrough),
expectedPipelineType: TestInjections.PipelineType.Specialized),

//TODO: Change expectedPipelineType to OptimisticDirectExecution once emulator is updated to 0415
CreateInput(
Expand Down
Loading