Skip to content

Conversation

@FabianMeiswinkel
Copy link
Member

@FabianMeiswinkel FabianMeiswinkel commented Jan 7, 2025

Description

This PR adds a protected ctor for CosmosAsyncContainer to allow extending Container - for example to add custom diagnostics, error handling, retry policies or addiitonal validation - like disallowing certain query functionality etc.
The CosmosAsyncContainer has several non-final methods already - but there was no public/protected ctor - this was intentional, because the business logic in the CosmosAsyncContainer really does not allow for artificial extensions - like it won't make much sense to try to inject a different store than Cosmso DB because the logc for metrics, diagnostics and even the PagedIterator returned from query etc. is intentionally tight to Cosmos DB.
There have been a few customer reports thought for limited extensibility - for testing mocking, but also to add some shared custom diagnostics and some validations to prevent application teams to violate against best practices (preventing cross-partition queries, disabling scan in query etc.)
So, this PR opens the extensibility in CosmosAsyncContainer a bit more - but since the only protected ctor still requires a CosmosAsnycContainer for the actual implementations it still only allows extensions to wrap around the original Container implementation. This PR is intentionally not trying to extend extensibility to support generic stores etc.

There are two options on how to inject custom containers.

  1. Wrap explicitly.
CosmosClient clientWithoutInterceptor = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .userAgentSuffix("noInterceptor")
            .buildClient();

CosmosContainer normalContainer = clientWithoutInterceptor
    .getDatabase("TestDB")
    .getContainer("TestContainer");

CustomContainer = new CustomContainer(normalContainer);

private static class CustomContainer extends CosmosAsyncContainer {
        protected CustomContainer (CosmosAsyncContainer toBeWrappedContainer) {
            super(toBeWrappedContainer);
        }
 }
  1. Automatically inject customer container for all containers (sync)
CosmosClient clientWithInterceptor = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .userAgentSuffix("withInterceptor")
            .containerCreationInterceptor(originalContainer -> new DisallowQueriesContainer(originalContainer))
            .buildClient();

CosmosContainer customContainer = clientWithInterceptor 
    .getDatabase("TestDB")
    .getContainer("TestContainer");

private static class CustomContainer extends CosmosAsyncContainer {
        protected CustomContainer (CosmosAsyncContainer toBeWrappedContainer) {
            super(toBeWrappedContainer);
        }
 }
  1. Automatically inject customer container for all containers (async)
CosmosAsyncClient asyncClientWithInterceptor = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .userAgentSuffix("withInterceptor")
            .containerCreationInterceptor(originalContainer -> new DisallowQueriesContainer(originalContainer))
            .buildAsyncClient();

CustomContainer customContainer = (CustomContainer)asyncClientWithInterceptor 
    .getDatabase("TestDB")
    .getContainer("TestContainer");

private static class CustomContainer extends CosmosAsyncContainer {
        protected CustomContainer (CosmosAsyncContainer toBeWrappedContainer) {
            super(toBeWrappedContainer);
        }
 }

This PR also adds a new static factory method that allows creating a new CosmosPagedFlux based off of a list of items.

List<ObjectNode> cachedResult = ...
CosmosPagedFlux<ObjectNode> mockFlux = CosmosPagedFlux.createFromList(cachedResult, false);

All SDK Contribution checklist:

  • The pull request does not introduce [breaking changes]
  • CHANGELOG is updated for new features, bug fixes or other significant changes.
  • I have read the contribution guidelines.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

@FabianMeiswinkel FabianMeiswinkel marked this pull request as draft January 7, 2025 19:06
@FabianMeiswinkel FabianMeiswinkel changed the title Allowing customers to wrap CosmosAsyncContainer DRAFT: Allowing customers to wrap CosmosAsyncContainer Jan 7, 2025
@github-actions github-actions bot added the Cosmos label Jan 7, 2025
@azure-sdk
Copy link
Collaborator

API change check

APIView has identified API level changes in this PR and created following API reviews.

com.azure:azure-cosmos

