Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eng/packages/General.props
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<PackageVersion Include="Microsoft.Extensions.VectorData.Abstractions" Version="$(MicrosoftExtensionsVectorDataAbstractionsVersion)" />
<PackageVersion Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.0" />
<PackageVersion Include="Microsoft.ML.Tokenizers" Version="$(MicrosoftMLTokenizersVersion)" />
<PackageVersion Include="ModelContextProtocol" Version="0.4.0-preview.2" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="OllamaSharp" Version="5.1.9" />
<PackageVersion Include="OpenAI" Version="2.6.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
using ModelContextProtocol.Client;
using ModelContextProtocol.Protocol;

namespace Microsoft.Extensions.DataIngestion;

/// <summary>
/// Reads documents by converting them to Markdown using the <see href="https://github.com/microsoft/markitdown">MarkItDown</see> MCP server.
/// </summary>
public class MarkItDownMcpReader : IngestionDocumentReader
{
private readonly Uri _mcpServerUri;

/// <summary>
/// Initializes a new instance of the <see cref="MarkItDownMcpReader"/> class.
/// </summary>
/// <param name="mcpServerUri">The URI of the MarkItDown MCP server (e.g., http://localhost:3001/sse).</param>
public MarkItDownMcpReader(Uri mcpServerUri)
{
_mcpServerUri = Throw.IfNull(mcpServerUri);
}

/// <inheritdoc/>
public override async Task<IngestionDocument> ReadAsync(FileInfo source, string identifier, string? mediaType = null, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);

if (!source.Exists)
{
throw new FileNotFoundException("The specified file does not exist.", source.FullName);
}

// Read file content as base64 data URI
#if NET
byte[] fileBytes = await File.ReadAllBytesAsync(source.FullName, cancellationToken).ConfigureAwait(false);
#else
byte[] fileBytes;
using (FileStream fs = new(source.FullName, FileMode.Open, FileAccess.Read, FileShare.Read))
{
using MemoryStream ms = new();
await fs.CopyToAsync(ms).ConfigureAwait(false);
fileBytes = ms.ToArray();
}
#endif
string base64Content = Convert.ToBase64String(fileBytes);
string mimeType = string.IsNullOrEmpty(mediaType) ? "application/octet-stream" : mediaType!;
string dataUri = $"data:{mimeType};base64,{base64Content}";

string markdown = await ConvertToMarkdownAsync(dataUri, cancellationToken).ConfigureAwait(false);

return MarkdownParser.Parse(markdown, identifier);
}

/// <inheritdoc/>
public override async Task<IngestionDocument> ReadAsync(Stream source, string identifier, string mediaType, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(source);
_ = Throw.IfNullOrEmpty(identifier);

// Read stream content as base64 data URI
using MemoryStream ms = new();
#if NET
await source.CopyToAsync(ms, cancellationToken).ConfigureAwait(false);
#else
await source.CopyToAsync(ms).ConfigureAwait(false);
#endif
byte[] fileBytes = ms.ToArray();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot, once you switch to using DataContent, it accepts a Readonlymemory, so rather than using ToArray, you can pass in a Readonlymemory created from MemoryStream's GetBuffer and Length

string base64Content = Convert.ToBase64String(fileBytes);
string mimeType = string.IsNullOrEmpty(mediaType) ? "application/octet-stream" : mediaType;
string dataUri = $"data:{mimeType};base64,{base64Content}";

string markdown = await ConvertToMarkdownAsync(dataUri, cancellationToken).ConfigureAwait(false);

return MarkdownParser.Parse(markdown, identifier);
}

