From b8156128afaa1c6b769f59b18a2c5c5d742b8a6c Mon Sep 17 00:00:00 2001 From: annie-mac Date: Thu, 2 May 2024 08:16:25 -0700 Subject: [PATCH 1/4] fix --- .../doc/configuration-reference.md | 116 +++++++------- .../source/CosmosSourceConfig.java | 5 +- ...njectionServerErrorRuleOnGatewayTests.java | 150 ++++++++++-------- .../implementation/RxGatewayStoreModel.java | 2 +- .../GatewayServerErrorInjector.java | 88 +++++++++- 5 files changed, 225 insertions(+), 136 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md index a326f75be4ac..9e5b224b60a1 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md +++ b/sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md @@ -2,65 +2,65 @@ ## Generic Configuration -| Config Property Name | Default | Description | -|:--------------------------------------------------------------------|:-----------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `azure.cosmos.account.endpoint` | None | Cosmos DB Account Endpoint Uri | -| `azure.cosmos.account.environment` | `Azure` | The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`. | -| `azure.cosmos.account.tenantId` | `""` | The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication. | -| `azure.cosmos.auth.type` | `MasterKey` | There are two auth types are supported currently: `MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), `ServicePrincipal` | -| `azure.cosmos.account.key` | `""` | Cosmos DB Account Key (only required in case of `auth.type` as `MasterKey`) | -| `azure.cosmos.auth.aad.clientId` | `""` | The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication. | -| `azure.cosmos.auth.aad.clientSecret` | `""` | The client secret/password of the service principal. | -| `azure.cosmos.mode.gateway` | `false` | Flag to indicate whether to use gateway mode. By default it is false, means SDK uses direct mode. https://learn.microsoft.com/azure/cosmos-db/nosql/sdk-connection-modes | -| `azure.cosmos.preferredRegionList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). | -| `azure.cosmos.application.name` | `""` | Application name. Will be added as the userAgent suffix. | -| `azure.cosmos.throughputControl.enabled` | `false` | A flag to indicate whether throughput control is enabled. | -| `azure.cosmos.throughputControl.account.endpoint` | `""` | Cosmos DB Throughput Control Account Endpoint Uri. 
| -| `azure.cosmos.throughputControl.account.environment` | `Azure` | The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`. | -| `azure.cosmos.throughputControl.account.tenantId` | `""` | The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication. | -| `azure.cosmos.throughputControl.auth.type` | `MasterKey` | There are two auth types are supported currently: `MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), `ServicePrincipal` | -| `azure.cosmos.throughputControl.account.key` | `""` | Cosmos DB Throughput Control Account Key (only required in case of `throughputControl.auth.type` as `MasterKey`). | -| `azure.cosmos.throughputControl.auth.aad.clientId` | `""` | The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication. | -| `azure.cosmos.throughputControl.auth.aad.clientSecret` | `""` | The client secret/password of the service principal. | -| `azure.cosmos.throughputControl.preferredRegionList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). | -| `azure.cosmos.throughputControl.mode.gateway` | `false` | Flag to indicate whether to use gateway mode. By default it is false, means SDK uses direct mode. https://learn.microsoft.com/azure/cosmos-db/nosql/sdk-connection-modes | -| `azure.cosmos.throughputControl.group.name` | `""` | Throughput control group name. Since customer is allowed to create many groups for a container, the name should be unique. | -| `azure.cosmos.throughputControl.targetThroughput` | `-1` | Throughput control group target throughput. The value should be larger than 0. | -| `azure.cosmos.throughputControl.targetThroughputThreshold` | `-1` | Throughput control group target throughput threshold. The value should be between (0,1]. | -| `azure.cosmos.throughputControl.priorityLevel` | `None` | Throughput control group priority level. The value can be None, High or Low. | -| `azure.cosmos.throughputControl.globalControl.database.name` | `""` | Database which will be used for throughput global control. | -| `azure.cosmos.throughputControl.globalControl.container.name` | `""` | Container which will be used for throughput global control. | -| `azure.cosmos.throughputControl.globalControl.renewIntervalInMS` | `-1` | This controls how often the client is going to update the throughput usage of itself and adjust its own throughput share based on the throughput usage of other clients. Default is 5s, the allowed min value is 5s. | -| `azure.cosmos.throughputControl.globalControl.expireIntervalInMS` | `-1` | This controls how quickly we will detect the client has been offline and hence allow its throughput share to be taken by other clients. Default is 11s, the allowed min value is 2 * renewIntervalInMS + 1. 
| +| Config Property Name | Default | Description | +|:------------------------------------------------------------------|:-----------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `azure.cosmos.account.endpoint` | None | Cosmos DB Account Endpoint Uri | +| `azure.cosmos.account.environment` | `Azure` | The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`. | +| `azure.cosmos.account.tenantId` | `""` | The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication. | +| `azure.cosmos.auth.type` | `MasterKey` | There are two auth types are supported currently: `MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), `ServicePrincipal` | +| `azure.cosmos.account.key` | `""` | Cosmos DB Account Key (only required in case of `auth.type` as `MasterKey`) | +| `azure.cosmos.auth.aad.clientId` | `""` | The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication. | +| `azure.cosmos.auth.aad.clientSecret` | `""` | The client secret/password of the service principal. | +| `azure.cosmos.mode.gateway` | `false` | Flag to indicate whether to use gateway mode. By default it is false, means SDK uses direct mode. https://learn.microsoft.com/azure/cosmos-db/nosql/sdk-connection-modes | +| `azure.cosmos.preferredRegionList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). | +| `azure.cosmos.application.name` | `""` | Application name. Will be added as the userAgent suffix. | +| `azure.cosmos.throughputControl.enabled` | `false` | A flag to indicate whether throughput control is enabled. | +| `azure.cosmos.throughputControl.account.endpoint` | `""` | Cosmos DB Throughput Control Account Endpoint Uri. | +| `azure.cosmos.throughputControl.account.environment` | `Azure` | The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`. | +| `azure.cosmos.throughputControl.account.tenantId` | `""` | The tenantId of the CosmosDB account. Required for `ServicePrincipal` authentication. | +| `azure.cosmos.throughputControl.auth.type` | `MasterKey` | There are two auth types are supported currently: `MasterKey`(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), `ServicePrincipal` | +| `azure.cosmos.throughputControl.account.key` | `""` | Cosmos DB Throughput Control Account Key (only required in case of `throughputControl.auth.type` as `MasterKey`). | +| `azure.cosmos.throughputControl.auth.aad.clientId` | `""` | The clientId/ApplicationId of the service principal. Required for `ServicePrincipal` authentication. 
| +| `azure.cosmos.throughputControl.auth.aad.clientSecret` | `""` | The client secret/password of the service principal. | +| `azure.cosmos.throughputControl.preferredRegionList` | `[]` | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[East US, West US]` or `East US, West US`) provided preferred regions will be used as hint. You should use a collocated kafka cluster with your Cosmos DB account and pass the kafka cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet&preserve-view=true). | +| `azure.cosmos.throughputControl.mode.gateway` | `false` | Flag to indicate whether to use gateway mode. By default it is false, means SDK uses direct mode. https://learn.microsoft.com/azure/cosmos-db/nosql/sdk-connection-modes | +| `azure.cosmos.throughputControl.group.name` | `""` | Throughput control group name. Since customer is allowed to create many groups for a container, the name should be unique. | +| `azure.cosmos.throughputControl.targetThroughput` | `-1` | Throughput control group target throughput. The value should be larger than 0. | +| `azure.cosmos.throughputControl.targetThroughputThreshold` | `-1` | Throughput control group target throughput threshold. The value should be between (0,1]. | +| `azure.cosmos.throughputControl.priorityLevel` | `None` | Throughput control group priority level. The value can be None, High or Low. | +| `azure.cosmos.throughputControl.globalControl.database.name` | `""` | Database which will be used for throughput global control. | +| `azure.cosmos.throughputControl.globalControl.container.name` | `""` | Container which will be used for throughput global control. | +| `azure.cosmos.throughputControl.globalControl.renewIntervalInMS` | `-1` | This controls how often the client is going to update the throughput usage of itself and adjust its own throughput share based on the throughput usage of other clients. Default is 5s, the allowed min value is 5s. | +| `azure.cosmos.throughputControl.globalControl.expireIntervalInMS` | `-1` | This controls how quickly we will detect the client has been offline and hence allow its throughput share to be taken by other clients. Default is 11s, the allowed min value is 2 * renewIntervalInMS + 1. | ## Source Connector Configuration -| Config Property Name | Default | Description | -|:--------------------------------------------------|:---------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `azure.cosmos.source.database.name` | None | Cosmos DB database name. | -| `azure.cosmos.source.containers.includeAll` | `false` | Flag to indicate whether reading from all containers. | -| `azure.cosmos.source.containers.includedList` | `[]` | Containers included. This config will be ignored if azure.cosmos.source.containers.includeAll is true. | -| `azure.cosmos.source.containers.topicMap` | `[]` | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. 
By default, container name is used as the name of the kafka topic to publish data to, can use this property to override the default config | -| `azure.cosmos.source.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (Now, Beginning or a certain point in time (UTC) for example 2020-02-10T14:15:03) - the default value is 'Beginning'. | -| `azure.cosmos.source.changeFeed.mode` | `LatestVersion` | ChangeFeed mode (LatestVersion or AllVersionsAndDeletes). | -| `azure.cosmos.source.changeFeed.maxItemCountHint` | `1000` | The maximum number of documents returned in a single change feed request. But the number of items received might be higher than the specified value if multiple items are changed by the same transaction. | -| `azure.cosmos.source.metadata.poll.delay.ms` | `300000` | Indicates how often to check the metadata changes (including container split/merge, adding/removing/recreated containers). When changes are detected, it will reconfigure the tasks. Default is 5 minutes. | -| `azure.cosmos.source.metadata.storage.type` | `Kafka` | The storage type of the metadata. Two types are supported: Cosmos, Kafka. | -| `azure.cosmos.source.metadata.storage.name` | `_cosmos.metadata.topic` | The resource name of the metadata storage. If metadata storage type is Kafka topic, then this config refers to kafka topic name, the metadata topic will be created if it does not already exist, else it will use the pre-created topic. If metadata storage type is CosmosDB container, then this config refers to container name, please pre-create the metadata container partitioned by /id. | -| `azure.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. | -| `azure.cosmos.source.messageKey.field` | `id` | The field to use as the message key. | +| Config Property Name | Default | Description | +|:--------------------------------------------------|:-------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `azure.cosmos.source.database.name` | None | Cosmos DB database name. | +| `azure.cosmos.source.containers.includeAll` | `false` | Flag to indicate whether reading from all containers. | +| `azure.cosmos.source.containers.includedList` | `[]` | Containers included. This config will be ignored if azure.cosmos.source.containers.includeAll is true. | +| `azure.cosmos.source.containers.topicMap` | `[]` | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. By default, container name is used as the name of the kafka topic to publish data to, can use this property to override the default config | +| `azure.cosmos.source.changeFeed.startFrom` | `Beginning` | ChangeFeed Start from settings (Now, Beginning or a certain point in time (UTC) for example 2020-02-10T14:15:03) - the default value is 'Beginning'. | +| `azure.cosmos.source.changeFeed.mode` | `LatestVersion` | ChangeFeed mode (LatestVersion or AllVersionsAndDeletes). 
| +| `azure.cosmos.source.changeFeed.maxItemCountHint` | `1000` | The maximum number of documents returned in a single change feed request. But the number of items received might be higher than the specified value if multiple items are changed by the same transaction. | +| `azure.cosmos.source.metadata.poll.delay.ms` | `300000` | Indicates how often to check the metadata changes (including container split/merge, adding/removing/recreated containers). When changes are detected, it will reconfigure the tasks. Default is 5 minutes. | +| `azure.cosmos.source.metadata.storage.type` | `Kafka` | The storage type of the metadata. Two types are supported: Cosmos, Kafka. | +| `azure.cosmos.source.metadata.storage.name` | `_cosmos.metadata.topic` | The resource name of the metadata storage. If metadata storage type is Kafka topic, then this config refers to kafka topic name, the metadata topic will be created if it does not already exist, else it will use the pre-created topic. If metadata storage type is `Cosmos`, then this config refers to container name, for `MasterKey` auth, this container will be created with `AutoScale` with 4000 RU if not already exists, for `ServicePrincipal` auth, it requires the container to be created ahead of time . | +| `azure.cosmos.source.messageKey.enabled` | `true` | Whether to set the kafka record message key. | +| `azure.cosmos.source.messageKey.field` | `id` | The field to use as the message key. | ## Sink Connector Configuration -| Config Property Name | Default | Description | -|:-------------------------------------------------------|:--------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `azure.cosmos.sink.database.name` | `""` | Cosmos DB database name. | -| `azure.cosmos.sink.containers.topicMap` | `""` | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. | -| `azure.cosmos.sink.errors.tolerance.level` | `None` | Error tolerance level after exhausting all retries. `None` for fail on error. `All` for log and continue | -| `azure.cosmos.sink.bulk.enabled` | `true` | Flag to indicate whether Cosmos DB bulk mode is enabled for Sink connector. By default it is true. | -| `azure.cosmos.sink.bulk.maxConcurrentCosmosPartitions` | `-1` | Cosmos DB Item Write Max Concurrent Cosmos Partitions. If not specified it will be determined based on the number of the container's physical partitions which would indicate every batch is expected to have data from all Cosmos physical partitions. 
If specified it indicates from at most how many Cosmos Physical Partitions each batch contains data. So this config can be used to make bulk processing more efficient when input data in each batch has been repartitioned to balance to how many Cosmos partitions each batch needs to write. This is mainly useful for very large containers (with hundreds of physical partitions. | -| `azure.cosmos.sink.bulk.initialBatchSize` | `1` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 1. Reduce this when you want to avoid that the first few requests consume too many RUs. | -| `azure.cosmos.sink.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: `ItemOverwrite` (using upsert), `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts), `ItemDelete` (deletes based on id/pk of data frame), `ItemDeleteIfNotModified` (deletes based on id/pk of data frame if etag hasn't changed since collecting id/pk), `ItemOverwriteIfNotModified` (using create if etag is empty, update/replace with etag pre-condition otherwise, if document was updated the pre-condition failure is ignored), `ItemPatch` (Partial update all documents based on the patch config) | -| `azure.cosmos.sink.maxRetryCount` | `10` | Cosmos DB max retry attempts on write failures for Sink connector. By default, the connector will retry on transient write errors for up to 10 times. | -| `azure.cosmos.sink.id.strategy` | `ProvidedInValueStrategy` | A strategy used to populate the document with an ``id``. Valid strategies are: ``TemplateStrategy``, ``FullKeyStrategy``, ``KafkaMetadataStrategy``, ``ProvidedInKeyStrategy``, ``ProvidedInValueStrategy``. Configuration properties prefixed with``id.strategy`` are passed through to the strategy. For example, when using ``id.strategy=TemplateStrategy`` , the property ``id.strategy.template`` is passed through to the template strategy and used to specify the template string to be used in constructing the ``id``. | -| `azure.cosmos.sink.write.patch.operationType.default` | `Set` | Default Cosmos DB patch operation type. Supported ones include none, add, set, replace, remove, increment. Choose none for no-op, for others please reference [here](https://docs.microsoft.com/azure/cosmos-db/partial-document-update#supported-operations) for full context. | -| `azure.cosmos.sink.write.patch.property.configs` | `""` | Cosmos DB patch json property configs. It can contain multiple definitions matching the following patterns separated by comma. property(jsonProperty).op(operationType) or property(jsonProperty).path(patchInCosmosdb).op(operationType) - The difference of the second pattern is that it also allows you to define a different cosmosdb path. Note: It does not support nested json property config. 
| -| `azure.cosmos.sink.write.patch.filter` | `""` | Used for [Conditional patch](https://docs.microsoft.com/azure/cosmos-db/partial-document-update-getting-started#java) | +| Config Property Name | Default | Description | +|:-----------------------------------------------------------|:--------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `azure.cosmos.sink.database.name` | `""` | Cosmos DB database name. | +| `azure.cosmos.sink.containers.topicMap` | `""` | A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2. | +| `azure.cosmos.sink.errors.tolerance.level` | `None` | Error tolerance level after exhausting all retries. `None` for fail on error. `All` for log and continue | +| `azure.cosmos.sink.bulk.enabled` | `true` | Flag to indicate whether Cosmos DB bulk mode is enabled for Sink connector. By default it is true. | +| `azure.cosmos.sink.bulk.maxConcurrentCosmosPartitions` | `-1` | Cosmos DB Item Write Max Concurrent Cosmos Partitions. If not specified it will be determined based on the number of the container's physical partitions which would indicate every batch is expected to have data from all Cosmos physical partitions. If specified it indicates from at most how many Cosmos Physical Partitions each batch contains data. So this config can be used to make bulk processing more efficient when input data in each batch has been repartitioned to balance to how many Cosmos partitions each batch needs to write. This is mainly useful for very large containers (with hundreds of physical partitions. | +| `azure.cosmos.sink.bulk.initialBatchSize` | `1` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 1. Reduce this when you want to avoid that the first few requests consume too many RUs. 
| +| `azure.cosmos.sink.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: `ItemOverwrite` (using upsert), `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts), `ItemDelete` (deletes based on id/pk of data frame), `ItemDeleteIfNotModified` (deletes based on id/pk of data frame if etag hasn't changed since collecting id/pk), `ItemOverwriteIfNotModified` (using create if etag is empty, update/replace with etag pre-condition otherwise, if document was updated the pre-condition failure is ignored), `ItemPatch` (Partial update all documents based on the patch config) | +| `azure.cosmos.sink.maxRetryCount` | `10` | Cosmos DB max retry attempts on write failures for Sink connector. By default, the connector will retry on transient write errors for up to 10 times. | +| `azure.cosmos.sink.id.strategy` | `ProvidedInValueStrategy` | A strategy used to populate the document with an ``id``. Valid strategies are: ``TemplateStrategy``, ``FullKeyStrategy``, ``KafkaMetadataStrategy``, ``ProvidedInKeyStrategy``, ``ProvidedInValueStrategy``. Configuration properties prefixed with``id.strategy`` are passed through to the strategy. For example, when using ``id.strategy=TemplateStrategy`` , the property ``id.strategy.template`` is passed through to the template strategy and used to specify the template string to be used in constructing the ``id``. | +| `azure.cosmos.sink.write.patch.operationType.default` | `Set` | Default Cosmos DB patch operation type. Supported ones include none, add, set, replace, remove, increment. Choose none for no-op, for others please reference [here](https://docs.microsoft.com/azure/cosmos-db/partial-document-update#supported-operations) for full context. | +| `azure.cosmos.sink.write.patch.property.configs` | `""` | Cosmos DB patch json property configs. It can contain multiple definitions matching the following patterns separated by comma. property(jsonProperty).op(operationType) or property(jsonProperty).path(patchInCosmosdb).op(operationType) - The difference of the second pattern is that it also allows you to define a different cosmosdb path. Note: It does not support nested json property config. | +| `azure.cosmos.sink.write.patch.filter` | `""` | Used for [Conditional patch](https://docs.microsoft.com/azure/cosmos-db/partial-document-update-getting-started#java) | diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java index 2cc8ee56c734..ba7a2821ac20 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceConfig.java @@ -79,8 +79,9 @@ public class CosmosSourceConfig extends KafkaCosmosConfig { private static final String DEFAULT_METADATA_STORAGE_TYPE = CosmosMetadataStorageType.KAFKA.getName(); private static final String METADATA_STORAGE_NAME = "azure.cosmos.source.metadata.storage.name"; - private static final String METADATA_STORAGE_NAME_DOC = "The resource name of the metadata storage. If metadata storage type is Kafka topic, then this config refers to kafka topic name, the metadata topic will be created if it does not already exist, else it will use the pre-created topic." 
- + " If metadata storage type is CosmosDB container, then this config refers to container name, please pre-create the metadata container partitioned by /id."; + private static final String METADATA_STORAGE_NAME_DOC = "The resource name of the metadata storage." + + " If metadata storage type is Kafka topic, then this config refers to kafka topic name, the metadata topic will be created if it does not already exist, else it will use the pre-created topic." + + " If metadata storage type is `Cosmos`, then this config refers to container name, for `MasterKey` auth, this container will be created with `AutoScale` with 4000 RU if not already exists, for `ServicePrincipal` auth, it requires the container to be created ahead of time ."; private static final String METADATA_STORAGE_NAME_DISPLAY = "The metadata storage name."; private static final String DEFAULT_METADATA_STORAGE_NAME = "_cosmos.metadata.topic"; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java index 2be8f657c269..524c9b86d0cd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayTests.java @@ -75,7 +75,7 @@ public FaultInjectionServerErrorRuleOnGatewayTests(CosmosClientBuilder clientBui this.subscriberValidationTimeout = TIMEOUT; } - @BeforeClass(groups = {"multi-master", "long"}, timeOut = TIMEOUT) + @BeforeClass(groups = {"multi-master", "fast"}, timeOut = TIMEOUT) public void beforeClass() { client = getClientBuilder().buildAsyncClient(); AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(client); @@ -225,79 +225,95 @@ public void faultInjectionServerErrorRuleTests_Region() throws JsonProcessingExc } } - @Test(groups = {"multi-master", "long"}, timeOut = 4 * TIMEOUT) + @Test(groups = {"multi-master", "fast"}, timeOut = 4 * TIMEOUT) public void faultInjectionServerErrorRuleTests_Partition() throws JsonProcessingException { - for (int i = 0; i < 10; i++) { - cosmosAsyncContainer.createItem(TestItem.createNewItem()).block(); - } + CosmosAsyncClient testClient = null; - // getting one item from each feedRange - List feedRanges = cosmosAsyncContainer.getFeedRanges().block(); - assertThat(feedRanges.size()).isGreaterThan(1); + try { + testClient = this.getClientBuilder() + .consistencyLevel(this.databaseAccount.getConsistencyPolicy().getDefaultConsistencyLevel()) + .buildAsyncClient(); - String query = "select * from c"; - CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); - cosmosQueryRequestOptions.setFeedRange(feedRanges.get(0)); - TestItem itemOnFeedRange0 = cosmosAsyncContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst(); + CosmosAsyncContainer testContainer = + testClient + .getDatabase(cosmosAsyncContainer.getDatabase().getId()) + .getContainer(cosmosAsyncContainer.getId()); - cosmosQueryRequestOptions.setFeedRange(feedRanges.get(1)); - TestItem itemOnFeedRange1 = cosmosAsyncContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst(); + for (int i = 0; i < 10; i++) { + testContainer.createItem(TestItem.createNewItem()).block(); + } - // set rule by feed range - String feedRangeRuleId = 
"ServerErrorRule-FeedRange-" + UUID.randomUUID(); + // getting one item from each feedRange + List feedRanges = testContainer.getFeedRanges().block(); + assertThat(feedRanges.size()).isGreaterThan(1); + + String query = "select * from c"; + CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); + cosmosQueryRequestOptions.setFeedRange(feedRanges.get(0)); + TestItem itemOnFeedRange0 = testContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst(); + + cosmosQueryRequestOptions.setFeedRange(feedRanges.get(1)); + TestItem itemOnFeedRange1 = testContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst(); + + // set rule by feed range + String feedRangeRuleId = "ServerErrorRule-FeedRange-" + UUID.randomUUID(); + + FaultInjectionRule serverErrorRuleByFeedRange = + new FaultInjectionRuleBuilder(feedRangeRuleId) + .condition( + new FaultInjectionConditionBuilder() + .connectionType(FaultInjectionConnectionType.GATEWAY) + .endpoints(new FaultInjectionEndpointBuilder(feedRanges.get(0)).build()) + .build() + ) + .result( + FaultInjectionResultBuilders + .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) + .times(1) + .build() + ) + .duration(Duration.ofMinutes(5)) + .build(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(serverErrorRuleByFeedRange)).block(); + assertThat( + serverErrorRuleByFeedRange.getRegionEndpoints().size() == this.readRegionMap.size() + && serverErrorRuleByFeedRange.getRegionEndpoints().containsAll(this.readRegionMap.keySet())); - FaultInjectionRule serverErrorRuleByFeedRange = - new FaultInjectionRuleBuilder(feedRangeRuleId) - .condition( - new FaultInjectionConditionBuilder() - .connectionType(FaultInjectionConnectionType.GATEWAY) - .endpoints(new FaultInjectionEndpointBuilder(feedRanges.get(0)).build()) - .build() - ) - .result( - FaultInjectionResultBuilders - .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) - .times(1) - .build() - ) - .duration(Duration.ofMinutes(5)) - .build(); + // Issue a read item for the same feed range as configured in the fault injection rule + CosmosDiagnostics cosmosDiagnostics = + testContainer + .readItem(itemOnFeedRange0.getId(), new PartitionKey(itemOnFeedRange0.getId()), JsonNode.class) + .block() + .getDiagnostics(); - CosmosFaultInjectionHelper.configureFaultInjectionRules(cosmosAsyncContainer, Arrays.asList(serverErrorRuleByFeedRange)).block(); - assertThat( - serverErrorRuleByFeedRange.getRegionEndpoints().size() == this.readRegionMap.size() - && serverErrorRuleByFeedRange.getRegionEndpoints().containsAll(this.readRegionMap.keySet())); - - // Issue a read item for the same feed range as configured in the fault injection rule - CosmosDiagnostics cosmosDiagnostics = - cosmosAsyncContainer - .readItem(itemOnFeedRange0.getId(), new PartitionKey(itemOnFeedRange0.getId()), JsonNode.class) - .block() - .getDiagnostics(); - - this.validateHitCount(serverErrorRuleByFeedRange, 1, OperationType.Read, ResourceType.Document); - this.validateFaultInjectionRuleApplied( - cosmosDiagnostics, - OperationType.Read, - HttpConstants.StatusCodes.TOO_MANY_REQUESTS, - HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE, - feedRangeRuleId, - true - ); - - // Issue a read item to different feed range - try { - cosmosDiagnostics = cosmosAsyncContainer - .readItem(itemOnFeedRange1.getId(), new PartitionKey(itemOnFeedRange1.getId()), JsonNode.class) - .block() - .getDiagnostics(); - 
this.validateNoFaultInjectionApplied(cosmosDiagnostics, OperationType.Read, FAULT_INJECTION_RULE_NON_APPLICABLE_ADDRESS); + this.validateHitCount(serverErrorRuleByFeedRange, 1, OperationType.Read, ResourceType.Document); + this.validateFaultInjectionRuleApplied( + cosmosDiagnostics, + OperationType.Read, + HttpConstants.StatusCodes.TOO_MANY_REQUESTS, + HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE, + feedRangeRuleId, + true + ); + + // Issue a read item to different feed range + try { + cosmosDiagnostics = testContainer + .readItem(itemOnFeedRange1.getId(), new PartitionKey(itemOnFeedRange1.getId()), JsonNode.class) + .block() + .getDiagnostics(); + this.validateNoFaultInjectionApplied(cosmosDiagnostics, OperationType.Read, FAULT_INJECTION_RULE_NON_APPLICABLE_ADDRESS); + } finally { + serverErrorRuleByFeedRange.disable(); + } } finally { - serverErrorRuleByFeedRange.disable(); + safeClose(testClient); } + } - @Test(groups = {"multi-master", "long"}, timeOut = 4 * TIMEOUT) + @Test(groups = {"multi-master", "fast"}, timeOut = 4 * TIMEOUT) public void faultInjectionServerErrorRuleTests_ServerResponseDelay() throws JsonProcessingException { // define another rule which can simulate timeout String timeoutRuleId = "serverErrorRule-responseDelay-" + UUID.randomUUID(); @@ -347,7 +363,7 @@ public void faultInjectionServerErrorRuleTests_ServerResponseDelay() throws Json } } - @Test(groups = {"multi-master", "long"}, timeOut = 4 * TIMEOUT) + @Test(groups = {"multi-master", "fast"}, timeOut = 4 * TIMEOUT) public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws JsonProcessingException { // simulate high channel acquisition/connectionTimeout String ruleId = "serverErrorRule-serverConnectionDelay-" + UUID.randomUUID(); @@ -388,7 +404,7 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws Js } } - @Test(groups = {"multi-master", "long"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT) + @Test(groups = {"multi-master", "fast"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT) public void faultInjectionServerErrorRuleTests_ServerErrorResponse( FaultInjectionServerErrorType serverErrorType, boolean canRetry, @@ -460,7 +476,7 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse( } } - @Test(groups = {"multi-master", "long"}, dataProvider = "operationTypeProvider", timeOut = TIMEOUT) + @Test(groups = {"multi-master", "fast"}, dataProvider = "operationTypeProvider", timeOut = TIMEOUT) public void faultInjectionServerErrorRuleTests_HitLimit( OperationType operationType, FaultInjectionOperationType faultInjectionOperationType) throws JsonProcessingException { @@ -518,7 +534,7 @@ public void faultInjectionServerErrorRuleTests_HitLimit( } } - @AfterClass(groups = {"multi-master", "long"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = {"multi-master", "fast"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 798e96353477..bd054cd6822b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -596,7 +596,7 @@ public Flux 
submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiv @Override public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) { if (this.gatewayServerErrorInjector == null) { - this.gatewayServerErrorInjector = new GatewayServerErrorInjector(configs); + this.gatewayServerErrorInjector = new GatewayServerErrorInjector(configs, collectionCache, partitionKeyRangeCache); } this.gatewayServerErrorInjector.registerServerErrorInjector(injectorProvider.getServerErrorInjector()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java index 812d405ab5ff..d3fd6d2292f4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java @@ -3,13 +3,21 @@ package com.azure.cosmos.implementation.faultinjection; +import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.PartitionKeyRange; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.caches.RxCollectionCache; +import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.http.HttpResponse; import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; +import com.azure.cosmos.implementation.routing.PartitionKeyInternal; +import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import io.netty.channel.ConnectTimeoutException; import io.netty.handler.timeout.ReadTimeoutException; import reactor.core.publisher.Mono; @@ -25,12 +33,23 @@ public class GatewayServerErrorInjector { private final Configs configs; + private final RxCollectionCache collectionCache; + private final RxPartitionKeyRangeCache partitionKeyRangeCache; private List faultInjectors = new ArrayList<>(); - public GatewayServerErrorInjector(Configs configs) { + public GatewayServerErrorInjector( + Configs configs, + RxCollectionCache collectionCache, + RxPartitionKeyRangeCache partitionKeyRangeCache) { checkNotNull(configs, "Argument 'configs' can not be null"); this.configs = configs; + this.collectionCache = collectionCache; + this.partitionKeyRangeCache = partitionKeyRangeCache; + } + + public GatewayServerErrorInjector(Configs configs) { + this(configs, null, null); } public void registerServerErrorInjector(IServerErrorInjector serverErrorInjector) { @@ -38,18 +57,71 @@ public void registerServerErrorInjector(IServerErrorInjector serverErrorInjector this.faultInjectors.add(serverErrorInjector); } + private Mono> resolvePartitionKeyRange(RxDocumentServiceRequest request) { + // faultInjection rule can be configured to only apply for a certain partition + // but in the normal flow, only session consistency will populate the resolvePartitionKey when apply session token + // so for other consistencies, we need to calculate here + + if (this.collectionCache == null || this.partitionKeyRangeCache == null) { + 
return Mono.just(new Utils.ValueHolder<>(null)); + } + + if (request == null || request.requestContext == null) { + return Mono.just(new Utils.ValueHolder<>(null)); + } + + if (request.requestContext.resolvedPartitionKeyRange != null) { + return Mono.just(Utils.ValueHolder.initialize(request.requestContext.resolvedPartitionKeyRange)); + } + + return this.collectionCache + .resolveCollectionAsync( + BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request) + .flatMap(collectionValueHolder -> { + return partitionKeyRangeCache + .tryLookupAsync( + BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), + collectionValueHolder.v.getResourceId(), + null, + null) + .flatMap(collectionRoutingMapValueHolder -> { + String partitionKeyRangeId = + request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID); + PartitionKeyInternal partitionKeyInternal = request.getPartitionKeyInternal(); + if (StringUtils.isNotEmpty(partitionKeyRangeId)) { + PartitionKeyRange range = + collectionRoutingMapValueHolder.v.getRangeByPartitionKeyRangeId(partitionKeyRangeId); + request.requestContext.resolvedPartitionKeyRange = range; + } else if (partitionKeyInternal != null) { + String effectivePartitionKeyString = PartitionKeyInternalHelper + .getEffectivePartitionKeyString( + partitionKeyInternal, + collectionValueHolder.v.getPartitionKey()); + PartitionKeyRange range = + collectionRoutingMapValueHolder.v.getRangeByEffectivePartitionKey(effectivePartitionKeyString); + request.requestContext.resolvedPartitionKeyRange = range; + } + + return Mono.just(Utils.ValueHolder.initialize(request.requestContext.resolvedPartitionKeyRange)); + }); + }); + } + public Mono injectGatewayErrors( Duration responseTimeout, HttpRequest httpRequest, RxDocumentServiceRequest serviceRequest, Mono originalResponseMono) { - return injectGatewayErrors( - responseTimeout, - httpRequest, - serviceRequest, - originalResponseMono, - serviceRequest.requestContext.resolvedPartitionKeyRange != null - ? Arrays.asList(serviceRequest.requestContext.resolvedPartitionKeyRange.getId()) : null); + + return this.resolvePartitionKeyRange(serviceRequest) + .flatMap(resolvedPartitionKeyRangeValueHolder -> { + return injectGatewayErrors( + responseTimeout, + httpRequest, + serviceRequest, + originalResponseMono, + resolvedPartitionKeyRangeValueHolder.v == null ? 
null : Arrays.asList(resolvedPartitionKeyRangeValueHolder.v.getId()));
+            });
     }
 
     public Mono injectGatewayErrors(

From 011254dd0e54edc16899c146ef5da735b07fcef7 Mon Sep 17 00:00:00 2001
From: annie-mac
Date: Thu, 2 May 2024 08:22:48 -0700
Subject: [PATCH 2/4] update changelog

---
 sdk/cosmos/azure-cosmos-test/CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md
index 27bdb326a33c..2c6b3810f651 100644
--- a/sdk/cosmos/azure-cosmos-test/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-test/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed an issue where a `FaultInjectionRule` could not be applied at the partition level when using `Gateway` mode with a non-session consistency level - See [40005](https://github.com/Azure/azure-sdk-for-java/pull/40005)
 
 #### Other Changes
 

From d13065b9b1aa0244300483c8667aa04061dd8945 Mon Sep 17 00:00:00 2001
From: annie-mac
Date: Thu, 2 May 2024 16:35:16 -0700
Subject: [PATCH 3/4] fix

---
 .../faultinjection/GatewayServerErrorInjector.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java
index d3fd6d2292f4..7c93bf31d04c 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/faultinjection/GatewayServerErrorInjector.java
@@ -8,6 +8,7 @@
 import com.azure.cosmos.implementation.Configs;
 import com.azure.cosmos.implementation.HttpConstants;
 import com.azure.cosmos.implementation.PartitionKeyRange;
+import com.azure.cosmos.implementation.ResourceType;
 import com.azure.cosmos.implementation.RxDocumentServiceRequest;
 import com.azure.cosmos.implementation.Utils;
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
@@ -61,13 +62,16 @@ private Mono> resolvePartitionKeyRange(RxDo
         // faultInjection rule can be configured to only apply for a certain partition
         // but in the normal flow, only session consistency will populate the resolvePartitionKey when apply session token
         // so for other consistencies, we need to calculate here
+        if (request.getResourceType() != ResourceType.Document) {
+            return Mono.just(Utils.ValueHolder.initialize(null));
+        }
 
         if (this.collectionCache == null || this.partitionKeyRangeCache == null) {
-            return Mono.just(new Utils.ValueHolder<>(null));
+            return Mono.just(Utils.ValueHolder.initialize(null));
         }
 
         if (request == null || request.requestContext == null) {
-            return Mono.just(new Utils.ValueHolder<>(null));
+            return Mono.just(Utils.ValueHolder.initialize(null));
         }
 
         if (request.requestContext.resolvedPartitionKeyRange != null) {

From 47de3027daba0a66390b985920ae7b98e60ae239 Mon Sep 17 00:00:00 2001
From: annie-mac
Date: Thu, 2 May 2024 17:32:34 -0700
Subject: [PATCH 4/4] fix logging

---
 .../com/azure/cosmos/implementation/GlobalEndpointManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java
index 7dc27b329c8c..3b4077ef7660 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java
@@ -303,7 +303,7 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) {
                 this::getDatabaseAccountAsync);
 
             return databaseAccountObs.flatMap(dbAccount -> {
-                logger.info("db account retrieved {}", databaseAccountObs);
+                logger.info("db account retrieved {}", dbAccount);
                 this.refreshInBackground.set(false);
                 return this.refreshLocationPrivateAsync(dbAccount);
             });