From a229099fadb8f185eb3e50464390ace9d1df549a Mon Sep 17 00:00:00 2001 From: philipthomas Date: Tue, 6 Feb 2024 15:25:52 -0500 Subject: [PATCH 01/17] test to prove no exception, no documents when change feed more is changed. --- ...orBuilderWithAllVersionsAndDeletesTests.cs | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs index 9292e4d1e7..85b04b5736 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs @@ -6,10 +6,13 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed { using System; using System.Collections.Generic; + using System.Diagnostics; + using System.Diagnostics.Metrics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; [TestClass] [TestCategory("ChangeFeedProcessor with AllVersionsAndDeletes")] @@ -155,5 +158,149 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() Assert.Fail(exception.ToString()); } } + + /// + /// + /// + [TestMethod] + [Owner("philipthomas-MSFT")] + [Description("")] + public async Task WhenAllVersionsAndDeletesDocumentsAreReadAndLeaseContainerIsTheSameThenSwitchedToLatestVersionExpectsAnExceptionTestMeAsync() + { + int documentCount = 10; + _ = await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .IngestDocumentsAsync( + monitoredContainer: this.Container, + documentCount: documentCount); + + ManualResetEvent allDocsProcessed = new(false); + + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.ArrangeActAssetLastestVersionChangeFeedProcessorAsync( + monitoredContainer: this.Container, + documentCount: documentCount, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); + + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( + monitoredContainer: this.Container, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); + } + + private static async Task> IngestDocumentsAsync(Container monitoredContainer, int documentCount) + { + List docs = new(); + + for (int i = 0; i < documentCount; i++) + { + ItemResponse response = await monitoredContainer.CreateItemAsync(new { id = i.ToString(), pk = i.ToString(), description = $"original test{i}" }, partitionKey: new PartitionKey(i.ToString())); + docs.Add(response.Resource); + + await Task.Delay(1000); + } + + return docs; + } + + private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( + ContainerInternal monitoredContainer, + Container leaseContainer, + long documentCount, + ManualResetEvent allDocsProcessed) + { + Exception exception = default; + long counter = 0; + ChangeFeedProcessor latestVersionProcessorAtomic = null; + + ChangeFeedProcessorBuilder latestVersionProcessorBuilder = monitoredContainer + .GetChangeFeedProcessorBuilder(processorName: $"{nameof(ChangeFeedMode.LatestVersion)}", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection documents, CancellationToken token) => + { + Console.WriteLine($"Reading {nameof(documents)} in {nameof(ChangeFeedMode.LatestVersion)} mode: {JsonConvert.SerializeObject(documents)}"); + Console.WriteLine($"{nameof(counter)}: {counter}"); + Console.WriteLine($"{nameof(context.LeaseToken)}: {context.LeaseToken}"); + + if (counter == documentCount / 2) + { + Console.WriteLine($"stopping {nameof(latestVersionProcessorAtomic)}"); + + latestVersionProcessorAtomic.StopAsync(); + + return Task.CompletedTask; + } + + counter++; + + return Task.CompletedTask; + }) + .WithInstanceName(Guid.NewGuid().ToString()) + .WithMaxItems(1) + .WithStartFromBeginning() + .WithLeaseContainer(leaseContainer) + .WithErrorNotification((leaseToken, error) => + { + exception = error.InnerException; + Console.WriteLine(error.ToString()); + + return Task.CompletedTask; + }); + + ChangeFeedProcessor latestVersionProcessor = latestVersionProcessorBuilder.Build(); + Interlocked.Exchange(ref latestVersionProcessorAtomic, latestVersionProcessor); + + await latestVersionProcessor.StartAsync(); + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + + await latestVersionProcessor.StopAsync(); + + if (exception != default) + { + Assert.Fail(exception.ToString()); + } + } + + private static async Task ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( + ContainerInternal monitoredContainer, + Container leaseContainer, + ManualResetEvent allDocsProcessed) + { + Exception exception = default; + long counter = 0; + ChangeFeedProcessor allVersionsAndDeletesProcessor = monitoredContainer + .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: $"{nameof(ChangeFeedMode.AllVersionsAndDeletes)}", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection> documents, CancellationToken token) => + { + Console.WriteLine($"Reading {nameof(documents)} in {nameof(ChangeFeedMode.AllVersionsAndDeletes)} mode: {JsonConvert.SerializeObject(documents)}"); + Console.WriteLine($"{nameof(counter)}: {counter}"); + Console.WriteLine($"{nameof(context.LeaseToken)}: {context.LeaseToken}"); + + counter++; + + return Task.CompletedTask; + }) + .WithInstanceName(Guid.NewGuid().ToString()) + .WithMaxItems(1) + .WithLeaseContainer(leaseContainer) + .WithErrorNotification((leaseToken, error) => + { + // an exception should happen here, because it is trying to use the same LatestVersion leaseContainer on an AllVersionsAndDeletes processor. + + exception = error.InnerException; + Console.WriteLine(error.ToString()); + + return Task.CompletedTask; + }) + .Build(); + + await allVersionsAndDeletesProcessor.StartAsync(); + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + + await allVersionsAndDeletesProcessor.StopAsync(); + + if (exception != default) + { + Assert.Fail(exception.ToString()); + } + } } } From 59e8811a62c5bef9c39469a83165195362d31b8c Mon Sep 17 00:00:00 2001 From: philipthomas Date: Tue, 6 Feb 2024 15:28:31 -0500 Subject: [PATCH 02/17] renaming --- ...hangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs index 85b04b5736..650164346a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs @@ -221,7 +221,7 @@ private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( if (counter == documentCount / 2) { - Console.WriteLine($"stopping {nameof(latestVersionProcessorAtomic)}"); + Console.WriteLine($"Stopping {nameof(latestVersionProcessorAtomic)}"); latestVersionProcessorAtomic.StopAsync(); @@ -251,7 +251,7 @@ private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); - await latestVersionProcessor.StopAsync(); + //////await latestVersionProcessor.StopAsync(); if (exception != default) { From 465143d8a6fbcf78a7a84324295e21894b0d38c5 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Thu, 29 Feb 2024 13:58:25 -0500 Subject: [PATCH 03/17] throwing exception when changeFeedMode change on CFP with same lease container --- .../src/EncryptionContainer.cs | 8 ++ .../ChangeFeedEstimatorRunner.cs | 13 +- .../ChangeFeedProcessorBuilder.cs | 9 +- .../ChangeFeedProcessorCore.cs | 100 ++++++++++++- .../Configuration/ChangeFeedLeaseOptions.cs | 5 + .../LeaseManagement/DocumentServiceLease.cs | 5 + .../DocumentServiceLeaseCore.cs | 3 + .../DocumentServiceLeaseCoreEpk.cs | 3 + .../DocumentServiceLeaseManagerCosmos.cs | 13 +- ...DocumentServiceLeaseStoreManagerBuilder.cs | 14 +- ...DocumentServiceLeaseStoreManagerOptions.cs | 2 + .../src/Resource/Container/Container.cs | 74 ++++++++++ .../Resource/Container/ContainerInlineCore.cs | 2 + .../Resource/Container/ContainerInternal.cs | 2 +- ...orBuilderWithAllVersionsAndDeletesTests.cs | 131 +++++++++++++++--- 15 files changed, 347 insertions(+), 37 deletions(-) diff --git a/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs b/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs index 0bd342f0ef..b4979a0baa 100644 --- a/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs +++ b/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs @@ -639,6 +639,14 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( }); } +#if ENCRYPTIONPREVIEW + public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( + string processorName, + ChangeFeedHandler> onChangesDelegate) + { + throw new NotImplementedException(); + } +#endif public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint( string processorName, ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs index 8f8a8b9d81..0679678580 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs @@ -139,11 +139,14 @@ private async Task InitializeLeaseStoreAsync() { string monitoredContainerAndDatabaseRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(default); string leasePrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredContainerAndDatabaseRid); - DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( - monitoredContainer: this.monitoredContainer, - leaseContainer: this.leaseContainer, - leaseContainerPrefix: leasePrefix, - instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName); + DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder + .InitializeAsync( + monitoredContainer: this.monitoredContainer, + leaseContainer: this.leaseContainer, + leaseContainerPrefix: leasePrefix, + instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName, + changeFeedMode: this.changeFeedLeaseOptions.Mode) + .ConfigureAwait(false); this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer; } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs index df3db6f2bf..d91142e5aa 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs @@ -70,13 +70,20 @@ public ChangeFeedProcessorBuilder WithInstanceName(string instanceName) } /// - /// Sets the mode for the change freed processor. + /// Sets the on for a monitored + /// container. Setting the on for a lease + /// container will be used to track which on + /// for a monitored container and should be used to prevent switching on the + /// when using the same lease container. + /// This is based on an issue located at . /// /// /// The instance of to use. internal ChangeFeedProcessorBuilder WithChangeFeedMode(ChangeFeedMode changeFeedMode) { this.changeFeedProcessorOptions.Mode = changeFeedMode; + this.changeFeedLeaseOptions.Mode = changeFeedMode; + return this; } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 94c3e09dac..2183eff697 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -5,6 +5,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; + using System.Diagnostics; + using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; @@ -12,8 +14,11 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using Microsoft.Azure.Cosmos.ChangeFeed.Utils; + using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor { @@ -75,23 +80,110 @@ public override async Task StopAsync() private async Task InitializeAsync() { string containerRid = await this.monitoredContainer.GetCachedRIDAsync( - forceRefresh: false, - NoOpTrace.Singleton, + forceRefresh: false, + NoOpTrace.Singleton, default); + string monitoredDatabaseAndContainerRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(); + +#if PREVIEW + await this + .ChangeFeedModeSwitchingCheckAsync( + key: monitoredDatabaseAndContainerRid) + .ConfigureAwait(false); +#endif + string leaseContainerPrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredDatabaseAndContainerRid); Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.monitoredContainer.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton); if (this.documentServiceLeaseStoreManager == null) { - this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(this.monitoredContainer, this.leaseContainer, leaseContainerPrefix, this.instanceName).ConfigureAwait(false); + this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder + .InitializeAsync( + this.monitoredContainer, + this.leaseContainer, + leaseContainerPrefix, + this.instanceName, + changeFeedMode: this.changeFeedProcessorOptions.Mode) + .ConfigureAwait(false); } this.partitionManager = this.BuildPartitionManager( - containerRid, + containerRid, partitionKeyRangeCache); this.initialized = true; } + /// + /// If the lease's lease document is found, this method checks for lease + /// document's and if the new is different + /// from the current , a is thrown. + /// This is based on an issue located at . + /// + /// + private async Task ChangeFeedModeSwitchingCheckAsync(string key) + { + FeedIterator feedIterator = this.leaseContainer.GetItemQueryIterator(queryText: "SELECT * FROM c"); + + while (feedIterator.HasMoreResults) + { + FeedResponse feedResponses = await feedIterator + .ReadNextAsync() + .ConfigureAwait(false); + + bool shouldThrowException = false; + string currentMode = default; + string newMode = this.GetChangeFeedMode(); + + foreach (dynamic response in feedResponses) + { + // NOTE(philipthomas-MSFT): ChangeFeedMode is not set for older leases. + // Since Full-Fidelity Feed is not public at the time we are implementing + // this, all lease documents are Incremental Feeds by default. So if the + // new incoming request is for a Full-Fidelity Feed, then we know a switch + // is happening, and we want to throw the BadRequest CosmosException. + // This is based on an issue located at https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4308. + + if (response.Mode != null) + { + currentMode = response.Mode?.ToString(); + } + + if (currentMode == string.Empty) + { + if (this.changeFeedLeaseOptions.Mode != ChangeFeedMode.Incremental) + { + shouldThrowException = true; + + break; + } + } + + if (response.id.ToString().Contains(key) && currentMode != newMode) + { + shouldThrowException = true; + + break; + } + } + + if (shouldThrowException) + { + CosmosException cosmosException = CosmosExceptionFactory.CreateBadRequestException( + message: $"Switching {nameof(ChangeFeedMode)} {currentMode} to {newMode} is not allowed.", + headers: default); + + throw cosmosException; + } + } + } + + private string GetChangeFeedMode() + { + return this.changeFeedLeaseOptions.Mode == ChangeFeedMode.AllVersionsAndDeletes + ? HttpConstants.A_IMHeaderValues.FullFidelityFeed + : HttpConstants.A_IMHeaderValues.IncrementalFeed; + } + private PartitionManager BuildPartitionManager( string containerRid, Routing.PartitionKeyRangeCache partitionKeyRangeCache) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs index e72e00efb9..dac334c038 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs @@ -44,5 +44,10 @@ public ChangeFeedLeaseOptions() /// instances pointing at the same feed while using the same auxiliary collection. /// public string LeasePrefix { get; set; } + + /// + /// Gets or sets the . + /// + public ChangeFeedMode Mode { get; set; } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs index 95200f48ee..9c5165cf00 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs @@ -70,5 +70,10 @@ internal abstract class DocumentServiceLease /// Gets or sets custom lease properties which can be managed from . /// public abstract Dictionary Properties { get; set; } + + /// + /// Gets or sets the . + /// + public abstract string Mode { get; set; } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs index 65e4e6e725..ed6b050278 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs @@ -95,6 +95,9 @@ public override DateTime Timestamp [JsonProperty("_ts")] private long TS { get; set; } + [JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)] + public override string Mode { get; set; } + public override string ToString() { return string.Format( diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs index 55c4105c3a..ab46ebba29 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs @@ -66,6 +66,9 @@ public override DateTime Timestamp [JsonProperty("properties")] public override Dictionary Properties { get; set; } = new Dictionary(); + [JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)] + public override string Mode { get; set; } + [JsonProperty("timestamp")] private DateTime? ExplicitTimestamp { get; set; } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs index d5a75abb47..0f343a7732 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs @@ -124,7 +124,8 @@ public override Task CreateLeaseIfNotExistAsync( LeaseId = leaseDocId, LeaseToken = leaseToken, ContinuationToken = continuationToken, - FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange()) + FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange()), + Mode = this.GetChangeFeedMode() }; this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString()); @@ -148,7 +149,8 @@ public override Task CreateLeaseIfNotExistAsync( LeaseId = leaseDocId, LeaseToken = leaseToken, ContinuationToken = continuationToken, - FeedRange = feedRange + FeedRange = feedRange, + Mode = this.GetChangeFeedMode() }; this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString()); @@ -156,6 +158,13 @@ public override Task CreateLeaseIfNotExistAsync( return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease); } + private string GetChangeFeedMode() + { + return this.options.Mode == ChangeFeedMode.AllVersionsAndDeletes + ? HttpConstants.A_IMHeaderValues.FullFidelityFeed + : HttpConstants.A_IMHeaderValues.IncrementalFeed; + } + public override async Task ReleaseAsync(DocumentServiceLease lease) { if (lease == null) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs index 70358bca22..f0b754579c 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs @@ -22,7 +22,8 @@ public static async Task InitializeAsync( ContainerInternal monitoredContainer, ContainerInternal leaseContainer, string leaseContainerPrefix, - string instanceName) + string instanceName, + ChangeFeedMode changeFeedMode = default) { ContainerProperties containerProperties = await leaseContainer.GetCachedContainerPropertiesAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default); @@ -58,7 +59,8 @@ public static async Task InitializeAsync( .WithMonitoredContainer(monitoredContainer) .WithLeaseContainer(leaseContainer) .WithRequestOptionsFactory(requestOptionsFactory) - .WithHostName(instanceName); + .WithHostName(instanceName) + .WithChangeFeedMode(changeFeedMode); return leaseStoreManagerBuilder.Build(); } @@ -70,7 +72,7 @@ public static async Task InitializeAsync( private DocumentServiceLeaseStoreManagerBuilder WithMonitoredContainer(ContainerInternal monitoredContainer) { - this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(leaseContainer)); + this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer)); return this; } @@ -98,6 +100,12 @@ private DocumentServiceLeaseStoreManagerBuilder WithHostName(string hostName) return this; } + private DocumentServiceLeaseStoreManagerBuilder WithChangeFeedMode(ChangeFeedMode changeFeedMode) + { + this.options.Mode = changeFeedMode ?? throw new ArgumentNullException(nameof(changeFeedMode)); + return this; + } + private DocumentServiceLeaseStoreManager Build() { if (this.monitoredContainer == null) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs index 146c7a32a3..942db791db 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs @@ -16,5 +16,7 @@ internal string GetPartitionLeasePrefix() { return this.ContainerNamePrefix + PartitionLeasePrefixSeparator; } + + internal ChangeFeedMode Mode { get; set; } } } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs index 21d660b744..c65c11d309 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs @@ -1682,6 +1682,80 @@ public abstract Task> GetPartitionKeyRangesAsync( FeedRange feedRange, CancellationToken cancellationToken = default); + /// + /// Initializes a for change feed processing with all versions and deletes. + /// + /// Document type + /// A name that identifies the Processor and the particular work it will do. + /// Delegate to receive all changes and deletes + /// + /// + /// > documents, CancellationToken token) => + /// { + /// Console.WriteLine($"number of documents processed: {documents.Count}"); + /// + /// string id = default; + /// string pk = default; + /// string description = default; + /// + /// foreach (ChangeFeedItemChange changeFeedItem in documents) + /// { + /// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete) + /// { + /// id = changeFeedItem.Current.id.ToString(); + /// pk = changeFeedItem.Current.pk.ToString(); + /// description = changeFeedItem.Current.description.ToString(); + /// } + /// else + /// { + /// id = changeFeedItem.Previous.id.ToString(); + /// pk = changeFeedItem.Previous.pk.ToString(); + /// description = changeFeedItem.Previous.description.ToString(); + /// } + /// + /// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType; + /// long previousLsn = changeFeedItem.Metadata.PreviousLsn; + /// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp; + /// long lsn = changeFeedItem.Metadata.Lsn; + /// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired; + /// } + /// + /// return Task.CompletedTask; + /// }) + /// .WithInstanceName(Guid.NewGuid().ToString()) + /// .WithLeaseContainer(leaseContainer) + /// .WithErrorNotification((leaseToken, error) => + /// { + /// Console.WriteLine(error.ToString()); + /// + /// return Task.CompletedTask; + /// }) + /// .Build(); + /// + /// await changeFeedProcessor.StartAsync(); + /// await Task.Delay(1000); + /// await this.Container.CreateItemAsync(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1")); + /// await this.Container.UpsertItemAsync(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1")); + /// await this.Container.DeleteItemAsync(id: "1", partitionKey: new PartitionKey("1")); + /// + /// allProcessedDocumentsEvent.WaitOne(10 * 1000); + /// + /// await changeFeedProcessor.StopAsync(); + /// ]]> + /// + /// + /// An instance of + public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( + string processorName, + ChangeFeedHandler> onChangesDelegate); #endif } } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs index ede4aebc22..696fbe4bb3 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs @@ -662,6 +662,7 @@ public override Task DeleteAllItemsByPartitionKeyStreamAsync( openTelemetry: (response) => new OpenTelemetryResponse(response)); } +#if PREVIEW public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( string processorName, ChangeFeedHandler> onChangesDelegate) @@ -670,5 +671,6 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllV processorName, onChangesDelegate); } +#endif } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs index 27b007ed8e..b52c865b10 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs @@ -147,7 +147,6 @@ public abstract Task DeleteAllItemsByPartitionKeyStreamAsync( public abstract Task> GetPartitionKeyRangesAsync( FeedRange feedRange, CancellationToken cancellationToken = default); -#endif /// /// Initializes a for change feed processing with all versions and deletes. @@ -223,6 +222,7 @@ public abstract Task> GetPartitionKeyRangesAsync( public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( string processorName, ChangeFeedHandler> onChangesDelegate); +#endif public abstract class TryExecuteQueryResult { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs index 650164346a..39e4615382 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs @@ -6,9 +6,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed { using System; using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.Metrics; using System.Linq; + using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -159,13 +158,14 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() } } +#if PREVIEW /// - /// + /// This is based on an issue located at . /// [TestMethod] [Owner("philipthomas-MSFT")] - [Description("")] - public async Task WhenAllVersionsAndDeletesDocumentsAreReadAndLeaseContainerIsTheSameThenSwitchedToLatestVersionExpectsAnExceptionTestMeAsync() + [Description("ChangeFeedMode switch from LatestVersion to AllVersionsAndDeletes")] + public async Task WhenLatestVersionSwitchToAllVersionsAndDeletesExpectsACosmosExceptionTestAsync() { int documentCount = 10; _ = await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests @@ -175,16 +175,91 @@ public async Task WhenAllVersionsAndDeletesDocumentsAreReadAndLeaseContainerIsTh ManualResetEvent allDocsProcessed = new(false); - await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.ArrangeActAssetLastestVersionChangeFeedProcessorAsync( - monitoredContainer: this.Container, - documentCount: documentCount, - leaseContainer: this.LeaseContainer, - allDocsProcessed: allDocsProcessed); + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .ArrangeActAssetLastestVersionChangeFeedProcessorAsync( + monitoredContainer: this.Container, + documentCount: documentCount, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); + + CosmosException exception = await Assert.ThrowsExceptionAsync( + () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( + monitoredContainer: this.Container, + documentCount: documentCount, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed)); + + Assert.AreEqual(expected: HttpStatusCode.BadRequest, actual: exception.StatusCode); + Assert.AreEqual(expected: default, actual: exception.SubStatusCode); + Assert.AreEqual(expected: "Switching ChangeFeedMode Incremental Feed to Full-Fidelity Feed is not allowed.", actual: exception.ResponseBody); + } + + /// + /// This is based on an issue located at . + /// + [TestMethod] + [Owner("philipthomas-MSFT")] + [Description("ChangeFeedMode switch from AllVersionsAndDeletes to LatestVersion, a CosmosException is expected.")] + public async Task WhenAllVersionsAndDeletesSwitchToLatestVersionExpectsACosmosExceptionTestAsync() + { + int documentCount = 10; + _ = await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .IngestDocumentsAsync( + monitoredContainer: this.Container, + documentCount: documentCount); + + ManualResetEvent allDocsProcessed = new(false); + + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( + monitoredContainer: this.Container, + documentCount: documentCount, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); + + CosmosException exception = await Assert.ThrowsExceptionAsync( + () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .ArrangeActAssetLastestVersionChangeFeedProcessorAsync( + monitoredContainer: this.Container, + documentCount: documentCount, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed)); + + Assert.AreEqual(expected: HttpStatusCode.BadRequest, actual: exception.StatusCode); + Assert.AreEqual(expected: default, actual: exception.SubStatusCode); + Assert.AreEqual(expected: "Switching ChangeFeedMode Full-Fidelity Feed to Incremental Feed is not allowed.", actual: exception.ResponseBody); + } + + /// + /// This is based on an issue located at . + /// + [TestMethod] + [Owner("philipthomas-MSFT")] + [Description("No ChangeFeedMode switch, no CosmosException is expected.")] + public async Task WhenNoSwitchDoesNotExpectACosmosExceptionTestAsync() + { + int documentCount = 10; + _ = await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .IngestDocumentsAsync( + monitoredContainer: this.Container, + documentCount: documentCount); - await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( - monitoredContainer: this.Container, - leaseContainer: this.LeaseContainer, - allDocsProcessed: allDocsProcessed); + ManualResetEvent allDocsProcessed = new(false); + + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( + monitoredContainer: this.Container, + documentCount: documentCount, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); + + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( + monitoredContainer: this.Container, + documentCount: documentCount, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); } private static async Task> IngestDocumentsAsync(Container monitoredContainer, int documentCount) @@ -236,6 +311,7 @@ private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( .WithMaxItems(1) .WithStartFromBeginning() .WithLeaseContainer(leaseContainer) + .WithChangeFeedMode(ChangeFeedMode.LatestVersion) .WithErrorNotification((leaseToken, error) => { exception = error.InnerException; @@ -251,8 +327,6 @@ private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); - //////await latestVersionProcessor.StopAsync(); - if (exception != default) { Assert.Fail(exception.ToString()); @@ -262,24 +336,38 @@ private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( private static async Task ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( ContainerInternal monitoredContainer, Container leaseContainer, + long documentCount, ManualResetEvent allDocsProcessed) { Exception exception = default; long counter = 0; - ChangeFeedProcessor allVersionsAndDeletesProcessor = monitoredContainer + ChangeFeedProcessor allVersionsAndDeletesProcessorAtomic = null; + + ChangeFeedProcessorBuilder allVersionsAndDeletesProcessorBuilder = monitoredContainer .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: $"{nameof(ChangeFeedMode.AllVersionsAndDeletes)}", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection> documents, CancellationToken token) => { Console.WriteLine($"Reading {nameof(documents)} in {nameof(ChangeFeedMode.AllVersionsAndDeletes)} mode: {JsonConvert.SerializeObject(documents)}"); Console.WriteLine($"{nameof(counter)}: {counter}"); Console.WriteLine($"{nameof(context.LeaseToken)}: {context.LeaseToken}"); + if (counter == documentCount / 2) + { + Console.WriteLine($"Stopping {nameof(allVersionsAndDeletesProcessorAtomic)}"); + + allVersionsAndDeletesProcessorAtomic.StopAsync(); + + return Task.CompletedTask; + } + counter++; return Task.CompletedTask; }) .WithInstanceName(Guid.NewGuid().ToString()) .WithMaxItems(1) + .WithStartFromBeginning() .WithLeaseContainer(leaseContainer) + .WithChangeFeedMode(ChangeFeedMode.AllVersionsAndDeletes) .WithErrorNotification((leaseToken, error) => { // an exception should happen here, because it is trying to use the same LatestVersion leaseContainer on an AllVersionsAndDeletes processor. @@ -288,19 +376,20 @@ private static async Task ArrangeActAssertAllVersionsAndDeletesChangeFeedProcess Console.WriteLine(error.ToString()); return Task.CompletedTask; - }) - .Build(); + }); + + ChangeFeedProcessor allVersionsAndDeletesProcessor = allVersionsAndDeletesProcessorBuilder.Build(); + Interlocked.Exchange(ref allVersionsAndDeletesProcessorAtomic, allVersionsAndDeletesProcessor); await allVersionsAndDeletesProcessor.StartAsync(); await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); - await allVersionsAndDeletesProcessor.StopAsync(); - if (exception != default) { Assert.Fail(exception.ToString()); } } +#endif } } From b8001a61e676d1096e0afeaa4276f47ebafaae57 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Fri, 1 Mar 2024 10:26:34 -0500 Subject: [PATCH 04/17] removed PREVIEW tags under advisement. Well create a PR later --- .../src/EncryptionContainer.cs | 8 -- .../ChangeFeedProcessorCore.cs | 2 - .../src/Resource/Container/Container.cs | 75 ------------------- .../Resource/Container/ContainerInlineCore.cs | 2 - .../Resource/Container/ContainerInternal.cs | 2 +- ...orBuilderWithAllVersionsAndDeletesTests.cs | 2 - 6 files changed, 1 insertion(+), 90 deletions(-) diff --git a/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs b/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs index b4979a0baa..0bd342f0ef 100644 --- a/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs +++ b/Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs @@ -639,14 +639,6 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( }); } -#if ENCRYPTIONPREVIEW - public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( - string processorName, - ChangeFeedHandler> onChangesDelegate) - { - throw new NotImplementedException(); - } -#endif public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint( string processorName, ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 2183eff697..61d1a76129 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -86,12 +86,10 @@ private async Task InitializeAsync() string monitoredDatabaseAndContainerRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(); -#if PREVIEW await this .ChangeFeedModeSwitchingCheckAsync( key: monitoredDatabaseAndContainerRid) .ConfigureAwait(false); -#endif string leaseContainerPrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredDatabaseAndContainerRid); Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.monitoredContainer.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton); diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs index c65c11d309..5936f5e05d 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs @@ -1681,81 +1681,6 @@ public abstract Task DeleteAllItemsByPartitionKeyStreamAsync( public abstract Task> GetPartitionKeyRangesAsync( FeedRange feedRange, CancellationToken cancellationToken = default); - - /// - /// Initializes a for change feed processing with all versions and deletes. - /// - /// Document type - /// A name that identifies the Processor and the particular work it will do. - /// Delegate to receive all changes and deletes - /// - /// - /// > documents, CancellationToken token) => - /// { - /// Console.WriteLine($"number of documents processed: {documents.Count}"); - /// - /// string id = default; - /// string pk = default; - /// string description = default; - /// - /// foreach (ChangeFeedItemChange changeFeedItem in documents) - /// { - /// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete) - /// { - /// id = changeFeedItem.Current.id.ToString(); - /// pk = changeFeedItem.Current.pk.ToString(); - /// description = changeFeedItem.Current.description.ToString(); - /// } - /// else - /// { - /// id = changeFeedItem.Previous.id.ToString(); - /// pk = changeFeedItem.Previous.pk.ToString(); - /// description = changeFeedItem.Previous.description.ToString(); - /// } - /// - /// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType; - /// long previousLsn = changeFeedItem.Metadata.PreviousLsn; - /// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp; - /// long lsn = changeFeedItem.Metadata.Lsn; - /// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired; - /// } - /// - /// return Task.CompletedTask; - /// }) - /// .WithInstanceName(Guid.NewGuid().ToString()) - /// .WithLeaseContainer(leaseContainer) - /// .WithErrorNotification((leaseToken, error) => - /// { - /// Console.WriteLine(error.ToString()); - /// - /// return Task.CompletedTask; - /// }) - /// .Build(); - /// - /// await changeFeedProcessor.StartAsync(); - /// await Task.Delay(1000); - /// await this.Container.CreateItemAsync(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1")); - /// await this.Container.UpsertItemAsync(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1")); - /// await this.Container.DeleteItemAsync(id: "1", partitionKey: new PartitionKey("1")); - /// - /// allProcessedDocumentsEvent.WaitOne(10 * 1000); - /// - /// await changeFeedProcessor.StopAsync(); - /// ]]> - /// - /// - /// An instance of - public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( - string processorName, - ChangeFeedHandler> onChangesDelegate); #endif } } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs index 696fbe4bb3..ede4aebc22 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs @@ -662,7 +662,6 @@ public override Task DeleteAllItemsByPartitionKeyStreamAsync( openTelemetry: (response) => new OpenTelemetryResponse(response)); } -#if PREVIEW public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( string processorName, ChangeFeedHandler> onChangesDelegate) @@ -671,6 +670,5 @@ public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllV processorName, onChangesDelegate); } -#endif } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs index b52c865b10..27b007ed8e 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs @@ -147,6 +147,7 @@ public abstract Task DeleteAllItemsByPartitionKeyStreamAsync( public abstract Task> GetPartitionKeyRangesAsync( FeedRange feedRange, CancellationToken cancellationToken = default); +#endif /// /// Initializes a for change feed processing with all versions and deletes. @@ -222,7 +223,6 @@ public abstract Task> GetPartitionKeyRangesAsync( public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes( string processorName, ChangeFeedHandler> onChangesDelegate); -#endif public abstract class TryExecuteQueryResult { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs index 39e4615382..0f79aa785f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs @@ -158,7 +158,6 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() } } -#if PREVIEW /// /// This is based on an issue located at . /// @@ -390,6 +389,5 @@ private static async Task ArrangeActAssertAllVersionsAndDeletesChangeFeedProcess Assert.Fail(exception.ToString()); } } -#endif } } From 706453f5c769c6e9a05349a2f1bfe56c6c1db278 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 13 Mar 2024 12:14:46 -0400 Subject: [PATCH 05/17] removed usings --- .../src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 61d1a76129..e6a07f5919 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -5,8 +5,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; - using System.Diagnostics; - using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; @@ -14,7 +12,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using Microsoft.Azure.Cosmos.ChangeFeed.Utils; - using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; using Microsoft.Azure.Cosmos.Tracing; From c621e17412b8abdb8eeae8448969e763f7e72453 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 13 Mar 2024 12:18:06 -0400 Subject: [PATCH 06/17] revert to old summary --- .../src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs index d91142e5aa..49bfa9d816 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs @@ -70,12 +70,7 @@ public ChangeFeedProcessorBuilder WithInstanceName(string instanceName) } /// - /// Sets the on for a monitored - /// container. Setting the on for a lease - /// container will be used to track which on - /// for a monitored container and should be used to prevent switching on the - /// when using the same lease container. - /// This is based on an issue located at . + /// Sets the mode for the change feed processor. /// /// /// The instance of to use. From 5fa3dcd34c296dcc3dba962dbccf215485816153 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 13 Mar 2024 12:23:20 -0400 Subject: [PATCH 07/17] defaulting change feed mode to latestversion --- .../src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs index 0679678580..e3d4468d0f 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs @@ -145,7 +145,7 @@ private async Task InitializeLeaseStoreAsync() leaseContainer: this.leaseContainer, leaseContainerPrefix: leasePrefix, instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName, - changeFeedMode: this.changeFeedLeaseOptions.Mode) + changeFeedMode: ChangeFeedMode.LatestVersion) .ConfigureAwait(false); this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer; From f21b8ed0a4240a56e7e135e224cbe884f03f1448 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 12:00:21 -0400 Subject: [PATCH 08/17] change feed mode exception on switching and tests based on last recommendation. --- .../ChangeFeedProcessorCore.cs | 107 +++---- ...orBuilderWithAllVersionsAndDeletesTests.cs | 287 +++++++++--------- 2 files changed, 195 insertions(+), 199 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index e6a07f5919..e046c50dd1 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -5,7 +5,10 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; + using System.Collections.Generic; + using System.Linq; using System.Threading.Tasks; + using global::Azure; using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; @@ -82,12 +85,6 @@ private async Task InitializeAsync() default); string monitoredDatabaseAndContainerRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(); - - await this - .ChangeFeedModeSwitchingCheckAsync( - key: monitoredDatabaseAndContainerRid) - .ConfigureAwait(false); - string leaseContainerPrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredDatabaseAndContainerRid); Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.monitoredContainer.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton); if (this.documentServiceLeaseStoreManager == null) @@ -102,6 +99,8 @@ await this .ConfigureAwait(false); } + await this.ChangeFeedModeSwitchingCheckAsync(monitoredDatabaseAndContainerRid).ConfigureAwait(false); + this.partitionManager = this.BuildPartitionManager( containerRid, partitionKeyRangeCache); @@ -109,72 +108,66 @@ await this } /// - /// If the lease's lease document is found, this method checks for lease - /// document's and if the new is different - /// from the current , a is thrown. + /// If the lease container's lease document is found, this method checks for lease + /// document's ChangeFeedMode and if the new ChangeFeedMode is different + /// from the current ChangeFeedMode, a CosmosException is thrown. /// This is based on an issue located at . /// - /// - private async Task ChangeFeedModeSwitchingCheckAsync(string key) + /// + private async Task ChangeFeedModeSwitchingCheckAsync(string monitoredDatabaseAndContainerRid) { - FeedIterator feedIterator = this.leaseContainer.GetItemQueryIterator(queryText: "SELECT * FROM c"); + IReadOnlyList documentServiceLeases = await this.documentServiceLeaseStoreManager + .LeaseContainer + .GetAllLeasesAsync() + .ConfigureAwait(false); - while (feedIterator.HasMoreResults) + bool shouldThrowException = false; + + // No lease documents. Return. + if (documentServiceLeases.Count == 0) { - FeedResponse feedResponses = await feedIterator - .ReadNextAsync() - .ConfigureAwait(false); + return; + } - bool shouldThrowException = false; - string currentMode = default; - string newMode = this.GetChangeFeedMode(); + DocumentServiceLease documentServiceLease = documentServiceLeases.FirstOrDefault(lease => lease.Id.Contains(monitoredDatabaseAndContainerRid)); - foreach (dynamic response in feedResponses) + // No lease documents that match the Id. + if (documentServiceLease == default) + { + return; + } + + // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because + // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are + // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, a CosmosException should thrown. + if (string.IsNullOrEmpty(documentServiceLease.Mode)) + { + if (this.changeFeedLeaseOptions.Mode != ChangeFeedMode.LatestVersion) { - // NOTE(philipthomas-MSFT): ChangeFeedMode is not set for older leases. - // Since Full-Fidelity Feed is not public at the time we are implementing - // this, all lease documents are Incremental Feeds by default. So if the - // new incoming request is for a Full-Fidelity Feed, then we know a switch - // is happening, and we want to throw the BadRequest CosmosException. - // This is based on an issue located at https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4308. - - if (response.Mode != null) - { - currentMode = response.Mode?.ToString(); - } - - if (currentMode == string.Empty) - { - if (this.changeFeedLeaseOptions.Mode != ChangeFeedMode.Incremental) - { - shouldThrowException = true; - - break; - } - } - - if (response.id.ToString().Contains(key) && currentMode != newMode) - { - shouldThrowException = true; - - break; - } + shouldThrowException = true; } + } - if (shouldThrowException) - { - CosmosException cosmosException = CosmosExceptionFactory.CreateBadRequestException( - message: $"Switching {nameof(ChangeFeedMode)} {currentMode} to {newMode} is not allowed.", - headers: default); + string changeFeedProcessorMode = this.NormalizeChangeFeedProcessorMode(this.changeFeedLeaseOptions.Mode); - throw cosmosException; - } + // If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown. + if (string.Compare(documentServiceLease.Mode, changeFeedProcessorMode, StringComparison.OrdinalIgnoreCase) != 0) + { + shouldThrowException = true; + } + + // If shouldThrowException is true, throw the CosmosException. + if (shouldThrowException) + { + throw CosmosExceptionFactory.CreateBadRequestException( + message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {changeFeedProcessorMode} is not allowed.", + headers: default); } } - private string GetChangeFeedMode() + private string NormalizeChangeFeedProcessorMode(ChangeFeedMode changeFeedMode) { - return this.changeFeedLeaseOptions.Mode == ChangeFeedMode.AllVersionsAndDeletes + return changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes ? HttpConstants.A_IMHeaderValues.FullFidelityFeed : HttpConstants.A_IMHeaderValues.IncrementalFeed; } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs index 0f79aa785f..5e5a62cd46 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs @@ -6,33 +6,21 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed { using System; using System.Collections.Generic; + using System.Diagnostics; using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; - using Newtonsoft.Json; [TestClass] - [TestCategory("ChangeFeedProcessor with AllVersionsAndDeletes")] + [TestCategory("ChangeFeedProcessor")] public class GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests : BaseChangeFeedClientHelper { - private ContainerInternal Container; - [TestInitialize] public async Task TestInitialize() { await base.ChangeFeedTestInit(); - - string PartitionKey = "/pk"; - ContainerProperties properties = new ContainerProperties(id: Guid.NewGuid().ToString(), - partitionKeyPath: PartitionKey); - properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5); - - ContainerResponse response = await this.database.CreateContainerAsync(properties, - throughput: 10000, - cancellationToken: this.cancellationToken); - this.Container = (ContainerInternal)response; } [TestCleanup] @@ -44,13 +32,14 @@ public async Task Cleanup() [TestMethod] [Owner("philipthomas-MSFT")] [Description("Scenario: When a document is created, then updated, and finally deleted, there should be 3 changes that will appear for that " + - "document when using ChangeFeedProcessor with AllVersionsAndDeletes.")] + "document when using ChangeFeedProcessor with AllVersionsAndDeletes set as the ChangeFeedMode.")] public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() { + ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes); ManualResetEvent allDocsProcessed = new ManualResetEvent(false); Exception exception = default; - ChangeFeedProcessor processor = this.Container + ChangeFeedProcessor processor = monitoredContainer .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection> docs, CancellationToken token) => { string id = default; @@ -119,7 +108,7 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() Assert.IsTrue(condition: createChange.Metadata.Lsn < replaceChange.Metadata.Lsn, message: "The create operation must happen before the replace operation."); Assert.IsTrue(condition: createChange.Metadata.Lsn < replaceChange.Metadata.Lsn, message: "The replace operation must happen before the delete operation."); - Console.WriteLine("Assertions completed."); + Debug.WriteLine("Assertions completed."); return Task.CompletedTask; }) @@ -128,7 +117,9 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() .WithErrorNotification((leaseToken, error) => { exception = error.InnerException; - Console.WriteLine(error.ToString()); + + Debug.WriteLine("WithErrorNotification"); + Debug.WriteLine(error.ToString()); return Task.CompletedTask; }) @@ -140,13 +131,13 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() await processor.StartAsync(); await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); - await this.Container.CreateItemAsync(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1")); + await monitoredContainer.CreateItemAsync(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1")); await Task.Delay(1000); - - await this.Container.UpsertItemAsync(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1")); + + await monitoredContainer.UpsertItemAsync(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1")); await Task.Delay(1000); - await this.Container.DeleteItemAsync(id: "1", partitionKey: new PartitionKey("1")); + await monitoredContainer.DeleteItemAsync(id: "1", partitionKey: new PartitionKey("1")); bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); @@ -163,35 +154,36 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() /// [TestMethod] [Owner("philipthomas-MSFT")] - [Description("ChangeFeedMode switch from LatestVersion to AllVersionsAndDeletes")] - public async Task WhenLatestVersionSwitchToAllVersionsAndDeletesExpectsACosmosExceptionTestAsync() + [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor, switches from LatestVersion to AllVersionsAndDeletes," + + "a CosmosException is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] + [DataRow(false)] + [DataRow(true)] + public async Task WhenLatestVersionSwitchToAllVersionsAndDeletesExpectsACosmosExceptionTestAsync(bool withStartFromBeginning) { - int documentCount = 10; - _ = await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .IngestDocumentsAsync( - monitoredContainer: this.Container, - documentCount: documentCount); - + ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion); ManualResetEvent allDocsProcessed = new(false); await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .ArrangeActAssetLastestVersionChangeFeedProcessorAsync( - monitoredContainer: this.Container, - documentCount: documentCount, + .BuildChangeFeedProcessorWithLatestVersionAsync( + monitoredContainer: monitoredContainer, leaseContainer: this.LeaseContainer, - allDocsProcessed: allDocsProcessed); + allDocsProcessed: allDocsProcessed, + withStartFromBeginning: withStartFromBeginning); CosmosException exception = await Assert.ThrowsExceptionAsync( () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( - monitoredContainer: this.Container, - documentCount: documentCount, + .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync( + monitoredContainer: monitoredContainer, leaseContainer: this.LeaseContainer, allDocsProcessed: allDocsProcessed)); + Debug.WriteLine(exception.ToString()); + Assert.AreEqual(expected: HttpStatusCode.BadRequest, actual: exception.StatusCode); Assert.AreEqual(expected: default, actual: exception.SubStatusCode); Assert.AreEqual(expected: "Switching ChangeFeedMode Incremental Feed to Full-Fidelity Feed is not allowed.", actual: exception.ResponseBody); + + Debug.WriteLine("Assertions completed."); } /// @@ -199,35 +191,36 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests /// [TestMethod] [Owner("philipthomas-MSFT")] - [Description("ChangeFeedMode switch from AllVersionsAndDeletes to LatestVersion, a CosmosException is expected.")] - public async Task WhenAllVersionsAndDeletesSwitchToLatestVersionExpectsACosmosExceptionTestAsync() + [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor, switches from AllVersionsAndDeletes to LatestVersion," + + "a CosmosException is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] + [DataRow(false)] + [DataRow(true)] + public async Task WhenAllVersionsAndDeletesSwitchToLatestVersionExpectsACosmosExceptionTestAsync(bool withStartFromBeginning) { - int documentCount = 10; - _ = await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .IngestDocumentsAsync( - monitoredContainer: this.Container, - documentCount: documentCount); - + ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes); ManualResetEvent allDocsProcessed = new(false); await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( - monitoredContainer: this.Container, - documentCount: documentCount, + .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync( + monitoredContainer: monitoredContainer, leaseContainer: this.LeaseContainer, allDocsProcessed: allDocsProcessed); CosmosException exception = await Assert.ThrowsExceptionAsync( () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .ArrangeActAssetLastestVersionChangeFeedProcessorAsync( - monitoredContainer: this.Container, - documentCount: documentCount, + .BuildChangeFeedProcessorWithLatestVersionAsync( + monitoredContainer: monitoredContainer, leaseContainer: this.LeaseContainer, - allDocsProcessed: allDocsProcessed)); + allDocsProcessed: allDocsProcessed, + withStartFromBeginning: withStartFromBeginning)); + + Debug.WriteLine(exception.ToString()); Assert.AreEqual(expected: HttpStatusCode.BadRequest, actual: exception.StatusCode); Assert.AreEqual(expected: default, actual: exception.SubStatusCode); Assert.AreEqual(expected: "Switching ChangeFeedMode Full-Fidelity Feed to Incremental Feed is not allowed.", actual: exception.ResponseBody); + + Debug.WriteLine("Assertions completed."); } /// @@ -235,94 +228,106 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests /// [TestMethod] [Owner("philipthomas-MSFT")] - [Description("No ChangeFeedMode switch, no CosmosException is expected.")] - public async Task WhenNoSwitchDoesNotExpectACosmosExceptionTestAsync() + [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor does not switch, AllVersionsAndDeletes," + + "no CosmosException is expected.")] + public async Task WhenNoSwitchAllVersionsAndDeletesFDoesNotExpectACosmosExceptionTestAsync() { - int documentCount = 10; - _ = await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .IngestDocumentsAsync( - monitoredContainer: this.Container, - documentCount: documentCount); - + ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes); ManualResetEvent allDocsProcessed = new(false); - await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( - monitoredContainer: this.Container, - documentCount: documentCount, - leaseContainer: this.LeaseContainer, - allDocsProcessed: allDocsProcessed); + try + { + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync( + monitoredContainer: monitoredContainer, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); - await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests - .ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( - monitoredContainer: this.Container, - documentCount: documentCount, - leaseContainer: this.LeaseContainer, - allDocsProcessed: allDocsProcessed); + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync( + monitoredContainer: monitoredContainer, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed); + + Debug.WriteLine("No exceptions occurred."); + } + catch + { + Assert.Fail("An exception occurred when one was not expceted."); ; + } } - private static async Task> IngestDocumentsAsync(Container monitoredContainer, int documentCount) + /// + /// This is based on an issue located at . + /// + [TestMethod] + [Owner("philipthomas-MSFT")] + [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor does not switch, LatestVersion," + + "no CosmosException is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] + [DataRow(false)] + [DataRow(true)] + public async Task WhenNoSwitchLatestVersionDoesNotExpectACosmosExceptionTestAsync(bool withStartFromBeginning) { - List docs = new(); + ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion); + ManualResetEvent allDocsProcessed = new(false); - for (int i = 0; i < documentCount; i++) + try { - ItemResponse response = await monitoredContainer.CreateItemAsync(new { id = i.ToString(), pk = i.ToString(), description = $"original test{i}" }, partitionKey: new PartitionKey(i.ToString())); - docs.Add(response.Resource); + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .BuildChangeFeedProcessorWithLatestVersionAsync( + monitoredContainer: monitoredContainer, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed, + withStartFromBeginning: withStartFromBeginning); - await Task.Delay(1000); - } + await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests + .BuildChangeFeedProcessorWithLatestVersionAsync( + monitoredContainer: monitoredContainer, + leaseContainer: this.LeaseContainer, + allDocsProcessed: allDocsProcessed, + withStartFromBeginning: withStartFromBeginning); - return docs; + Debug.WriteLine("No exceptions occurred."); + } + catch + { + Assert.Fail("An exception occurred when one was not expceted."); ; + } } - private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( + private static async Task BuildChangeFeedProcessorWithLatestVersionAsync( ContainerInternal monitoredContainer, Container leaseContainer, - long documentCount, - ManualResetEvent allDocsProcessed) + ManualResetEvent allDocsProcessed, + bool withStartFromBeginning) { Exception exception = default; - long counter = 0; ChangeFeedProcessor latestVersionProcessorAtomic = null; - ChangeFeedProcessorBuilder latestVersionProcessorBuilder = monitoredContainer - .GetChangeFeedProcessorBuilder(processorName: $"{nameof(ChangeFeedMode.LatestVersion)}", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection documents, CancellationToken token) => - { - Console.WriteLine($"Reading {nameof(documents)} in {nameof(ChangeFeedMode.LatestVersion)} mode: {JsonConvert.SerializeObject(documents)}"); - Console.WriteLine($"{nameof(counter)}: {counter}"); - Console.WriteLine($"{nameof(context.LeaseToken)}: {context.LeaseToken}"); - - if (counter == documentCount / 2) - { - Console.WriteLine($"Stopping {nameof(latestVersionProcessorAtomic)}"); - - latestVersionProcessorAtomic.StopAsync(); - - return Task.CompletedTask; - } - - counter++; - - return Task.CompletedTask; - }) + ChangeFeedProcessorBuilder processorBuilder = monitoredContainer + .GetChangeFeedProcessorBuilder(processorName: $"processorName", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection documents, CancellationToken token) => Task.CompletedTask) .WithInstanceName(Guid.NewGuid().ToString()) - .WithMaxItems(1) - .WithStartFromBeginning() .WithLeaseContainer(leaseContainer) - .WithChangeFeedMode(ChangeFeedMode.LatestVersion) .WithErrorNotification((leaseToken, error) => { exception = error.InnerException; - Console.WriteLine(error.ToString()); + + Debug.WriteLine("WithErrorNotification"); + Debug.WriteLine(error.ToString()); return Task.CompletedTask; }); - ChangeFeedProcessor latestVersionProcessor = latestVersionProcessorBuilder.Build(); - Interlocked.Exchange(ref latestVersionProcessorAtomic, latestVersionProcessor); + if (withStartFromBeginning) + { + processorBuilder.WithStartFromBeginning(); + } + - await latestVersionProcessor.StartAsync(); + ChangeFeedProcessor processor = processorBuilder.Build(); + Interlocked.Exchange(ref latestVersionProcessorAtomic, processor); + + await processor.StartAsync(); await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); @@ -332,55 +337,33 @@ private static async Task ArrangeActAssetLastestVersionChangeFeedProcessorAsync( } } - private static async Task ArrangeActAssertAllVersionsAndDeletesChangeFeedProcessorAsync( + private static async Task BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync( ContainerInternal monitoredContainer, Container leaseContainer, - long documentCount, ManualResetEvent allDocsProcessed) { Exception exception = default; - long counter = 0; ChangeFeedProcessor allVersionsAndDeletesProcessorAtomic = null; ChangeFeedProcessorBuilder allVersionsAndDeletesProcessorBuilder = monitoredContainer - .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: $"{nameof(ChangeFeedMode.AllVersionsAndDeletes)}", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection> documents, CancellationToken token) => - { - Console.WriteLine($"Reading {nameof(documents)} in {nameof(ChangeFeedMode.AllVersionsAndDeletes)} mode: {JsonConvert.SerializeObject(documents)}"); - Console.WriteLine($"{nameof(counter)}: {counter}"); - Console.WriteLine($"{nameof(context.LeaseToken)}: {context.LeaseToken}"); - - if (counter == documentCount / 2) - { - Console.WriteLine($"Stopping {nameof(allVersionsAndDeletesProcessorAtomic)}"); - - allVersionsAndDeletesProcessorAtomic.StopAsync(); - - return Task.CompletedTask; - } - - counter++; - - return Task.CompletedTask; - }) + .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: $"processorName", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection> documents, CancellationToken token) => Task.CompletedTask) .WithInstanceName(Guid.NewGuid().ToString()) .WithMaxItems(1) - .WithStartFromBeginning() .WithLeaseContainer(leaseContainer) - .WithChangeFeedMode(ChangeFeedMode.AllVersionsAndDeletes) .WithErrorNotification((leaseToken, error) => { - // an exception should happen here, because it is trying to use the same LatestVersion leaseContainer on an AllVersionsAndDeletes processor. - exception = error.InnerException; - Console.WriteLine(error.ToString()); - return Task.CompletedTask; + Debug.WriteLine("WithErrorNotification"); + Debug.WriteLine(error.ToString()); + + return Task.FromResult(exception); }); - ChangeFeedProcessor allVersionsAndDeletesProcessor = allVersionsAndDeletesProcessorBuilder.Build(); - Interlocked.Exchange(ref allVersionsAndDeletesProcessorAtomic, allVersionsAndDeletesProcessor); + ChangeFeedProcessor processor = allVersionsAndDeletesProcessorBuilder.Build(); + Interlocked.Exchange(ref allVersionsAndDeletesProcessorAtomic, processor); - await allVersionsAndDeletesProcessor.StartAsync(); + await processor.StartAsync(); await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); @@ -389,5 +372,25 @@ private static async Task ArrangeActAssertAllVersionsAndDeletesChangeFeedProcess Assert.Fail(exception.ToString()); } } + + private async Task CreateMonitoredContainer(ChangeFeedMode changeFeedMode) + { + string PartitionKey = "/pk"; + ContainerProperties properties = new ContainerProperties(id: Guid.NewGuid().ToString(), + partitionKeyPath: PartitionKey); + + if (changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes) + { + Debug.WriteLine($"{nameof(properties.ChangeFeedPolicy.FullFidelityRetention)} initialized."); + + properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5); + } + + ContainerResponse response = await this.database.CreateContainerAsync(properties, + throughput: 10000, + cancellationToken: this.cancellationToken); + + return (ContainerInternal)response; + } } } From ffba02551d2059bb4fe45ca26ede6233794351a0 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 14:30:44 -0400 Subject: [PATCH 09/17] more changes based on discussions --- .../ChangeFeedEstimatorIterator.cs | 3 +- .../ChangeFeedProcessorCore.cs | 40 ++++++++++++------- .../LeaseManagement/DocumentServiceLease.cs | 2 +- ...DocumentServiceLeaseStoreManagerBuilder.cs | 2 +- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs index 20108af23c..4d1fc6fd4a 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs @@ -334,7 +334,8 @@ private async Task InitializeLeaseStoreAsync(ITrace trace, CancellationToken can monitoredContainer: this.monitoredContainer, leaseContainer: this.leaseContainer, leaseContainerPrefix: leasePrefix, - instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName); + instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName, + changeFeedMode: ChangeFeedMode.LatestVersion); this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer; } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index e046c50dd1..5766e9fd28 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -99,7 +99,7 @@ private async Task InitializeAsync() .ConfigureAwait(false); } - await this.ChangeFeedModeSwitchingCheckAsync(monitoredDatabaseAndContainerRid).ConfigureAwait(false); + await this.ChangeFeedModeSwitchingCheckAsync().ConfigureAwait(false); this.partitionManager = this.BuildPartitionManager( containerRid, @@ -113,25 +113,24 @@ private async Task InitializeAsync() /// from the current ChangeFeedMode, a CosmosException is thrown. /// This is based on an issue located at . /// - /// - private async Task ChangeFeedModeSwitchingCheckAsync(string monitoredDatabaseAndContainerRid) + private async Task ChangeFeedModeSwitchingCheckAsync() { IReadOnlyList documentServiceLeases = await this.documentServiceLeaseStoreManager .LeaseContainer .GetAllLeasesAsync() .ConfigureAwait(false); - bool shouldThrowException = false; - // No lease documents. Return. + if (documentServiceLeases.Count == 0) { return; } - DocumentServiceLease documentServiceLease = documentServiceLeases.FirstOrDefault(lease => lease.Id.Contains(monitoredDatabaseAndContainerRid)); + DocumentServiceLease documentServiceLease = documentServiceLeases.FirstOrDefault(); // No lease documents that match the Id. + if (documentServiceLease == default) { return; @@ -140,6 +139,10 @@ private async Task ChangeFeedModeSwitchingCheckAsync(string monitoredDatabaseAnd // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, a CosmosException should thrown. + + bool shouldThrowException = default; + string normalizedProcessorChangeFeedMode = default; + if (string.IsNullOrEmpty(documentServiceLease.Mode)) { if (this.changeFeedLeaseOptions.Mode != ChangeFeedMode.LatestVersion) @@ -147,29 +150,36 @@ private async Task ChangeFeedModeSwitchingCheckAsync(string monitoredDatabaseAnd shouldThrowException = true; } } - - string changeFeedProcessorMode = this.NormalizeChangeFeedProcessorMode(this.changeFeedLeaseOptions.Mode); - - // If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown. - if (string.Compare(documentServiceLease.Mode, changeFeedProcessorMode, StringComparison.OrdinalIgnoreCase) != 0) + else { - shouldThrowException = true; + // If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown. + + shouldThrowException = this.VerifyChangeFeedProcessorMode( + changeFeedMode: this.changeFeedLeaseOptions.Mode, + leaseChangeFeedMode: documentServiceLease.Mode, + normalizedProcessorChangeFeedMode: out normalizedProcessorChangeFeedMode); } // If shouldThrowException is true, throw the CosmosException. + if (shouldThrowException) { throw CosmosExceptionFactory.CreateBadRequestException( - message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {changeFeedProcessorMode} is not allowed.", + message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed.", headers: default); } } - private string NormalizeChangeFeedProcessorMode(ChangeFeedMode changeFeedMode) + private bool VerifyChangeFeedProcessorMode( + ChangeFeedMode changeFeedMode, + string leaseChangeFeedMode, + out string normalizedProcessorChangeFeedMode) { - return changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes + normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes ? HttpConstants.A_IMHeaderValues.FullFidelityFeed : HttpConstants.A_IMHeaderValues.IncrementalFeed; + + return string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0; } private PartitionManager BuildPartitionManager( diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs index 9c5165cf00..ab9d6beb43 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs @@ -72,7 +72,7 @@ internal abstract class DocumentServiceLease public abstract Dictionary Properties { get; set; } /// - /// Gets or sets the . + /// Gets or sets the ChangeFeedMode. /// public abstract string Mode { get; set; } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs index f0b754579c..b5fb29a095 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs @@ -23,7 +23,7 @@ public static async Task InitializeAsync( ContainerInternal leaseContainer, string leaseContainerPrefix, string instanceName, - ChangeFeedMode changeFeedMode = default) + ChangeFeedMode changeFeedMode) { ContainerProperties containerProperties = await leaseContainer.GetCachedContainerPropertiesAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default); From 14aa99f35cd1a25a6331ed7df53c6c13d9875a0b Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 14:57:43 -0400 Subject: [PATCH 10/17] some refactoring --- .../ChangeFeedProcessorCore.cs | 23 ++++++------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 5766e9fd28..e1456f0fd6 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -139,26 +139,17 @@ private async Task ChangeFeedModeSwitchingCheckAsync() // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, a CosmosException should thrown. + // If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown. - bool shouldThrowException = default; - string normalizedProcessorChangeFeedMode = default; - - if (string.IsNullOrEmpty(documentServiceLease.Mode)) - { - if (this.changeFeedLeaseOptions.Mode != ChangeFeedMode.LatestVersion) - { - shouldThrowException = true; - } - } - else - { - // If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown. - - shouldThrowException = this.VerifyChangeFeedProcessorMode( + bool shouldThrowException = string.IsNullOrEmpty(documentServiceLease.Mode) + ? this.VerifyChangeFeedProcessorMode( + changeFeedMode: ChangeFeedMode.LatestVersion, + leaseChangeFeedMode: documentServiceLease.Mode, + normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode) + : this.VerifyChangeFeedProcessorMode( changeFeedMode: this.changeFeedLeaseOptions.Mode, leaseChangeFeedMode: documentServiceLease.Mode, normalizedProcessorChangeFeedMode: out normalizedProcessorChangeFeedMode); - } // If shouldThrowException is true, throw the CosmosException. From 420385caaaac4ce0fdd2286ee27d12fbea04aea2 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 15:05:10 -0400 Subject: [PATCH 11/17] fixed some tests --- .../DocumentServiceLeaseStoreManagerBuilderTests.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs index f6901bb75b..6a9fb11fc2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs @@ -36,7 +36,8 @@ await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( Mock.Of(), leaseContainerMock.Object, Guid.NewGuid().ToString(), - Guid.NewGuid().ToString()); + Guid.NewGuid().ToString(), + ChangeFeedMode.LatestVersion); } [TestMethod] @@ -59,7 +60,8 @@ await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( Mock.Of(), leaseContainerMock.Object, Guid.NewGuid().ToString(), - Guid.NewGuid().ToString()); + Guid.NewGuid().ToString(), + ChangeFeedMode.LatestVersion); } [TestMethod] @@ -82,7 +84,8 @@ await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync( Mock.Of(), leaseContainerMock.Object, Guid.NewGuid().ToString(), - Guid.NewGuid().ToString()); + Guid.NewGuid().ToString(), + ChangeFeedMode.LatestVersion); } [TestMethod] @@ -105,7 +108,8 @@ await Assert.ThrowsExceptionAsync(() => DocumentServiceLeaseS Mock.Of(), leaseContainerMock.Object, Guid.NewGuid().ToString(), - Guid.NewGuid().ToString())); + Guid.NewGuid().ToString(), + ChangeFeedMode.LatestVersion)); } } } From d718befa3002674a7e099165ec4923dd183ec8d5 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 15:24:32 -0400 Subject: [PATCH 12/17] more changes based on recommendations --- .../ChangeFeedProcessor/ChangeFeedProcessorCore.cs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index e1456f0fd6..71686c1a7b 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -6,9 +6,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; using System.Collections.Generic; - using System.Linq; using System.Threading.Tasks; - using global::Azure; using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement; @@ -127,14 +125,7 @@ private async Task ChangeFeedModeSwitchingCheckAsync() return; } - DocumentServiceLease documentServiceLease = documentServiceLeases.FirstOrDefault(); - - // No lease documents that match the Id. - - if (documentServiceLease == default) - { - return; - } + DocumentServiceLease documentServiceLease = documentServiceLeases[0]; // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are From 3a29c2302783ce0868184e55184f2ae4e28e4940 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 15:36:16 -0400 Subject: [PATCH 13/17] another refactor --- .../ChangeFeedProcessorCore.cs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 71686c1a7b..3f9eb21a39 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; using System.Collections.Generic; + using System.ComponentModel; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; @@ -132,15 +133,13 @@ private async Task ChangeFeedModeSwitchingCheckAsync() // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, a CosmosException should thrown. // If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown. - bool shouldThrowException = string.IsNullOrEmpty(documentServiceLease.Mode) - ? this.VerifyChangeFeedProcessorMode( - changeFeedMode: ChangeFeedMode.LatestVersion, - leaseChangeFeedMode: documentServiceLease.Mode, - normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode) - : this.VerifyChangeFeedProcessorMode( - changeFeedMode: this.changeFeedLeaseOptions.Mode, - leaseChangeFeedMode: documentServiceLease.Mode, - normalizedProcessorChangeFeedMode: out normalizedProcessorChangeFeedMode); + bool shouldThrowException = this.VerifyChangeFeedProcessorMode( + changeFeedMode: + string.IsNullOrEmpty(documentServiceLease.Mode) + ? ChangeFeedMode.LatestVersion + : this.changeFeedLeaseOptions.Mode, + leaseChangeFeedMode: documentServiceLease.Mode, + normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode); // If shouldThrowException is true, throw the CosmosException. From e204c1bfed60cac834acdde78bd3b6263402c388 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 15:48:16 -0400 Subject: [PATCH 14/17] change from CosmosException to ArgumentException --- .../ChangeFeedProcessor/ChangeFeedProcessorCore.cs | 6 +----- ...rocessorBuilderWithAllVersionsAndDeletesTests.cs | 13 ++++--------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 3f9eb21a39..70b8d90098 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; using System.Collections.Generic; - using System.ComponentModel; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; @@ -15,7 +14,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using Microsoft.Azure.Cosmos.ChangeFeed.Utils; using Microsoft.Azure.Cosmos.Core.Trace; - using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; @@ -145,9 +143,7 @@ private async Task ChangeFeedModeSwitchingCheckAsync() if (shouldThrowException) { - throw CosmosExceptionFactory.CreateBadRequestException( - message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed.", - headers: default); + throw new ArgumentException(message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed."); } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs index 5e5a62cd46..40ba3bac1c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed using System.Collections.Generic; using System.Diagnostics; using System.Linq; - using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -170,7 +169,7 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests allDocsProcessed: allDocsProcessed, withStartFromBeginning: withStartFromBeginning); - CosmosException exception = await Assert.ThrowsExceptionAsync( + ArgumentException exception = await Assert.ThrowsExceptionAsync( () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync( monitoredContainer: monitoredContainer, @@ -179,9 +178,7 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests Debug.WriteLine(exception.ToString()); - Assert.AreEqual(expected: HttpStatusCode.BadRequest, actual: exception.StatusCode); - Assert.AreEqual(expected: default, actual: exception.SubStatusCode); - Assert.AreEqual(expected: "Switching ChangeFeedMode Incremental Feed to Full-Fidelity Feed is not allowed.", actual: exception.ResponseBody); + Assert.AreEqual(expected: "Switching ChangeFeedMode Incremental Feed to Full-Fidelity Feed is not allowed.", actual: exception.Message); Debug.WriteLine("Assertions completed."); } @@ -206,7 +203,7 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests leaseContainer: this.LeaseContainer, allDocsProcessed: allDocsProcessed); - CosmosException exception = await Assert.ThrowsExceptionAsync( + ArgumentException exception = await Assert.ThrowsExceptionAsync( () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests .BuildChangeFeedProcessorWithLatestVersionAsync( monitoredContainer: monitoredContainer, @@ -216,9 +213,7 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests Debug.WriteLine(exception.ToString()); - Assert.AreEqual(expected: HttpStatusCode.BadRequest, actual: exception.StatusCode); - Assert.AreEqual(expected: default, actual: exception.SubStatusCode); - Assert.AreEqual(expected: "Switching ChangeFeedMode Full-Fidelity Feed to Incremental Feed is not allowed.", actual: exception.ResponseBody); + Assert.AreEqual(expected: "Switching ChangeFeedMode Full-Fidelity Feed to Incremental Feed is not allowed.", actual: exception.Message); Debug.WriteLine("Assertions completed."); } From bf6c2f5193b688b2fb889054fdbef7ed62c8099c Mon Sep 17 00:00:00 2001 From: philipthomas Date: Wed, 20 Mar 2024 15:51:13 -0400 Subject: [PATCH 15/17] removed CosmosException from comments --- .../ChangeFeedProcessorCore.cs | 8 ++++---- ...essorBuilderWithAllVersionsAndDeletesTests.cs | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 70b8d90098..3255aec9d6 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -107,7 +107,7 @@ private async Task InitializeAsync() /// /// If the lease container's lease document is found, this method checks for lease /// document's ChangeFeedMode and if the new ChangeFeedMode is different - /// from the current ChangeFeedMode, a CosmosException is thrown. + /// from the current ChangeFeedMode, an exception is thrown. /// This is based on an issue located at . /// private async Task ChangeFeedModeSwitchingCheckAsync() @@ -128,8 +128,8 @@ private async Task ChangeFeedModeSwitchingCheckAsync() // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are - // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, a CosmosException should thrown. - // If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown. + // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, an exception should thrown. + // If the ChangeFeedProcessor mode is not the mode in the lease document, an exception should be thrown. bool shouldThrowException = this.VerifyChangeFeedProcessorMode( changeFeedMode: @@ -139,7 +139,7 @@ private async Task ChangeFeedModeSwitchingCheckAsync() leaseChangeFeedMode: documentServiceLease.Mode, normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode); - // If shouldThrowException is true, throw the CosmosException. + // If shouldThrowException is true, throw the exception. if (shouldThrowException) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs index 40ba3bac1c..33f5d9d0c4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs @@ -154,10 +154,10 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync() [TestMethod] [Owner("philipthomas-MSFT")] [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor, switches from LatestVersion to AllVersionsAndDeletes," + - "a CosmosException is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] + "an exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] [DataRow(false)] [DataRow(true)] - public async Task WhenLatestVersionSwitchToAllVersionsAndDeletesExpectsACosmosExceptionTestAsync(bool withStartFromBeginning) + public async Task WhenLatestVersionSwitchToAllVersionsAndDeletesExpectsAexceptionTestAsync(bool withStartFromBeginning) { ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion); ManualResetEvent allDocsProcessed = new(false); @@ -189,10 +189,10 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests [TestMethod] [Owner("philipthomas-MSFT")] [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor, switches from AllVersionsAndDeletes to LatestVersion," + - "a CosmosException is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] + "an exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] [DataRow(false)] [DataRow(true)] - public async Task WhenAllVersionsAndDeletesSwitchToLatestVersionExpectsACosmosExceptionTestAsync(bool withStartFromBeginning) + public async Task WhenAllVersionsAndDeletesSwitchToLatestVersionExpectsAexceptionTestAsync(bool withStartFromBeginning) { ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes); ManualResetEvent allDocsProcessed = new(false); @@ -224,8 +224,8 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests [TestMethod] [Owner("philipthomas-MSFT")] [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor does not switch, AllVersionsAndDeletes," + - "no CosmosException is expected.")] - public async Task WhenNoSwitchAllVersionsAndDeletesFDoesNotExpectACosmosExceptionTestAsync() + "no exception is expected.")] + public async Task WhenNoSwitchAllVersionsAndDeletesFDoesNotExpectAexceptionTestAsync() { ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes); ManualResetEvent allDocsProcessed = new(false); @@ -258,10 +258,10 @@ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests [TestMethod] [Owner("philipthomas-MSFT")] [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor does not switch, LatestVersion," + - "no CosmosException is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] + "no exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")] [DataRow(false)] [DataRow(true)] - public async Task WhenNoSwitchLatestVersionDoesNotExpectACosmosExceptionTestAsync(bool withStartFromBeginning) + public async Task WhenNoSwitchLatestVersionDoesNotExpectAexceptionTestAsync(bool withStartFromBeginning) { ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion); ManualResetEvent allDocsProcessed = new(false); From 6fe813d961459c5cab4e99ad59e93fd82497c017 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Thu, 21 Mar 2024 13:13:10 -0400 Subject: [PATCH 16/17] fixin tests to account for extra GetAllLeasesAsync call --- .../ChangeFeed/ChangeFeedProcessorCoreTests.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs index c5e74fbfd9..f2684576c5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; - using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using Microsoft.Azure.Cosmos.Tests; using Microsoft.Azure.Cosmos.Tracing; @@ -98,6 +97,7 @@ public async Task StartAsync() Mock leaseContainer = new Mock(); leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty())); + leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List()); Mock leaseStoreManager = new Mock(); leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object); @@ -148,6 +148,7 @@ public async Task ObserverIsCreated() Mock leaseContainer = new Mock(); leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(ownedLeases)); + leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List()); Mock leaseStoreManager = new Mock(); leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object); @@ -211,7 +212,7 @@ public async Task StopAsync() await processor.StopAsync(); Mock.Get(leaseContainer.Object) - .Verify(store => store.GetAllLeasesAsync(), Times.Once); + .Verify(store => store.GetAllLeasesAsync(), Times.Exactly(2)); } From 2a03e7369619ba058bce47ce2e82a81850d29ba7 Mon Sep 17 00:00:00 2001 From: philipthomas Date: Thu, 21 Mar 2024 15:14:47 -0400 Subject: [PATCH 17/17] some recommendation changes --- .../ChangeFeedProcessorCore.cs | 64 +++---------------- .../DocumentServiceLeaseManager.cs | 54 ++++++++++++++++ 2 files changed, 62 insertions(+), 56 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 3255aec9d6..db0ecacb32 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -96,7 +96,14 @@ private async Task InitializeAsync() .ConfigureAwait(false); } - await this.ChangeFeedModeSwitchingCheckAsync().ConfigureAwait(false); + this.documentServiceLeaseStoreManager + .LeaseManager + .ChangeFeedModeSwitchingCheck( + documentServiceLeases: await this.documentServiceLeaseStoreManager + .LeaseContainer + .GetAllLeasesAsync() + .ConfigureAwait(false), + changeFeedLeaseOptionsMode: this.changeFeedLeaseOptions.Mode); this.partitionManager = this.BuildPartitionManager( containerRid, @@ -104,61 +111,6 @@ private async Task InitializeAsync() this.initialized = true; } - /// - /// If the lease container's lease document is found, this method checks for lease - /// document's ChangeFeedMode and if the new ChangeFeedMode is different - /// from the current ChangeFeedMode, an exception is thrown. - /// This is based on an issue located at . - /// - private async Task ChangeFeedModeSwitchingCheckAsync() - { - IReadOnlyList documentServiceLeases = await this.documentServiceLeaseStoreManager - .LeaseContainer - .GetAllLeasesAsync() - .ConfigureAwait(false); - - // No lease documents. Return. - - if (documentServiceLeases.Count == 0) - { - return; - } - - DocumentServiceLease documentServiceLease = documentServiceLeases[0]; - - // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because - // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are - // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, an exception should thrown. - // If the ChangeFeedProcessor mode is not the mode in the lease document, an exception should be thrown. - - bool shouldThrowException = this.VerifyChangeFeedProcessorMode( - changeFeedMode: - string.IsNullOrEmpty(documentServiceLease.Mode) - ? ChangeFeedMode.LatestVersion - : this.changeFeedLeaseOptions.Mode, - leaseChangeFeedMode: documentServiceLease.Mode, - normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode); - - // If shouldThrowException is true, throw the exception. - - if (shouldThrowException) - { - throw new ArgumentException(message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed."); - } - } - - private bool VerifyChangeFeedProcessorMode( - ChangeFeedMode changeFeedMode, - string leaseChangeFeedMode, - out string normalizedProcessorChangeFeedMode) - { - normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes - ? HttpConstants.A_IMHeaderValues.FullFidelityFeed - : HttpConstants.A_IMHeaderValues.IncrementalFeed; - - return string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0; - } - private PartitionManager BuildPartitionManager( string containerRid, Routing.PartitionKeyRangeCache partitionKeyRangeCache) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs index 27dafd2162..18b0e790ba 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs @@ -4,6 +4,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { + using System; + using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions; using Microsoft.Azure.Documents; @@ -63,5 +65,57 @@ internal abstract class DocumentServiceLeaseManager /// Updated lease. /// Thrown if other host acquired the lease public abstract Task UpdatePropertiesAsync(DocumentServiceLease leaseToUpdatePropertiesFrom); + + /// + /// If the lease container's lease document is found, this method checks for lease + /// document's ChangeFeedMode and if the new ChangeFeedMode is different + /// from the current ChangeFeedMode, an exception is thrown. + /// This is based on an issue located at . + /// + public void ChangeFeedModeSwitchingCheck( + IReadOnlyList documentServiceLeases, + ChangeFeedMode changeFeedLeaseOptionsMode) + { + // No lease documents. Return. + + if (documentServiceLeases.Count == 0) + { + return; + } + + DocumentServiceLease documentServiceLease = documentServiceLeases[0]; + + // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because + // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are + // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, an exception should thrown. + // If the ChangeFeedProcessor mode is not the mode in the lease document, an exception should be thrown. + + bool shouldThrowException = this.VerifyChangeFeedProcessorMode( + changeFeedMode: + string.IsNullOrEmpty(documentServiceLease.Mode) + ? ChangeFeedMode.LatestVersion + : changeFeedLeaseOptionsMode, + leaseChangeFeedMode: documentServiceLease.Mode, + normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode); + + // If shouldThrowException is true, throw the exception. + + if (shouldThrowException) + { + throw new ArgumentException(message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed."); + } + } + + private bool VerifyChangeFeedProcessorMode( + ChangeFeedMode changeFeedMode, + string leaseChangeFeedMode, + out string normalizedProcessorChangeFeedMode) + { + normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes + ? HttpConstants.A_IMHeaderValues.FullFidelityFeed + : HttpConstants.A_IMHeaderValues.IncrementalFeed; + + return string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0; + } } }