Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public abstract class ChangeFeedProcessorContext
/// Gets the headers related to the service response that provided the changes.
/// </summary>
public abstract Headers Headers { get; }

/// <summary>
/// Gets the feed range.
/// </summary>
public abstract FeedRange FeedRange { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ private void HandleFailedRequest(

private Task DispatchChangesAsync(ResponseMessage response, CancellationToken cancellationToken)
{
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.options.LeaseToken, response, this.checkpointer);
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(
this.options.LeaseToken,
response,
this.checkpointer,
this.options.FeedRange);
return this.observer.ProcessChangesAsync(context, response.Content, cancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public override FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObser
FeedPollDelay = this.changeFeedProcessorOptions.FeedPollDelay,
MaxItemCount = this.changeFeedProcessorOptions.MaxItemCount,
StartFromBeginning = this.changeFeedProcessorOptions.StartFromBeginning,
StartTime = this.changeFeedProcessorOptions.StartTime
StartTime = this.changeFeedProcessorOptions.StartTime,
FeedRange = lease.FeedRange,
};

PartitionCheckpointerCore checkpointer = new PartitionCheckpointerCore(this.leaseCheckpointer, lease);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
using System;
using Microsoft.Azure.Documents;

internal class ProcessorOptions
{
Expand All @@ -22,5 +21,7 @@ internal class ProcessorOptions
public DateTime? StartTime { get; set; }

public TimeSpan RequestTimeout { get; set; } = CosmosHttpClient.GatewayRequestTimeout;

public FeedRange FeedRange { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
Expand All @@ -24,11 +23,13 @@ internal sealed class ChangeFeedObserverContextCore
internal ChangeFeedObserverContextCore(
string leaseToken,
ResponseMessage feedResponse,
PartitionCheckpointer checkpointer)
PartitionCheckpointer checkpointer,
FeedRange feedRange)
{
this.LeaseToken = leaseToken;
this.responseMessage = feedResponse;
this.checkpointer = checkpointer;
this.FeedRange = feedRange;
}

public string LeaseToken { get; }
Expand All @@ -37,6 +38,8 @@ internal ChangeFeedObserverContextCore(

public Headers Headers => this.responseMessage.Headers;

public FeedRange FeedRange { get; }

public async Task CheckpointAsync()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ public ChangeFeedProcessorContextCore(ChangeFeedObserverContextCore changeFeedOb
public override CosmosDiagnostics Diagnostics => this.changeFeedObserverContextCore.Diagnostics;

public override Headers Headers => this.changeFeedObserverContextCore.Headers;

public override FeedRange FeedRange => this.changeFeedObserverContextCore.FeedRange;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Azure.Cosmos.Services.Management.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;

Expand Down Expand Up @@ -380,7 +382,7 @@ public async Task TestWithStartTime_CustomTime()
Assert.AreEqual("doc5.doc6.doc7.doc8.doc9.", accumulator);
}

private void ValidateContext(ChangeFeedProcessorContext changeFeedProcessorContext)
private async void ValidateContext(ChangeFeedProcessorContext changeFeedProcessorContext)
{
Assert.IsNotNull(changeFeedProcessorContext.LeaseToken);
Assert.IsNotNull(changeFeedProcessorContext.Diagnostics);
Expand All @@ -389,6 +391,19 @@ private void ValidateContext(ChangeFeedProcessorContext changeFeedProcessorConte
Assert.IsTrue(changeFeedProcessorContext.Headers.RequestCharge > 0);
string diagnosticsAsString = changeFeedProcessorContext.Diagnostics.ToString();
Assert.IsTrue(diagnosticsAsString.Contains("Change Feed Processor Read Next Async"));

await this.ValidateFeedRangeAsync(changeFeedProcessorContext.FeedRange);
}

private async Task ValidateFeedRangeAsync(FeedRange feedRange)
{
Assert.IsNotNull(feedRange);

IEnumerable<string> partitionKeyRanges = await this.Container.GetPartitionKeyRangesAsync(feedRange);

Assert.IsNotNull(partitionKeyRanges);
Assert.AreEqual(1, partitionKeyRanges.Count());
Assert.AreEqual(expected: "0", actual: partitionKeyRanges.FirstOrDefault());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,53 +588,5 @@ private async Task<ContainerInternal> CreateMonitoredContainer(ChangeFeedMode ch

return (ContainerInternal)response;
}

[TestMethod]
[Owner("philipthomas-MSFT")]
[Description("Scenario: WithStartTime should throw an exception when used in AVAD mode.")]
public async Task WhenACFPInAVADModeUsesWithStartTimeExpectExceptionTestsAsync()
{
ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);

InvalidOperationException exception = Assert.ThrowsException<InvalidOperationException>(() =>
{
ChangeFeedProcessor processor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName: "processor",
onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> docs, CancellationToken cancellationToken) => Task.CompletedTask)
.WithStartTime(DateTime.Now)
.WithInstanceName(Guid.NewGuid().ToString())
.WithLeaseContainer(this.LeaseContainer)
.Build();
});

Assert.AreEqual(
expected: "Using the 'WithStartTime' option with ChangeFeedProcessor is not supported with Microsoft.Azure.Cosmos.ChangeFeed.ChangeFeedModeFullFidelity mode.",
actual: exception.Message);
}

[TestMethod]
[Owner("philipthomas-MSFT")]
[Description("Scenario: WithStartFromBeginning should throw an exception when used in AVAD mode.")]
public async Task WhenACFPInAVADModeUsesWithStartFromBeginningExpectExceptionTestsAsync()
{
ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);

InvalidOperationException exception = Assert.ThrowsException<InvalidOperationException>(() =>
{
ChangeFeedProcessor processor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName: "processor",
onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> docs, CancellationToken cancellationToken) => Task.CompletedTask)
.WithStartFromBeginning()
.WithInstanceName(Guid.NewGuid().ToString())
.WithLeaseContainer(this.LeaseContainer)
.Build();
});

Assert.AreEqual(
expected: "Using the 'WithStartFromBeginning' option with ChangeFeedProcessor is not supported with Microsoft.Azure.Cosmos.ChangeFeed.ChangeFeedModeFullFidelity mode.",
actual: exception.Message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AutoCheckPointTests()

ResponseMessage responseMessage = new ResponseMessage();
responseMessage.Headers.ContinuationToken = Guid.NewGuid().ToString();
this.observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, this.partitionCheckpointer.Object);
this.observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, this.partitionCheckpointer.Object, FeedRangeEpk.FullRange);
}