@FabianMeiswinkel FabianMeiswinkel marked this pull request as ready for review January 8, 2025 00:13
@FabianMeiswinkel FabianMeiswinkel changed the title DRAFT: Allowing customers to wrap CosmosAsyncContainer Allowing customers to wrap CosmosAsyncContainer Jan 8, 2025
@Gueorgi
Copy link

Gueorgi commented Jan 8, 2025

This is a good PR to open up the SDK a lot more in an easy to understand way.

  1. It still leaves no option to extend some methods like readAllItems (package private - no modifier) unless one makes their package appear to be part of the SDK one.

  2. One can't provide a different return object in some cases like CosmosPageFlux for example since it is not extendible (cases of classes that are either final or non public etc).

  3. If one needs to use this for a wrapper but then the wrapper's clients also try to use this functionality as well how that can work? The solution looks to be an util that builds a custom wrapper container-create callback and that util/callback also has to accept as param the "client custom" wrapper callbacks as well and invoke these - becomes a bit mor complex to support (a custom wrapper calling another custom wrapper callback that has been passed in etc).

@FabianMeiswinkel
Copy link
Member Author

This is a good PR to open up the SDK a lot more in an easy to understand way.

  1. It still leaves no option to extend some methods like readAllItems (package private - no modifier) unless one makes their package appear to be part of the SDK one.
  2. One can't provide a different return object in some cases like CosmosPageFlux for example since it is not extendible (cases of classes that are either final or non public etc).
  3. If one needs to use this for a wrapper but then the wrapper's clients also try to use this functionality as well how that can work? The solution looks to be an util that builds a custom wrapper container-create callback and that util/callback also has to accept as param the "client custom" wrapper callbacks as well and invoke these - becomes a bit mor complex to support (a custom wrapper calling another custom wrapper callback that has been passed in etc).

Regarding #1 - the public API for readAllItems is meant to read all documents of a logical partition - this one can also be extended -

/**
* Reads all the items of a logical partition
* <!-- src_embed com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* <pre>
* cosmosAsyncContainer
* .readAllItems&#40;new PartitionKey&#40;partitionKey&#41;, Passenger.class&#41;
* .byPage&#40;100&#41;
* .flatMap&#40;passengerFeedResponse -&gt; &#123;
* for &#40;Passenger passenger : passengerFeedResponse.getResults&#40;&#41;&#41; &#123;
* System.out.println&#40;passenger&#41;;
* &#125;
* return Flux.empty&#40;&#41;;
* &#125;&#41;
* .subscribe&#40;&#41;;
* </pre>
* <!-- end com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed responses of the read Cosmos items. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param <T> the type parameter.
* @param partitionKey the partition key value of the documents that need to be read
* @param classType the class type.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages
* of the read Cosmos items or an error.
*/
public <T> CosmosPagedFlux<T> readAllItems(
PartitionKey partitionKey,
Class<T> classType) {
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
queryRequestOptions.setPartitionKey(partitionKey);
return this.readAllItems(partitionKey, queryRequestOptions, classType);
}
/**
* Reads all the items of a logical partition
* <!-- src_embed com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* <pre>
* cosmosAsyncContainer
* .readAllItems&#40;new PartitionKey&#40;partitionKey&#41;, Passenger.class&#41;
* .byPage&#40;100&#41;
* .flatMap&#40;passengerFeedResponse -&gt; &#123;
* for &#40;Passenger passenger : passengerFeedResponse.getResults&#40;&#41;&#41; &#123;
* System.out.println&#40;passenger&#41;;
* &#125;
* return Flux.empty&#40;&#41;;
* &#125;&#41;
* .subscribe&#40;&#41;;
* </pre>
* <!-- end com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed responses of the read Cosmos items. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param <T> the type parameter.
* @param partitionKey the partition key value of the documents that need to be read
* @param options the feed options (Optional).
* @param classType the class type.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages
* of the read Cosmos items or an error.
*/
public <T> CosmosPagedFlux<T> readAllItems(
PartitionKey partitionKey,
CosmosQueryRequestOptions options,
Class<T> classType) {
CosmosAsyncClient client = this.getDatabase().getClient();
final CosmosQueryRequestOptions requestOptions = options == null ? new CosmosQueryRequestOptions() : options;
requestOptions.setPartitionKey(partitionKey);
CosmosQueryRequestOptionsBase<?> cosmosQueryRequestOptionsImpl = queryOptionsAccessor.getImpl(requestOptions);
applyPolicies(OperationType.Query, ResourceType.Document, cosmosQueryRequestOptionsImpl, this.readManyItemsSpanName);
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
QueryFeedOperationState state = new QueryFeedOperationState(
client,
this.readAllItemsOfLogicalPartitionSpanName,
database.getId(),
this.getId(),
ResourceType.Document,
OperationType.ReadFeed,
queryOptionsAccessor.getQueryNameOrDefault(requestOptions, this.readAllItemsOfLogicalPartitionSpanName),
requestOptions,
pagedFluxOptions
);
pagedFluxOptions.setFeedOperationState(state);
return getDatabase()
.getDocClientWrapper()
.readAllDocuments(getLink(), partitionKey, state, classType)
.map(response -> prepareFeedResponse(response, false));
});
}
. I assume you are referring to the internal readAllItems without PK - this functionally is equivalent to doing a query of "SELECT * from c" - so, the API is internal only both in the "normal" container and as such cannot be extended - but the queryItems can be - so, I don't think this is really an issue.

