Skip to content
Merged
Prev Previous commit
Next Next commit
Enricher failures should not fail the whole ingestion pipeline, as th…
…ey are best-effort enhancements
  • Loading branch information
adamsitnik committed Nov 5, 2025
commit 655e5cf9cec53d71d6bb42c6a0f2071b8758efea
6 changes: 6 additions & 0 deletions src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,11 @@ internal static partial class Log

[LoggerMessage(6, LogLevel.Error, "An error occurred while ingesting document '{identifier}'.")]
internal static partial void IngestingFailed(this ILogger logger, Exception exception, string identifier);

[LoggerMessage(7, LogLevel.Error, "The AI chat service returned {resultCount} instead of {expectedCount} results.")]
internal static partial void UnexpectedResultsCount(this ILogger logger, int resultCount, int expectedCount);

[LoggerMessage(8, LogLevel.Error, "Unexpected enricher failure.")]
internal static partial void UnexpectedEnricherFailure(this ILogger logger, Exception exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;
Expand All @@ -22,6 +21,7 @@ public sealed class ClassificationEnricher : IngestionChunkProcessor<string>
{
private readonly EnricherOptions _options;
private readonly ChatMessage _systemPrompt;
private readonly ILogger? _logger;

/// <summary>
/// Initializes a new instance of the <see cref="ClassificationEnricher"/> class.
Expand All @@ -40,6 +40,7 @@ public ClassificationEnricher(EnricherOptions options, ReadOnlySpan<string> pred

Validate(predefinedClasses, fallbackClass!);
_systemPrompt = CreateSystemPrompt(predefinedClasses, fallbackClass!);
_logger = _options.LoggerFactory?.CreateLogger<ClassificationEnricher>();
}

/// <summary>
Expand All @@ -48,37 +49,8 @@ public ClassificationEnricher(EnricherOptions options, ReadOnlySpan<string> pred
public static string MetadataKey => "classification";

/// <inheritdoc />
public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(chunks);

await foreach (var batch in chunks.BufferAsync(_options.BatchSize).WithCancellation(cancellationToken))
{
List<AIContent> contents = new(batch.Count);
foreach (var chunk in batch)
{
contents.Add(new TextContent(chunk.Content));
}

var response = await _options.ChatClient.GetResponseAsync<string[]>(
[
_systemPrompt,
new(ChatRole.User, contents)
], _options.ChatOptions, cancellationToken: cancellationToken).ConfigureAwait(false);

if (response.Result.Length != contents.Count)
{
throw new InvalidOperationException($"The AI chat service returned {response.Result.Length} instead of {contents.Count} results.");
}

for (int i = 0; i < response.Result.Length; i++)
{
batch[i].Metadata[MetadataKey] = response.Result[i];
yield return batch[i];
}
}
}
public override IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks, CancellationToken cancellationToken = default)
=> Batching.ProcessAsync<string>(chunks, _options, MetadataKey, _systemPrompt, _logger, cancellationToken);

private static void Validate(ReadOnlySpan<string> predefinedClasses, string fallbackClass)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;
Expand Down Expand Up @@ -30,6 +31,15 @@ public EnricherOptions(IChatClient chatClient)
/// </summary>
public ChatOptions? ChatOptions { get; set; }

/// <summary>
/// Gets or sets the logger factory to be used for logging.
/// </summary>
/// <remarks>
/// Enricher failures should not fail the whole ingestion pipeline, as they are best-effort enhancements.
/// This logger factory can be used to create loggers to log such failures.
/// </remarks>
public ILoggerFactory? LoggerFactory { get; set; }

/// <summary>
/// Gets or sets the batch size for processing chunks. Default is 20.
/// </summary>
Expand All @@ -38,6 +48,7 @@ public EnricherOptions(IChatClient chatClient)
internal EnricherOptions Clone() => new(ChatClient)
{
ChatOptions = ChatOptions?.Clone(),
LoggerFactory = LoggerFactory,
BatchSize = BatchSize
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;
Expand All @@ -18,6 +19,7 @@ public sealed class ImageAlternativeTextEnricher : IngestionDocumentProcessor
{
private readonly EnricherOptions _options;
private readonly ChatMessage _systemPrompt;
private readonly ILogger? _logger;

/// <summary>
/// Initializes a new instance of the <see cref="ImageAlternativeTextEnricher"/> class.
Expand All @@ -27,6 +29,7 @@ public ImageAlternativeTextEnricher(EnricherOptions options)
{
_options = Throw.IfNull(options).Clone();
_systemPrompt = new(ChatRole.System, "For each of the following images, write a detailed alternative text with fewer than 50 words.");
_logger = _options.LoggerFactory?.CreateLogger<ImageAlternativeTextEnricher>();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -90,19 +93,30 @@ private async Task ProcessAsync(List<IngestionDocumentImage> batch, Cancellation
contents.Add(new DataContent(image.Content!.Value, image.MediaType!));
}

var response = await _options.ChatClient.GetResponseAsync<string[]>(
[_systemPrompt, new(ChatRole.User, contents)],
_options.ChatOptions,
cancellationToken: cancellationToken).ConfigureAwait(false);

if (response.Result.Length != contents.Count)
try
{
throw new InvalidOperationException($"The AI chat service returned {response.Result.Length} instead of {contents.Count} results.");
}
ChatResponse<string[]> response = await _options.ChatClient.GetResponseAsync<string[]>(
[_systemPrompt, new(ChatRole.User, contents)],
_options.ChatOptions, cancellationToken: cancellationToken).ConfigureAwait(false);

for (int i = 0; i < response.Result.Length; i++)
if (response.Result.Length == contents.Count)
{
for (int i = 0; i < response.Result.Length; i++)
{
batch[i].AlternativeText = response.Result[i];
}
}
else
{
_logger?.UnexpectedResultsCount(response.Result.Length, contents.Count);
}
}
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
#pragma warning restore CA1031 // Do not catch general exception types
{
batch[i].AlternativeText = response.Result[i];
// Enricher failures should not fail the whole ingestion pipeline, as they are best-effort enhancements.
_logger?.UnexpectedEnricherFailure(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;
Expand All @@ -23,6 +22,7 @@ public sealed class KeywordEnricher : IngestionChunkProcessor<string>
private const int DefaultMaxKeywords = 5;
private readonly EnricherOptions _options;
private readonly ChatMessage _systemPrompt;
private readonly ILogger? _logger;

/// <summary>
/// Initializes a new instance of the <see cref="KeywordEnricher"/> class.
Expand All @@ -48,6 +48,7 @@ public KeywordEnricher(EnricherOptions options, ReadOnlySpan<string> predefinedK
? Throw.IfLessThanOrEqual(maxKeywords.Value, 0, nameof(maxKeywords))
: DefaultMaxKeywords;
_systemPrompt = CreateSystemPrompt(keywordsCount, predefinedKeywords, threshold);
_logger = _options.LoggerFactory?.CreateLogger<KeywordEnricher>();
}

/// <summary>
Expand All @@ -56,37 +57,8 @@ public KeywordEnricher(EnricherOptions options, ReadOnlySpan<string> predefinedK
public static string MetadataKey => "keywords";

/// <inheritdoc/>
public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(chunks);

await foreach (var batch in chunks.BufferAsync(_options.BatchSize).WithCancellation(cancellationToken))
{
List<AIContent> contents = new(batch.Count);
foreach (var chunk in batch)
{
contents.Add(new TextContent(chunk.Content));
}

var response = await _options.ChatClient.GetResponseAsync<string[][]>(
[
_systemPrompt,
new(ChatRole.User, contents)
], _options.ChatOptions, cancellationToken: cancellationToken).ConfigureAwait(false);

if (response.Result.Length != contents.Count)
{
throw new InvalidOperationException($"The AI chat service returned {response.Result.Length} instead of {contents.Count} results.");
}

for (int i = 0; i < response.Result.Length; i++)
{
batch[i].Metadata[MetadataKey] = response.Result[i];
yield return batch[i];
}
}
}
public override IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks, CancellationToken cancellationToken = default)
=> Batching.ProcessAsync<string[]>(chunks, _options, MetadataKey, _systemPrompt, _logger, cancellationToken);

private static void Validate(ReadOnlySpan<string> predefinedKeywords)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;
Expand All @@ -21,6 +20,7 @@ public sealed class SentimentEnricher : IngestionChunkProcessor<string>
{
private readonly EnricherOptions _options;
private readonly ChatMessage _systemPrompt;
private readonly ILogger? _logger;

/// <summary>
/// Initializes a new instance of the <see cref="SentimentEnricher"/> class.
Expand All @@ -38,6 +38,7 @@ public SentimentEnricher(EnricherOptions options, double? confidenceThreshold =
Unknown when confidence score is below {threshold}.
""";
_systemPrompt = new(ChatRole.System, prompt);
_logger = _options.LoggerFactory?.CreateLogger<SentimentEnricher>();
}

/// <summary>
Expand All @@ -46,35 +47,6 @@ Unknown when confidence score is below {threshold}.
public static string MetadataKey => "sentiment";

/// <inheritdoc/>
public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(chunks);

await foreach (var batch in chunks.BufferAsync(_options.BatchSize).WithCancellation(cancellationToken))
{
List<AIContent> contents = new(batch.Count);
foreach (var chunk in batch)
{
contents.Add(new TextContent(chunk.Content));
}

var response = await _options.ChatClient.GetResponseAsync<string[]>(
[
_systemPrompt,
new(ChatRole.User, contents)
], _options.ChatOptions, cancellationToken: cancellationToken).ConfigureAwait(false);

if (response.Result.Length != contents.Count)
{
throw new InvalidOperationException($"The AI chat service returned {response.Result.Length} instead of {contents.Count} results.");
}

for (int i = 0; i < response.Result.Length; i++)
{
batch[i].Metadata[MetadataKey] = response.Result[i];
yield return batch[i];
}
}
}
public override IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks, CancellationToken cancellationToken = default)
=> Batching.ProcessAsync<string>(chunks, _options, MetadataKey, _systemPrompt, _logger, cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;
Expand All @@ -21,6 +20,7 @@ public sealed class SummaryEnricher : IngestionChunkProcessor<string>
{
private readonly EnricherOptions _options;
private readonly ChatMessage _systemPrompt;
private readonly ILogger? _logger;

/// <summary>
/// Initializes a new instance of the <see cref="SummaryEnricher"/> class.
Expand All @@ -33,6 +33,7 @@ public SummaryEnricher(EnricherOptions options, int? maxWordCount = null)

int wordCount = maxWordCount.HasValue ? Throw.IfLessThanOrEqual(maxWordCount.Value, 0, nameof(maxWordCount)) : 100;
_systemPrompt = new(ChatRole.System, $"For each of the following texts, write a summary text with no more than {wordCount} words.");
_logger = _options.LoggerFactory?.CreateLogger<SummaryEnricher>();
}

/// <summary>
Expand All @@ -41,35 +42,6 @@ public SummaryEnricher(EnricherOptions options, int? maxWordCount = null)
public static string MetadataKey => "summary";

/// <inheritdoc/>
public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(chunks);

await foreach (var batch in chunks.BufferAsync(_options.BatchSize).WithCancellation(cancellationToken))
{
List<AIContent> contents = new(batch.Count);
foreach (var chunk in batch)
{
contents.Add(new TextContent(chunk.Content));
}

var response = await _options.ChatClient.GetResponseAsync<string[]>(
[
_systemPrompt,
new(ChatRole.User, contents)
], _options.ChatOptions, cancellationToken: cancellationToken).ConfigureAwait(false);

if (response.Result.Length != contents.Count)
{
throw new InvalidOperationException($"The AI chat service returned {response.Result.Length} instead of {contents.Count} results.");
}

for (int i = 0; i < response.Result.Length; i++)
{
batch[i].Metadata[MetadataKey] = response.Result[i];
yield return batch[i];
}
}
}
public override IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks, CancellationToken cancellationToken = default)
=> Batching.ProcessAsync<string>(chunks, _options, MetadataKey, _systemPrompt, _logger, cancellationToken);
}
Loading