private async Task<string> ConvertToMarkdownAsync(string dataUri, CancellationToken cancellationToken)
{
// Create HTTP client transport for MCP
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task - await using pattern
await using var transport = new HttpClientTransport(new HttpClientTransportOptions
{
Endpoint = _mcpServerUri
});
#pragma warning restore CA2007

// Create MCP client
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
await using var client = await McpClient.CreateAsync(transport, cancellationToken: cancellationToken).ConfigureAwait(false);
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task

// Build parameters for convert_to_markdown tool
var parameters = new Dictionary<string, object?>
{
["uri"] = dataUri
};

// Call the convert_to_markdown tool
var result = await client.CallToolAsync("convert_to_markdown", parameters, cancellationToken: cancellationToken).ConfigureAwait(false);

// Extract markdown content from result
// The result is expected to be in the format: { "content": [{ "type": "text", "text": "markdown content" }] }
if (result.Content != null && result.Content.Count > 0)
{
foreach (var content in result.Content)
{
if (content.Type == "text" && content is TextContentBlock textBlock)
{
return textBlock.Text;
}
}
}

throw new InvalidOperationException("Failed to convert document to markdown: unexpected response format from MCP server.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

<ItemGroup>
<PackageReference Include="Markdig.Signed" />
<PackageReference Include="ModelContextProtocol" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ Or directly in the C# project file:

## Usage Examples

### Creating a MarkItDownReader for Data Ingestion
### Creating a MarkItDownReader for Data Ingestion (Local Process)

Use `MarkItDownReader` to convert documents using the MarkItDown executable installed locally:

```csharp
using Microsoft.Extensions.DataIngestion;
Expand All @@ -31,6 +33,33 @@ IngestionDocumentReader reader =
using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
```

### Creating a MarkItDownMcpReader for Data Ingestion (MCP Server)

Use `MarkItDownMcpReader` to convert documents using a MarkItDown MCP server:

```csharp
using Microsoft.Extensions.DataIngestion;

// Connect to a MarkItDown MCP server (e.g., running in Docker)
IngestionDocumentReader reader =
new MarkItDownMcpReader(new Uri("http://localhost:3001/sse"));

using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
```

The MarkItDown MCP server can be run using Docker:

```bash
docker run -p 3001:3001 mcp/markitdown
```

Or installed via pip:

```bash
pip install markitdown-mcp-server
markitdown-mcp --http --host 0.0.0.0 --port 3001
```

## Feedback & Contributing

We welcome feedback and contributions in [our GitHub repo](https://github.com/dotnet/extensions).
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.Extensions.DataIngestion.Readers.Tests;

public class MarkItDownMcpReaderTests
{
[Fact]
public void Constructor_ThrowsWhenMcpServerUriIsNull()
{
Assert.Throws<ArgumentNullException>("mcpServerUri", () => new MarkItDownMcpReader(null!));
}

[Fact]
public async Task ReadAsync_ThrowsWhenIdentifierIsNull()
{
var reader = new MarkItDownMcpReader(new Uri("http://localhost:3001/sse"));

await Assert.ThrowsAsync<ArgumentNullException>("identifier", async () => await reader.ReadAsync(new FileInfo("fileName.txt"), identifier: null!));
await Assert.ThrowsAsync<ArgumentException>("identifier", async () => await reader.ReadAsync(new FileInfo("fileName.txt"), identifier: string.Empty));

using MemoryStream stream = new();
await Assert.ThrowsAsync<ArgumentNullException>("identifier", async () => await reader.ReadAsync(stream, identifier: null!, mediaType: "some"));
await Assert.ThrowsAsync<ArgumentException>("identifier", async () => await reader.ReadAsync(stream, identifier: string.Empty, mediaType: "some"));
}

[Fact]
public async Task ReadAsync_ThrowsWhenSourceIsNull()
{
var reader = new MarkItDownMcpReader(new Uri("http://localhost:3001/sse"));

await Assert.ThrowsAsync<ArgumentNullException>("source", async () => await reader.ReadAsync((FileInfo)null!, "identifier"));
await Assert.ThrowsAsync<ArgumentNullException>("source", async () => await reader.ReadAsync((Stream)null!, "identifier", "mediaType"));
}

[Fact]
public async Task ReadAsync_ThrowsWhenFileDoesNotExist()
{
var reader = new MarkItDownMcpReader(new Uri("http://localhost:3001/sse"));
var nonExistentFile = new FileInfo(Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()));

await Assert.ThrowsAsync<FileNotFoundException>(async () => await reader.ReadAsync(nonExistentFile, "identifier"));
}

// NOTE: Integration tests with an actual MCP server would go here, but they would require
// a running MarkItDown MCP server to be available, which is not part of the test setup.
// For full integration testing, use a real MCP server in a separate test environment.
}
Loading