Regarding #2 - ACK - this is true and it is (an intentional) limitation of CosmosPagedFlux - let's go through the concrete use cases to see whether/how we can unblock them by allowing public CosmosPagedFlux overloads or factory methods

Regarding #3 - Correct - pipelining would be needed in this case - but I don't quite see why that is a problem.

@Gueorgi
Copy link

Gueorgi commented Jan 10, 2025

This is a good PR to open up the SDK a lot more in an easy to understand way.

  1. It still leaves no option to extend some methods like readAllItems (package private - no modifier) unless one makes their package appear to be part of the SDK one.
  2. One can't provide a different return object in some cases like CosmosPageFlux for example since it is not extendible (cases of classes that are either final or non public etc).
  3. If one needs to use this for a wrapper but then the wrapper's clients also try to use this functionality as well how that can work? The solution looks to be an util that builds a custom wrapper container-create callback and that util/callback also has to accept as param the "client custom" wrapper callbacks as well and invoke these - becomes a bit mor complex to support (a custom wrapper calling another custom wrapper callback that has been passed in etc).

Regarding #1 - the public API for readAllItems is meant to read all documents of a logical partition - this one can also be extended -

/**
* Reads all the items of a logical partition
* <!-- src_embed com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* <pre>
* cosmosAsyncContainer
* .readAllItems&#40;new PartitionKey&#40;partitionKey&#41;, Passenger.class&#41;
* .byPage&#40;100&#41;
* .flatMap&#40;passengerFeedResponse -&gt; &#123;
* for &#40;Passenger passenger : passengerFeedResponse.getResults&#40;&#41;&#41; &#123;
* System.out.println&#40;passenger&#41;;
* &#125;
* return Flux.empty&#40;&#41;;
* &#125;&#41;
* .subscribe&#40;&#41;;
* </pre>
* <!-- end com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* <p>
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed responses of the read Cosmos items. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param <T> the type parameter.
* @param partitionKey the partition key value of the documents that need to be read
* @param classType the class type.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages
* of the read Cosmos items or an error.
*/
public <T> CosmosPagedFlux<T> readAllItems(
PartitionKey partitionKey,
Class<T> classType) {
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
queryRequestOptions.setPartitionKey(partitionKey);
return this.readAllItems(partitionKey, queryRequestOptions, classType);
}
/**
* Reads all the items of a logical partition
* <!-- src_embed com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* <pre>
* cosmosAsyncContainer
* .readAllItems&#40;new PartitionKey&#40;partitionKey&#41;, Passenger.class&#41;
* .byPage&#40;100&#41;
* .flatMap&#40;passengerFeedResponse -&gt; &#123;
* for &#40;Passenger passenger : passengerFeedResponse.getResults&#40;&#41;&#41; &#123;
* System.out.println&#40;passenger&#41;;
* &#125;
* return Flux.empty&#40;&#41;;
* &#125;&#41;
* .subscribe&#40;&#41;;
* </pre>
* <!-- end com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
* After subscription the operation will be performed. The {@link CosmosPagedFlux} will
* contain one or several feed responses of the read Cosmos items. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param <T> the type parameter.
* @param partitionKey the partition key value of the documents that need to be read
* @param options the feed options (Optional).
* @param classType the class type.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages
* of the read Cosmos items or an error.
*/
public <T> CosmosPagedFlux<T> readAllItems(
PartitionKey partitionKey,
CosmosQueryRequestOptions options,
Class<T> classType) {
CosmosAsyncClient client = this.getDatabase().getClient();
final CosmosQueryRequestOptions requestOptions = options == null ? new CosmosQueryRequestOptions() : options;
requestOptions.setPartitionKey(partitionKey);
CosmosQueryRequestOptionsBase<?> cosmosQueryRequestOptionsImpl = queryOptionsAccessor.getImpl(requestOptions);
applyPolicies(OperationType.Query, ResourceType.Document, cosmosQueryRequestOptionsImpl, this.readManyItemsSpanName);
return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
QueryFeedOperationState state = new QueryFeedOperationState(
client,
this.readAllItemsOfLogicalPartitionSpanName,
database.getId(),
this.getId(),
ResourceType.Document,
OperationType.ReadFeed,
queryOptionsAccessor.getQueryNameOrDefault(requestOptions, this.readAllItemsOfLogicalPartitionSpanName),
requestOptions,
pagedFluxOptions
);
pagedFluxOptions.setFeedOperationState(state);
return getDatabase()
.getDocClientWrapper()
.readAllDocuments(getLink(), partitionKey, state, classType)
.map(response -> prepareFeedResponse(response, false));
});
}

