Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
preview cfp ffcf
  • Loading branch information
philipthomas-MSFT committed Mar 25, 2024
commit 5d36ba727693ea5bcaf455d03e0dcf12a42dc8fc
78 changes: 76 additions & 2 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Serializer;

/// <summary>
/// Operations for reading, replacing, or deleting a specific, existing container or item in a container by id.
Expand Down Expand Up @@ -1681,6 +1680,81 @@ public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);

/// <summary>
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
/// </summary>
/// <typeparam name="T">Document type</typeparam>
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// Container leaseContainer = await this.database.CreateContainerAsync(
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
/// cancellationToken: this.cancellationToken);
///
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
///
/// ChangeFeedProcessor changeFeedProcessor = this.Container
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
/// {
/// Console.WriteLine($"number of documents processed: {documents.Count}");
///
/// string id = default;
/// string pk = default;
/// string description = default;
///
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
/// {
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
/// {
/// id = changeFeedItem.Current.id.ToString();
/// pk = changeFeedItem.Current.pk.ToString();
/// description = changeFeedItem.Current.description.ToString();
/// }
/// else
/// {
/// id = changeFeedItem.Previous.id.ToString();
/// pk = changeFeedItem.Previous.pk.ToString();
/// description = changeFeedItem.Previous.description.ToString();
/// }
///
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
/// long lsn = changeFeedItem.Metadata.Lsn;
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
/// }
///
/// return Task.CompletedTask;
/// })
/// .WithInstanceName(Guid.NewGuid().ToString())
/// .WithLeaseContainer(leaseContainer)
/// .WithErrorNotification((leaseToken, error) =>
/// {
/// Console.WriteLine(error.ToString());
///
/// return Task.CompletedTask;
/// })
/// .Build();
///
/// await changeFeedProcessor.StartAsync();
/// await Task.Delay(1000);
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
///
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
///
/// await changeFeedProcessor.StopAsync();
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.ReadFeed;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;

// This class acts as a wrapper for environments that use SynchronizationContext.
Expand Down Expand Up @@ -661,14 +660,5 @@ public override Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
task: (trace) => base.DeleteAllItemsByPartitionKeyStreamAsync(partitionKey, trace, requestOptions, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate)
{
return base.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(
processorName,
onChangesDelegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,82 +147,11 @@ public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);
#endif

/// <summary>
/// Initializes a <see cref="GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes"/> for change feed processing with all versions and deletes.
/// </summary>
/// <typeparam name="T">Document type</typeparam>
/// <param name="processorName">A name that identifies the Processor and the particular work it will do.</param>
/// <param name="onChangesDelegate">Delegate to receive all changes and deletes</param>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// Container leaseContainer = await this.database.CreateContainerAsync(
/// new ContainerProperties(id: "leases", partitionKeyPath: "/id"),
/// cancellationToken: this.cancellationToken);
///
/// ManualResetEvent allProcessedDocumentsEvent = new ManualResetEvent(false);
///
/// ChangeFeedProcessor changeFeedProcessor = this.Container
/// .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItemChange<dynamic>> documents, CancellationToken token) =>
/// {
/// Console.WriteLine($"number of documents processed: {documents.Count}");
///
/// string id = default;
/// string pk = default;
/// string description = default;
///
/// foreach (ChangeFeedItemChange<dynamic> changeFeedItem in documents)
/// {
/// if (changeFeedItem.Metadata.OperationType != ChangeFeedOperationType.Delete)
/// {
/// id = changeFeedItem.Current.id.ToString();
/// pk = changeFeedItem.Current.pk.ToString();
/// description = changeFeedItem.Current.description.ToString();
/// }
/// else
/// {
/// id = changeFeedItem.Previous.id.ToString();
/// pk = changeFeedItem.Previous.pk.ToString();
/// description = changeFeedItem.Previous.description.ToString();
/// }
///
/// ChangeFeedOperationType operationType = changeFeedItem.Metadata.OperationType;
/// long previousLsn = changeFeedItem.Metadata.PreviousLsn;
/// DateTime conflictResolutionTimestamp = changeFeedItem.Metadata.ConflictResolutionTimestamp;
/// long lsn = changeFeedItem.Metadata.Lsn;
/// bool isTimeToLiveExpired = changeFeedItem.Metadata.IsTimeToLiveExpired;
/// }
///
/// return Task.CompletedTask;
/// })
/// .WithInstanceName(Guid.NewGuid().ToString())
/// .WithLeaseContainer(leaseContainer)
/// .WithErrorNotification((leaseToken, error) =>
/// {
/// Console.WriteLine(error.ToString());
///
/// return Task.CompletedTask;
/// })
/// .Build();
///
/// await changeFeedProcessor.StartAsync();
/// await Task.Delay(1000);
/// await this.Container.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
/// await this.Container.UpsertItemAsync<dynamic>(new { id = "1", pk = "1", description = "test after replace" }, partitionKey: new PartitionKey("1"));
/// await this.Container.DeleteItemAsync<dynamic>(id: "1", partitionKey: new PartitionKey("1"));
///
/// allProcessedDocumentsEvent.WaitOne(10 * 1000);
///
/// await changeFeedProcessor.StopAsync();
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="ChangeFeedProcessorBuilder"/></returns>
public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
string processorName,
ChangeFeedHandler<ChangeFeedItemChange<T>> onChangesDelegate);
#endif

public abstract class TryExecuteQueryResult
{
Expand Down