Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 58 additions & 58 deletions sdk/cosmos/azure-cosmos-kafka-connect/doc/configuration-reference.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ public class CosmosSourceConfig extends KafkaCosmosConfig {
private static final String DEFAULT_METADATA_STORAGE_TYPE = CosmosMetadataStorageType.KAFKA.getName();

private static final String METADATA_STORAGE_NAME = "azure.cosmos.source.metadata.storage.name";
private static final String METADATA_STORAGE_NAME_DOC = "The resource name of the metadata storage. If metadata storage type is Kafka topic, then this config refers to kafka topic name, the metadata topic will be created if it does not already exist, else it will use the pre-created topic."
+ " If metadata storage type is CosmosDB container, then this config refers to container name, please pre-create the metadata container partitioned by /id.";
private static final String METADATA_STORAGE_NAME_DOC = "The resource name of the metadata storage."
+ " If metadata storage type is Kafka topic, then this config refers to kafka topic name, the metadata topic will be created if it does not already exist, else it will use the pre-created topic."
+ " If metadata storage type is `Cosmos`, then this config refers to container name, for `MasterKey` auth, this container will be created with `AutoScale` with 4000 RU if not already exists, for `ServicePrincipal` auth, it requires the container to be created ahead of time .";
private static final String METADATA_STORAGE_NAME_DISPLAY = "The metadata storage name.";
private static final String DEFAULT_METADATA_STORAGE_NAME = "_cosmos.metadata.topic";

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-test/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue where `FaultInjectionRule` can not apply on partition level when using `Gateway` Mode and non-session consistency - See [40005](https://github.com/Azure/azure-sdk-for-java/pull/40005)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public FaultInjectionServerErrorRuleOnGatewayTests(CosmosClientBuilder clientBui
this.subscriberValidationTimeout = TIMEOUT;
}

@BeforeClass(groups = {"multi-master", "long"}, timeOut = TIMEOUT)
@BeforeClass(groups = {"multi-master", "fast"}, timeOut = TIMEOUT)
public void beforeClass() {
client = getClientBuilder().buildAsyncClient();
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(client);
Expand Down Expand Up @@ -225,79 +225,95 @@ public void faultInjectionServerErrorRuleTests_Region() throws JsonProcessingExc
}
}

@Test(groups = {"multi-master", "long"}, timeOut = 4 * TIMEOUT)
@Test(groups = {"multi-master", "fast"}, timeOut = 4 * TIMEOUT)
public void faultInjectionServerErrorRuleTests_Partition() throws JsonProcessingException {
for (int i = 0; i < 10; i++) {
cosmosAsyncContainer.createItem(TestItem.createNewItem()).block();
}
CosmosAsyncClient testClient = null;

// getting one item from each feedRange
List<FeedRange> feedRanges = cosmosAsyncContainer.getFeedRanges().block();
assertThat(feedRanges.size()).isGreaterThan(1);
try {
testClient = this.getClientBuilder()
.consistencyLevel(this.databaseAccount.getConsistencyPolicy().getDefaultConsistencyLevel())
.buildAsyncClient();

String query = "select * from c";
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setFeedRange(feedRanges.get(0));
TestItem itemOnFeedRange0 = cosmosAsyncContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst();
CosmosAsyncContainer testContainer =
testClient
.getDatabase(cosmosAsyncContainer.getDatabase().getId())
.getContainer(cosmosAsyncContainer.getId());

cosmosQueryRequestOptions.setFeedRange(feedRanges.get(1));
TestItem itemOnFeedRange1 = cosmosAsyncContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst();
for (int i = 0; i < 10; i++) {
testContainer.createItem(TestItem.createNewItem()).block();
}

// set rule by feed range
String feedRangeRuleId = "ServerErrorRule-FeedRange-" + UUID.randomUUID();
// getting one item from each feedRange
List<FeedRange> feedRanges = testContainer.getFeedRanges().block();
assertThat(feedRanges.size()).isGreaterThan(1);

String query = "select * from c";
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
cosmosQueryRequestOptions.setFeedRange(feedRanges.get(0));
TestItem itemOnFeedRange0 = testContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst();

cosmosQueryRequestOptions.setFeedRange(feedRanges.get(1));
TestItem itemOnFeedRange1 = testContainer.queryItems(query, cosmosQueryRequestOptions, TestItem.class).blockFirst();

// set rule by feed range
String feedRangeRuleId = "ServerErrorRule-FeedRange-" + UUID.randomUUID();

FaultInjectionRule serverErrorRuleByFeedRange =
new FaultInjectionRuleBuilder(feedRangeRuleId)
.condition(
new FaultInjectionConditionBuilder()
.connectionType(FaultInjectionConnectionType.GATEWAY)
.endpoints(new FaultInjectionEndpointBuilder(feedRanges.get(0)).build())
.build()
)
.result(
FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST)
.times(1)
.build()
)
.duration(Duration.ofMinutes(5))
.build();

CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(serverErrorRuleByFeedRange)).block();
assertThat(
serverErrorRuleByFeedRange.getRegionEndpoints().size() == this.readRegionMap.size()
&& serverErrorRuleByFeedRange.getRegionEndpoints().containsAll(this.readRegionMap.keySet()));