. I assume you are referring to the internal readAllItems without PK - this functionally is equivalent to doing a query of "SELECT * from c" - so, the API is internal only both in the "normal" container and as such cannot be extended - but the queryItems can be - so, I don't think this is really an issue.
Regarding #2 - ACK - this is true and it is (an intentional) limitation of CosmosPagedFlux - let's go through the concrete use cases to see whether/how we can unblock them by allowing public CosmosPagedFlux overloads or factory methods

Regarding #3 - Correct - pipelining would be needed in this case - but I don't quite see why that is a problem.

on #1 I aggree - not an issue
on #2 the use case is providing mocks or cached data etc - not being able to create CosmosPageFlux means one can't return anything else but null or what was returned by the unerlying SDK. So this prevents the wrapping being used for mocks and other purposes (i.e. fetch from different cosmos client - cold storage etc).
on #3 - in other wrappers like JDBC one can get a wrapper (not knowing it is a wrapper) and create a new one on top of it as well - and this can be mostly trasparent to the end user/consumer of the wrapper. Happens with JDBC connection pooling, DataSources etc. Here for one wrapper to be used it will not be transparent - the client has to willingly create the wrapper - via some utility and this wrapper will not have a chance of being used in 2nd wrapper unless that 2nd warpper directly calls the first wrapper callback (not transparent).

@github-actions
Copy link
Contributor

Hi @FabianMeiswinkel. Thank you for your interest in helping to improve the Azure SDK experience and for your contribution. We've noticed that there hasn't been recent engagement on this pull request. If this is still an active work stream, please let us know by pushing some changes or leaving a comment. Otherwise, we'll close this out in 7 days.

@github-actions github-actions bot added the no-recent-activity There has been no recent activity on this issue. label Mar 14, 2025
@FabianMeiswinkel
Copy link
Member Author

keep this active please

@github-actions github-actions bot removed the no-recent-activity There has been no recent activity on this issue. label Mar 14, 2025
Copy link

@Gueorgi Gueorgi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM #2 issue is adressed and #3 limitation is something one I can live with. #1 was non issue.

Copy link
Member

@tvaron3 tvaron3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@kushagraThapar kushagraThapar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except few nits.

@FabianMeiswinkel FabianMeiswinkel merged commit 0b7dcd1 into Azure:main Apr 22, 2025
29 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants