diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs
index 20108af23c..4d1fc6fd4a 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorIterator.cs
@@ -334,7 +334,8 @@ private async Task InitializeLeaseStoreAsync(ITrace trace, CancellationToken can
monitoredContainer: this.monitoredContainer,
leaseContainer: this.leaseContainer,
leaseContainerPrefix: leasePrefix,
- instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName);
+ instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName,
+ changeFeedMode: ChangeFeedMode.LatestVersion);
this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
}
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs
index 8f8a8b9d81..e3d4468d0f 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedEstimatorRunner.cs
@@ -139,11 +139,14 @@ private async Task InitializeLeaseStoreAsync()
{
string monitoredContainerAndDatabaseRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(default);
string leasePrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredContainerAndDatabaseRid);
- DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
- monitoredContainer: this.monitoredContainer,
- leaseContainer: this.leaseContainer,
- leaseContainerPrefix: leasePrefix,
- instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName);
+ DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder
+ .InitializeAsync(
+ monitoredContainer: this.monitoredContainer,
+ leaseContainer: this.leaseContainer,
+ leaseContainerPrefix: leasePrefix,
+ instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName,
+ changeFeedMode: ChangeFeedMode.LatestVersion)
+ .ConfigureAwait(false);
this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
}
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs
index df3db6f2bf..49bfa9d816 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs
@@ -70,13 +70,15 @@ public ChangeFeedProcessorBuilder WithInstanceName(string instanceName)
}
///
- /// Sets the mode for the change freed processor.
+ /// Sets the mode for the change feed processor.
///
///
/// The instance of to use.
internal ChangeFeedProcessorBuilder WithChangeFeedMode(ChangeFeedMode changeFeedMode)
{
this.changeFeedProcessorOptions.Mode = changeFeedMode;
+ this.changeFeedLeaseOptions.Mode = changeFeedMode;
+
return this;
}
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs
index 94c3e09dac..db0ecacb32 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs
@@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
+ using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
@@ -14,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
+ using Microsoft.Azure.Documents;
internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor
{
@@ -75,19 +77,36 @@ public override async Task StopAsync()
private async Task InitializeAsync()
{
string containerRid = await this.monitoredContainer.GetCachedRIDAsync(
- forceRefresh: false,
- NoOpTrace.Singleton,
+ forceRefresh: false,
+ NoOpTrace.Singleton,
default);
+
string monitoredDatabaseAndContainerRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync();
string leaseContainerPrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredDatabaseAndContainerRid);
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.monitoredContainer.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton);
if (this.documentServiceLeaseStoreManager == null)
{
- this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(this.monitoredContainer, this.leaseContainer, leaseContainerPrefix, this.instanceName).ConfigureAwait(false);
+ this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder
+ .InitializeAsync(
+ this.monitoredContainer,
+ this.leaseContainer,
+ leaseContainerPrefix,
+ this.instanceName,
+ changeFeedMode: this.changeFeedProcessorOptions.Mode)
+ .ConfigureAwait(false);
}
+ this.documentServiceLeaseStoreManager
+ .LeaseManager
+ .ChangeFeedModeSwitchingCheck(
+ documentServiceLeases: await this.documentServiceLeaseStoreManager
+ .LeaseContainer
+ .GetAllLeasesAsync()
+ .ConfigureAwait(false),
+ changeFeedLeaseOptionsMode: this.changeFeedLeaseOptions.Mode);
+
this.partitionManager = this.BuildPartitionManager(
- containerRid,
+ containerRid,
partitionKeyRangeCache);
this.initialized = true;
}
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs
index e72e00efb9..dac334c038 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Configuration/ChangeFeedLeaseOptions.cs
@@ -44,5 +44,10 @@ public ChangeFeedLeaseOptions()
/// instances pointing at the same feed while using the same auxiliary collection.
///
public string LeasePrefix { get; set; }
+
+ ///
+ /// Gets or sets the .
+ ///
+ public ChangeFeedMode Mode { get; set; }
}
}
\ No newline at end of file
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs
index 95200f48ee..ab9d6beb43 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLease.cs
@@ -70,5 +70,10 @@ internal abstract class DocumentServiceLease
/// Gets or sets custom lease properties which can be managed from .
///
public abstract Dictionary Properties { get; set; }
+
+ ///
+ /// Gets or sets the ChangeFeedMode.
+ ///
+ public abstract string Mode { get; set; }
}
}
\ No newline at end of file
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs
index 65e4e6e725..ed6b050278 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCore.cs
@@ -95,6 +95,9 @@ public override DateTime Timestamp
[JsonProperty("_ts")]
private long TS { get; set; }
+ [JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)]
+ public override string Mode { get; set; }
+
public override string ToString()
{
return string.Format(
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs
index 55c4105c3a..ab46ebba29 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseCoreEpk.cs
@@ -66,6 +66,9 @@ public override DateTime Timestamp
[JsonProperty("properties")]
public override Dictionary Properties { get; set; } = new Dictionary();
+ [JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)]
+ public override string Mode { get; set; }
+
[JsonProperty("timestamp")]
private DateTime? ExplicitTimestamp { get; set; }
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs
index 27dafd2162..18b0e790ba 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManager.cs
@@ -4,6 +4,8 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
+ using System;
+ using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions;
using Microsoft.Azure.Documents;
@@ -63,5 +65,57 @@ internal abstract class DocumentServiceLeaseManager
/// Updated lease.
/// Thrown if other host acquired the lease
public abstract Task UpdatePropertiesAsync(DocumentServiceLease leaseToUpdatePropertiesFrom);
+
+ ///
+ /// If the lease container's lease document is found, this method checks for lease
+ /// document's ChangeFeedMode and if the new ChangeFeedMode is different
+ /// from the current ChangeFeedMode, an exception is thrown.
+ /// This is based on an issue located at .
+ ///
+ public void ChangeFeedModeSwitchingCheck(
+ IReadOnlyList documentServiceLeases,
+ ChangeFeedMode changeFeedLeaseOptionsMode)
+ {
+ // No lease documents. Return.
+
+ if (documentServiceLeases.Count == 0)
+ {
+ return;
+ }
+
+ DocumentServiceLease documentServiceLease = documentServiceLeases[0];
+
+ // Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because
+ // AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are
+ // AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, an exception should thrown.
+ // If the ChangeFeedProcessor mode is not the mode in the lease document, an exception should be thrown.
+
+ bool shouldThrowException = this.VerifyChangeFeedProcessorMode(
+ changeFeedMode:
+ string.IsNullOrEmpty(documentServiceLease.Mode)
+ ? ChangeFeedMode.LatestVersion
+ : changeFeedLeaseOptionsMode,
+ leaseChangeFeedMode: documentServiceLease.Mode,
+ normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode);
+
+ // If shouldThrowException is true, throw the exception.
+
+ if (shouldThrowException)
+ {
+ throw new ArgumentException(message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed.");
+ }
+ }
+
+ private bool VerifyChangeFeedProcessorMode(
+ ChangeFeedMode changeFeedMode,
+ string leaseChangeFeedMode,
+ out string normalizedProcessorChangeFeedMode)
+ {
+ normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes
+ ? HttpConstants.A_IMHeaderValues.FullFidelityFeed
+ : HttpConstants.A_IMHeaderValues.IncrementalFeed;
+
+ return string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0;
+ }
}
}
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs
index d5a75abb47..0f343a7732 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs
@@ -124,7 +124,8 @@ public override Task CreateLeaseIfNotExistAsync(
LeaseId = leaseDocId,
LeaseToken = leaseToken,
ContinuationToken = continuationToken,
- FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange())
+ FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange()),
+ Mode = this.GetChangeFeedMode()
};
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
@@ -148,7 +149,8 @@ public override Task CreateLeaseIfNotExistAsync(
LeaseId = leaseDocId,
LeaseToken = leaseToken,
ContinuationToken = continuationToken,
- FeedRange = feedRange
+ FeedRange = feedRange,
+ Mode = this.GetChangeFeedMode()
};
this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
@@ -156,6 +158,13 @@ public override Task CreateLeaseIfNotExistAsync(
return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
}
+ private string GetChangeFeedMode()
+ {
+ return this.options.Mode == ChangeFeedMode.AllVersionsAndDeletes
+ ? HttpConstants.A_IMHeaderValues.FullFidelityFeed
+ : HttpConstants.A_IMHeaderValues.IncrementalFeed;
+ }
+
public override async Task ReleaseAsync(DocumentServiceLease lease)
{
if (lease == null)
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs
index 70358bca22..b5fb29a095 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerBuilder.cs
@@ -22,7 +22,8 @@ public static async Task InitializeAsync(
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
string leaseContainerPrefix,
- string instanceName)
+ string instanceName,
+ ChangeFeedMode changeFeedMode)
{
ContainerProperties containerProperties = await leaseContainer.GetCachedContainerPropertiesAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default);
@@ -58,7 +59,8 @@ public static async Task InitializeAsync(
.WithMonitoredContainer(monitoredContainer)
.WithLeaseContainer(leaseContainer)
.WithRequestOptionsFactory(requestOptionsFactory)
- .WithHostName(instanceName);
+ .WithHostName(instanceName)
+ .WithChangeFeedMode(changeFeedMode);
return leaseStoreManagerBuilder.Build();
}
@@ -70,7 +72,7 @@ public static async Task InitializeAsync(
private DocumentServiceLeaseStoreManagerBuilder WithMonitoredContainer(ContainerInternal monitoredContainer)
{
- this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(leaseContainer));
+ this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer));
return this;
}
@@ -98,6 +100,12 @@ private DocumentServiceLeaseStoreManagerBuilder WithHostName(string hostName)
return this;
}
+ private DocumentServiceLeaseStoreManagerBuilder WithChangeFeedMode(ChangeFeedMode changeFeedMode)
+ {
+ this.options.Mode = changeFeedMode ?? throw new ArgumentNullException(nameof(changeFeedMode));
+ return this;
+ }
+
private DocumentServiceLeaseStoreManager Build()
{
if (this.monitoredContainer == null)
diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs
index 146c7a32a3..942db791db 100644
--- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs
+++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerOptions.cs
@@ -16,5 +16,7 @@ internal string GetPartitionLeasePrefix()
{
return this.ContainerNamePrefix + PartitionLeasePrefixSeparator;
}
+
+ internal ChangeFeedMode Mode { get; set; }
}
}
diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
index 21d660b744..5936f5e05d 100644
--- a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
+++ b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
@@ -1681,7 +1681,6 @@ public abstract Task DeleteAllItemsByPartitionKeyStreamAsync(
public abstract Task> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);
-
#endif
}
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs
index 9292e4d1e7..33f5d9d0c4 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.cs
@@ -6,31 +6,20 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
{
using System;
using System.Collections.Generic;
+ using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
[TestClass]
- [TestCategory("ChangeFeedProcessor with AllVersionsAndDeletes")]
+ [TestCategory("ChangeFeedProcessor")]
public class GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests : BaseChangeFeedClientHelper
{
- private ContainerInternal Container;
-
[TestInitialize]
public async Task TestInitialize()
{
await base.ChangeFeedTestInit();
-
- string PartitionKey = "/pk";
- ContainerProperties properties = new ContainerProperties(id: Guid.NewGuid().ToString(),
- partitionKeyPath: PartitionKey);
- properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5);
-
- ContainerResponse response = await this.database.CreateContainerAsync(properties,
- throughput: 10000,
- cancellationToken: this.cancellationToken);
- this.Container = (ContainerInternal)response;
}
[TestCleanup]
@@ -42,13 +31,14 @@ public async Task Cleanup()
[TestMethod]
[Owner("philipthomas-MSFT")]
[Description("Scenario: When a document is created, then updated, and finally deleted, there should be 3 changes that will appear for that " +
- "document when using ChangeFeedProcessor with AllVersionsAndDeletes.")]
+ "document when using ChangeFeedProcessor with AllVersionsAndDeletes set as the ChangeFeedMode.")]
public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
{
+ ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);
ManualResetEvent allDocsProcessed = new ManualResetEvent(false);
Exception exception = default;
- ChangeFeedProcessor processor = this.Container
+ ChangeFeedProcessor processor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection> docs, CancellationToken token) =>
{
string id = default;
@@ -117,7 +107,7 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
Assert.IsTrue(condition: createChange.Metadata.Lsn < replaceChange.Metadata.Lsn, message: "The create operation must happen before the replace operation.");
Assert.IsTrue(condition: createChange.Metadata.Lsn < replaceChange.Metadata.Lsn, message: "The replace operation must happen before the delete operation.");
- Console.WriteLine("Assertions completed.");
+ Debug.WriteLine("Assertions completed.");
return Task.CompletedTask;
})
@@ -126,7 +116,9 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
.WithErrorNotification((leaseToken, error) =>
{
exception = error.InnerException;
- Console.WriteLine(error.ToString());
+
+ Debug.WriteLine("WithErrorNotification");
+ Debug.WriteLine(error.ToString());
return Task.CompletedTask;
})
@@ -138,13 +130,13 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
await processor.StartAsync();
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);
- await this.Container.CreateItemAsync(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
+ await monitoredContainer.CreateItemAsync(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
await Task.Delay(1000);
-
- await this.Container.UpsertItemAsync(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
+
+ await monitoredContainer.UpsertItemAsync(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
await Task.Delay(1000);
- await this.Container.DeleteItemAsync(id: "1", partitionKey: new PartitionKey("1"));
+ await monitoredContainer.DeleteItemAsync(id: "1", partitionKey: new PartitionKey("1"));
bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);
@@ -155,5 +147,245 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
Assert.Fail(exception.ToString());
}
}
+
+ ///
+ /// This is based on an issue located at .
+ ///
+ [TestMethod]
+ [Owner("philipthomas-MSFT")]
+ [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor, switches from LatestVersion to AllVersionsAndDeletes," +
+ "an exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")]
+ [DataRow(false)]
+ [DataRow(true)]
+ public async Task WhenLatestVersionSwitchToAllVersionsAndDeletesExpectsAexceptionTestAsync(bool withStartFromBeginning)
+ {
+ ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion);
+ ManualResetEvent allDocsProcessed = new(false);
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithLatestVersionAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed,
+ withStartFromBeginning: withStartFromBeginning);
+
+ ArgumentException exception = await Assert.ThrowsExceptionAsync(
+ () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed));
+
+ Debug.WriteLine(exception.ToString());
+
+ Assert.AreEqual(expected: "Switching ChangeFeedMode Incremental Feed to Full-Fidelity Feed is not allowed.", actual: exception.Message);
+
+ Debug.WriteLine("Assertions completed.");
+ }
+
+ ///
+ /// This is based on an issue located at .
+ ///
+ [TestMethod]
+ [Owner("philipthomas-MSFT")]
+ [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor, switches from AllVersionsAndDeletes to LatestVersion," +
+ "an exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")]
+ [DataRow(false)]
+ [DataRow(true)]
+ public async Task WhenAllVersionsAndDeletesSwitchToLatestVersionExpectsAexceptionTestAsync(bool withStartFromBeginning)
+ {
+ ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);
+ ManualResetEvent allDocsProcessed = new(false);
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed);
+
+ ArgumentException exception = await Assert.ThrowsExceptionAsync(
+ () => GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithLatestVersionAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed,
+ withStartFromBeginning: withStartFromBeginning));
+
+ Debug.WriteLine(exception.ToString());
+
+ Assert.AreEqual(expected: "Switching ChangeFeedMode Full-Fidelity Feed to Incremental Feed is not allowed.", actual: exception.Message);
+
+ Debug.WriteLine("Assertions completed.");
+ }
+
+ ///
+ /// This is based on an issue located at .
+ ///
+ [TestMethod]
+ [Owner("philipthomas-MSFT")]
+ [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor does not switch, AllVersionsAndDeletes," +
+ "no exception is expected.")]
+ public async Task WhenNoSwitchAllVersionsAndDeletesFDoesNotExpectAexceptionTestAsync()
+ {
+ ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);
+ ManualResetEvent allDocsProcessed = new(false);
+
+ try
+ {
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed);
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed);
+
+ Debug.WriteLine("No exceptions occurred.");
+ }
+ catch
+ {
+ Assert.Fail("An exception occurred when one was not expceted."); ;
+ }
+ }
+
+ ///
+ /// This is based on an issue located at .
+ ///
+ [TestMethod]
+ [Owner("philipthomas-MSFT")]
+ [Description("Scenario: When ChangeFeedMode on ChangeFeedProcessor does not switch, LatestVersion," +
+ "no exception is expected. LatestVersion's WithStartFromBeginning can be set, or not set.")]
+ [DataRow(false)]
+ [DataRow(true)]
+ public async Task WhenNoSwitchLatestVersionDoesNotExpectAexceptionTestAsync(bool withStartFromBeginning)
+ {
+ ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.LatestVersion);
+ ManualResetEvent allDocsProcessed = new(false);
+
+ try
+ {
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithLatestVersionAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed,
+ withStartFromBeginning: withStartFromBeginning);
+
+ await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests
+ .BuildChangeFeedProcessorWithLatestVersionAsync(
+ monitoredContainer: monitoredContainer,
+ leaseContainer: this.LeaseContainer,
+ allDocsProcessed: allDocsProcessed,
+ withStartFromBeginning: withStartFromBeginning);
+
+ Debug.WriteLine("No exceptions occurred.");
+ }
+ catch
+ {
+ Assert.Fail("An exception occurred when one was not expceted."); ;
+ }
+ }
+
+ private static async Task BuildChangeFeedProcessorWithLatestVersionAsync(
+ ContainerInternal monitoredContainer,
+ Container leaseContainer,
+ ManualResetEvent allDocsProcessed,
+ bool withStartFromBeginning)
+ {
+ Exception exception = default;
+ ChangeFeedProcessor latestVersionProcessorAtomic = null;
+
+ ChangeFeedProcessorBuilder processorBuilder = monitoredContainer
+ .GetChangeFeedProcessorBuilder(processorName: $"processorName", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection documents, CancellationToken token) => Task.CompletedTask)
+ .WithInstanceName(Guid.NewGuid().ToString())
+ .WithLeaseContainer(leaseContainer)
+ .WithErrorNotification((leaseToken, error) =>
+ {
+ exception = error.InnerException;
+
+ Debug.WriteLine("WithErrorNotification");
+ Debug.WriteLine(error.ToString());
+
+ return Task.CompletedTask;
+ });
+
+ if (withStartFromBeginning)
+ {
+ processorBuilder.WithStartFromBeginning();
+ }
+
+
+ ChangeFeedProcessor processor = processorBuilder.Build();
+ Interlocked.Exchange(ref latestVersionProcessorAtomic, processor);
+
+ await processor.StartAsync();
+ await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);
+ bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);
+
+ if (exception != default)
+ {
+ Assert.Fail(exception.ToString());
+ }
+ }
+
+ private static async Task BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync(
+ ContainerInternal monitoredContainer,
+ Container leaseContainer,
+ ManualResetEvent allDocsProcessed)
+ {
+ Exception exception = default;
+ ChangeFeedProcessor allVersionsAndDeletesProcessorAtomic = null;
+
+ ChangeFeedProcessorBuilder allVersionsAndDeletesProcessorBuilder = monitoredContainer
+ .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: $"processorName", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection> documents, CancellationToken token) => Task.CompletedTask)
+ .WithInstanceName(Guid.NewGuid().ToString())
+ .WithMaxItems(1)
+ .WithLeaseContainer(leaseContainer)
+ .WithErrorNotification((leaseToken, error) =>
+ {
+ exception = error.InnerException;
+
+ Debug.WriteLine("WithErrorNotification");
+ Debug.WriteLine(error.ToString());
+
+ return Task.FromResult(exception);
+ });
+
+ ChangeFeedProcessor processor = allVersionsAndDeletesProcessorBuilder.Build();
+ Interlocked.Exchange(ref allVersionsAndDeletesProcessorAtomic, processor);
+
+ await processor.StartAsync();
+ await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);
+ bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);
+
+ if (exception != default)
+ {
+ Assert.Fail(exception.ToString());
+ }
+ }
+
+ private async Task CreateMonitoredContainer(ChangeFeedMode changeFeedMode)
+ {
+ string PartitionKey = "/pk";
+ ContainerProperties properties = new ContainerProperties(id: Guid.NewGuid().ToString(),
+ partitionKeyPath: PartitionKey);
+
+ if (changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes)
+ {
+ Debug.WriteLine($"{nameof(properties.ChangeFeedPolicy.FullFidelityRetention)} initialized.");
+
+ properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5);
+ }
+
+ ContainerResponse response = await this.database.CreateContainerAsync(properties,
+ throughput: 10000,
+ cancellationToken: this.cancellationToken);
+
+ return (ContainerInternal)response;
+ }
}
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs
index c5e74fbfd9..f2684576c5 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs
@@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
- using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
@@ -98,6 +97,7 @@ public async Task StartAsync()
Mock leaseContainer = new Mock();
leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty()));
+ leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List());
Mock leaseStoreManager = new Mock();
leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object);
@@ -148,6 +148,7 @@ public async Task ObserverIsCreated()
Mock leaseContainer = new Mock();
leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(ownedLeases));
+ leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List());
Mock leaseStoreManager = new Mock();
leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object);
@@ -211,7 +212,7 @@ public async Task StopAsync()
await processor.StopAsync();
Mock.Get(leaseContainer.Object)
- .Verify(store => store.GetAllLeasesAsync(), Times.Once);
+ .Verify(store => store.GetAllLeasesAsync(), Times.Exactly(2));
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs
index f6901bb75b..6a9fb11fc2 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseStoreManagerBuilderTests.cs
@@ -36,7 +36,8 @@ await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
Mock.Of(),
leaseContainerMock.Object,
Guid.NewGuid().ToString(),
- Guid.NewGuid().ToString());
+ Guid.NewGuid().ToString(),
+ ChangeFeedMode.LatestVersion);
}
[TestMethod]
@@ -59,7 +60,8 @@ await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
Mock.Of(),
leaseContainerMock.Object,
Guid.NewGuid().ToString(),
- Guid.NewGuid().ToString());
+ Guid.NewGuid().ToString(),
+ ChangeFeedMode.LatestVersion);
}
[TestMethod]
@@ -82,7 +84,8 @@ await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
Mock.Of(),
leaseContainerMock.Object,
Guid.NewGuid().ToString(),
- Guid.NewGuid().ToString());
+ Guid.NewGuid().ToString(),
+ ChangeFeedMode.LatestVersion);
}
[TestMethod]
@@ -105,7 +108,8 @@ await Assert.ThrowsExceptionAsync(() => DocumentServiceLeaseS
Mock.Of(),
leaseContainerMock.Object,
Guid.NewGuid().ToString(),
- Guid.NewGuid().ToString()));
+ Guid.NewGuid().ToString(),
+ ChangeFeedMode.LatestVersion));
}
}
}