FaultInjectionRule serverErrorRuleByFeedRange =
new FaultInjectionRuleBuilder(feedRangeRuleId)
.condition(
new FaultInjectionConditionBuilder()
.connectionType(FaultInjectionConnectionType.GATEWAY)
.endpoints(new FaultInjectionEndpointBuilder(feedRanges.get(0)).build())
.build()
)
.result(
FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST)
.times(1)
.build()
)
.duration(Duration.ofMinutes(5))
.build();
// Issue a read item for the same feed range as configured in the fault injection rule
CosmosDiagnostics cosmosDiagnostics =
testContainer
.readItem(itemOnFeedRange0.getId(), new PartitionKey(itemOnFeedRange0.getId()), JsonNode.class)
.block()
.getDiagnostics();

CosmosFaultInjectionHelper.configureFaultInjectionRules(cosmosAsyncContainer, Arrays.asList(serverErrorRuleByFeedRange)).block();
assertThat(
serverErrorRuleByFeedRange.getRegionEndpoints().size() == this.readRegionMap.size()
&& serverErrorRuleByFeedRange.getRegionEndpoints().containsAll(this.readRegionMap.keySet()));

// Issue a read item for the same feed range as configured in the fault injection rule
CosmosDiagnostics cosmosDiagnostics =
cosmosAsyncContainer
.readItem(itemOnFeedRange0.getId(), new PartitionKey(itemOnFeedRange0.getId()), JsonNode.class)
.block()
.getDiagnostics();

this.validateHitCount(serverErrorRuleByFeedRange, 1, OperationType.Read, ResourceType.Document);
this.validateFaultInjectionRuleApplied(
cosmosDiagnostics,
OperationType.Read,
HttpConstants.StatusCodes.TOO_MANY_REQUESTS,
HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE,
feedRangeRuleId,
true
);

// Issue a read item to different feed range
try {
cosmosDiagnostics = cosmosAsyncContainer
.readItem(itemOnFeedRange1.getId(), new PartitionKey(itemOnFeedRange1.getId()), JsonNode.class)
.block()
.getDiagnostics();
this.validateNoFaultInjectionApplied(cosmosDiagnostics, OperationType.Read, FAULT_INJECTION_RULE_NON_APPLICABLE_ADDRESS);
this.validateHitCount(serverErrorRuleByFeedRange, 1, OperationType.Read, ResourceType.Document);
this.validateFaultInjectionRuleApplied(
cosmosDiagnostics,
OperationType.Read,
HttpConstants.StatusCodes.TOO_MANY_REQUESTS,
HttpConstants.SubStatusCodes.USER_REQUEST_RATE_TOO_LARGE,
feedRangeRuleId,
true
);

// Issue a read item to different feed range
try {
cosmosDiagnostics = testContainer
.readItem(itemOnFeedRange1.getId(), new PartitionKey(itemOnFeedRange1.getId()), JsonNode.class)
.block()
.getDiagnostics();
this.validateNoFaultInjectionApplied(cosmosDiagnostics, OperationType.Read, FAULT_INJECTION_RULE_NON_APPLICABLE_ADDRESS);
} finally {
serverErrorRuleByFeedRange.disable();
}
} finally {
serverErrorRuleByFeedRange.disable();
safeClose(testClient);
}

}

@Test(groups = {"multi-master", "long"}, timeOut = 4 * TIMEOUT)
@Test(groups = {"multi-master", "fast"}, timeOut = 4 * TIMEOUT)
public void faultInjectionServerErrorRuleTests_ServerResponseDelay() throws JsonProcessingException {
// define another rule which can simulate timeout
String timeoutRuleId = "serverErrorRule-responseDelay-" + UUID.randomUUID();
Expand Down Expand Up @@ -347,7 +363,7 @@ public void faultInjectionServerErrorRuleTests_ServerResponseDelay() throws Json
}
}

@Test(groups = {"multi-master", "long"}, timeOut = 4 * TIMEOUT)
@Test(groups = {"multi-master", "fast"}, timeOut = 4 * TIMEOUT)
public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws JsonProcessingException {
// simulate high channel acquisition/connectionTimeout
String ruleId = "serverErrorRule-serverConnectionDelay-" + UUID.randomUUID();
Expand Down Expand Up @@ -388,7 +404,7 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws Js
}
}

@Test(groups = {"multi-master", "long"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT)
@Test(groups = {"multi-master", "fast"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT)
public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
FaultInjectionServerErrorType serverErrorType,
boolean canRetry,
Expand Down Expand Up @@ -460,7 +476,7 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
}
}

@Test(groups = {"multi-master", "long"}, dataProvider = "operationTypeProvider", timeOut = TIMEOUT)
@Test(groups = {"multi-master", "fast"}, dataProvider = "operationTypeProvider", timeOut = TIMEOUT)
public void faultInjectionServerErrorRuleTests_HitLimit(
OperationType operationType,
FaultInjectionOperationType faultInjectionOperationType) throws JsonProcessingException {
Expand Down Expand Up @@ -518,7 +534,7 @@ public void faultInjectionServerErrorRuleTests_HitLimit(
}
}