[TestMethod]
Expand Down Expand Up @@ -78,7 +78,7 @@ public async Task ProcessChanges_WhenCheckpointThrows_ShouldThrow()

ResponseMessage responseMessage = new ResponseMessage();
responseMessage.Headers.ContinuationToken = Guid.NewGuid().ToString();
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException caught = await Assert.ThrowsExceptionAsync<CosmosException>(() => this.sut.ProcessChangesAsync(observerContext, this.stream, CancellationToken.None));
Assert.AreEqual(original, caught);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void ExposesResponseProperties()
ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK);
responseMessage.Headers.RequestCharge = 10;

ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

Assert.AreEqual(leaseToken, changeFeedObserverContextCore.LeaseToken);
Assert.ReferenceEquals(responseMessage.Headers, changeFeedObserverContextCore.Headers);
Expand All @@ -40,7 +40,7 @@ public async Task TryCheckpoint_OnSuccess()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).Returns(Task.CompletedTask);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

await changeFeedObserverContextCore.CheckpointAsync();
}
Expand All @@ -54,7 +54,7 @@ public async Task TryCheckpoint_OnLeaseLost()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(new LeaseLostException());
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.AreEqual(HttpStatusCode.PreconditionFailed, exception.StatusCode);
Expand All @@ -70,7 +70,7 @@ public async Task TryCheckpoint_OnCosmosException()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
Expand All @@ -86,7 +86,7 @@ public async Task TryCheckpoint_OnUnknownException()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

NotImplementedException exception = await Assert.ThrowsExceptionAsync<NotImplementedException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Task changesHandler(IReadOnlyCollection<dynamic> docs, CancellationToken token)
Assert.IsNotNull(changeFeedObserver);

ResponseMessage responseMessage = this.BuildResponseMessage();
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -67,7 +67,7 @@ Task changesHandler(ChangeFeedProcessorContext context, IReadOnlyCollection<dyna
Assert.IsNotNull(changeFeedObserver);

ResponseMessage responseMessage = this.BuildResponseMessage();
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -92,7 +92,7 @@ Task changesHandler(ChangeFeedProcessorContext context, IReadOnlyCollection<dyna
Assert.IsNotNull(changeFeedObserver);

ResponseMessage responseMessage = this.BuildResponseMessage();
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -117,7 +117,7 @@ Task changesHandler(ChangeFeedProcessorContext context, Stream stream, Cancellat
Assert.IsNotNull(changeFeedObserver);


ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -141,7 +141,7 @@ Task changesHandler(ChangeFeedProcessorContext context, Stream stream, Func<Task

Assert.IsNotNull(changeFeedObserver);

ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ObserverExceptionTests
public void ValidateConstructor()
{
ResponseMessage responseMessage = new ResponseMessage();
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);
ChangeFeedProcessorContextCore changeFeedProcessorContext = new ChangeFeedProcessorContextCore(observerContext);
Exception exception = new Exception("randomMessage");
ChangeFeedProcessorUserException ex = new ChangeFeedProcessorUserException(exception, changeFeedProcessorContext);
Expand All @@ -35,7 +35,7 @@ public void ValidateConstructor()
public void ValidateSerialization_AllFields()
{
ResponseMessage responseMessage = new ResponseMessage();
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);
ChangeFeedProcessorContextCore changeFeedProcessorContext = new ChangeFeedProcessorContextCore(observerContext);
Exception exception = new Exception("randomMessage");
ChangeFeedProcessorUserException originalException = new ChangeFeedProcessorUserException(exception, changeFeedProcessorContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ObserverExceptionWrappingChangeFeedObserverDecoratorTests
public ObserverExceptionWrappingChangeFeedObserverDecoratorTests()
{
this.observer = new Mock<ChangeFeedObserver>();
this.changeFeedObserverContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: null, Mock.Of<PartitionCheckpointer>()); ;
this.changeFeedObserverContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: null, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange); ;
this.observerWrapper = new ObserverExceptionWrappingChangeFeedObserverDecorator(this.observer.Object);

this.serializerCore = new CosmosSerializerCore();
Expand Down
Loading