@AfterClass(groups = {"multi-master", "long"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
@AfterClass(groups = {"multi-master", "fast"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
safeClose(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ public Flux<Void> submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiv
@Override
public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) {
if (this.gatewayServerErrorInjector == null) {
this.gatewayServerErrorInjector = new GatewayServerErrorInjector(configs);
this.gatewayServerErrorInjector = new GatewayServerErrorInjector(configs, collectionCache, partitionKeyRangeCache);
}

this.gatewayServerErrorInjector.registerServerErrorInjector(injectorProvider.getServerErrorInjector());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@

package com.azure.cosmos.implementation.faultinjection;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.timeout.ReadTimeoutException;
import reactor.core.publisher.Mono;
Expand All @@ -25,31 +33,95 @@
public class GatewayServerErrorInjector {

private final Configs configs;
private final RxCollectionCache collectionCache;
private final RxPartitionKeyRangeCache partitionKeyRangeCache;

private List<IServerErrorInjector> faultInjectors = new ArrayList<>();

public GatewayServerErrorInjector(Configs configs) {
public GatewayServerErrorInjector(
Configs configs,
RxCollectionCache collectionCache,
RxPartitionKeyRangeCache partitionKeyRangeCache) {
checkNotNull(configs, "Argument 'configs' can not be null");
this.configs = configs;
this.collectionCache = collectionCache;
this.partitionKeyRangeCache = partitionKeyRangeCache;
}

public GatewayServerErrorInjector(Configs configs) {
this(configs, null, null);
}

public void registerServerErrorInjector(IServerErrorInjector serverErrorInjector) {
checkNotNull(serverErrorInjector, "Argument 'serverErrorInjector' can not be null");
this.faultInjectors.add(serverErrorInjector);
}

private Mono<Utils.ValueHolder<PartitionKeyRange>> resolvePartitionKeyRange(RxDocumentServiceRequest request) {
// faultInjection rule can be configured to only apply for a certain partition
// but in the normal flow, only session consistency will populate the resolvePartitionKey when apply session token
// so for other consistencies, we need to calculate here

if (this.collectionCache == null || this.partitionKeyRangeCache == null) {
return Mono.just(new Utils.ValueHolder<>(null));
}

if (request == null || request.requestContext == null) {
return Mono.just(new Utils.ValueHolder<>(null));
}

if (request.requestContext.resolvedPartitionKeyRange != null) {
return Mono.just(Utils.ValueHolder.initialize(request.requestContext.resolvedPartitionKeyRange));
}

return this.collectionCache
.resolveCollectionAsync(
BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request)
.flatMap(collectionValueHolder -> {
return partitionKeyRangeCache
.tryLookupAsync(
BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
collectionValueHolder.v.getResourceId(),
null,
null)
.flatMap(collectionRoutingMapValueHolder -> {
String partitionKeyRangeId =
request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID);
PartitionKeyInternal partitionKeyInternal = request.getPartitionKeyInternal();
if (StringUtils.isNotEmpty(partitionKeyRangeId)) {
PartitionKeyRange range =
collectionRoutingMapValueHolder.v.getRangeByPartitionKeyRangeId(partitionKeyRangeId);
request.requestContext.resolvedPartitionKeyRange = range;
} else if (partitionKeyInternal != null) {
String effectivePartitionKeyString = PartitionKeyInternalHelper
.getEffectivePartitionKeyString(
partitionKeyInternal,
collectionValueHolder.v.getPartitionKey());
PartitionKeyRange range =
collectionRoutingMapValueHolder.v.getRangeByEffectivePartitionKey(effectivePartitionKeyString);
request.requestContext.resolvedPartitionKeyRange = range;
}

return Mono.just(Utils.ValueHolder.initialize(request.requestContext.resolvedPartitionKeyRange));
});
});
}

public Mono<HttpResponse> injectGatewayErrors(
Duration responseTimeout,
HttpRequest httpRequest,
RxDocumentServiceRequest serviceRequest,
Mono<HttpResponse> originalResponseMono) {
return injectGatewayErrors(
responseTimeout,
httpRequest,
serviceRequest,
originalResponseMono,
serviceRequest.requestContext.resolvedPartitionKeyRange != null
? Arrays.asList(serviceRequest.requestContext.resolvedPartitionKeyRange.getId()) : null);

return this.resolvePartitionKeyRange(serviceRequest)
.flatMap(resolvedPartitionKeyRangeValueHolder -> {
return injectGatewayErrors(
responseTimeout,
httpRequest,
serviceRequest,
originalResponseMono,
resolvedPartitionKeyRangeValueHolder.v == null ? null : Arrays.asList(resolvedPartitionKeyRangeValueHolder.v.getId()));
});
}

public Mono<HttpResponse> injectGatewayErrors(
Expand Down