diff --git a/CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.Observables.cs b/CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.Observables.cs
new file mode 100644
index 000000000..22a1cb759
--- /dev/null
+++ b/CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.Observables.cs
@@ -0,0 +1,205 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Diagnostics.CodeAnalysis;
+using System.Linq;
+using System.Linq.Expressions;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using CommunityToolkit.Mvvm.Messaging.Internals;
+
+namespace CommunityToolkit.Mvvm.Messaging;
+
+///
+partial class IMessengerExtensions
+{
+ ///
+ /// Creates an instance that can be used to be notified whenever a message of a given type is broadcast by a messenger.
+ ///
+ /// The type of message to use to receive notification for through the resulting instance.
+ /// The instance to use to register the recipient.
+ /// An instance to receive notifications for messages being broadcast.
+ /// Thrown if is .
+ public static IObservable CreateObservable(this IMessenger messenger)
+ where TMessage : class
+ {
+ ArgumentNullException.ThrowIfNull(messenger);
+
+ return new Observable(messenger);
+ }
+
+ ///
+ /// Creates an instance that can be used to be notified whenever a message of a given type is broadcast by a messenger.
+ ///
+ /// The type of message to use to receive notification for through the resulting instance.
+ /// The type of token to identify what channel to use to receive messages.
+ /// The instance to use to register the recipient.
+ /// A token used to determine the receiving channel to use.
+ /// An instance to receive notifications for messages being broadcast.
+ /// Thrown if or are .
+ public static IObservable CreateObservable(this IMessenger messenger, TToken token)
+ where TMessage : class
+ where TToken : IEquatable
+ {
+ ArgumentNullException.ThrowIfNull(messenger);
+ ArgumentNullException.For.ThrowIfNull(token);
+
+ return new Observable(messenger, token);
+ }
+
+ ///
+ /// An implementations for a given message type.
+ ///
+ /// The type of messages to listen to.
+ private sealed class Observable : IObservable
+ where TMessage : class
+ {
+ ///
+ /// The instance to use to register the recipient.
+ ///
+ private readonly IMessenger messenger;
+
+ ///
+ /// Creates a new instance with the given parameters.
+ ///
+ /// The instance to use to register the recipient.
+ public Observable(IMessenger messenger)
+ {
+ this.messenger = messenger;
+ }
+
+ ///
+ public IDisposable Subscribe(IObserver observer)
+ {
+ return new Recipient(this.messenger, observer);
+ }
+
+ ///
+ /// An implementation for .
+ ///
+ private sealed class Recipient : IRecipient, IDisposable
+ {
+ ///
+ /// The instance to use to register the recipient.
+ ///
+ private readonly IMessenger messenger;
+
+ ///
+ /// The target instance currently in use.
+ ///
+ private readonly IObserver observer;
+
+ ///
+ /// Creates a new instance with the specified parameters.
+ ///
+ /// The instance to use to register the recipient.
+ /// The instance to use to create the recipient for.
+ public Recipient(IMessenger messenger, IObserver observer)
+ {
+ this.messenger = messenger;
+ this.observer = observer;
+
+ messenger.Register(this);
+ }
+
+ ///
+ public void Receive(TMessage message)
+ {
+ this.observer.OnNext(message);
+ }
+
+ ///
+ public void Dispose()
+ {
+ this.messenger.Unregister(this);
+ }
+ }
+ }
+
+ ///
+ /// An implementations for a given pair of message and token types.
+ ///
+ /// The type of messages to listen to.
+ /// The type of token to identify what channel to use to receive messages.
+ private sealed class Observable : IObservable
+ where TMessage : class
+ where TToken : IEquatable
+ {
+ ///
+ /// The instance to use to register the recipient.
+ ///
+ private readonly IMessenger messenger;
+
+ ///
+ /// The token used to determine the receiving channel to use.
+ ///
+ private readonly TToken token;
+
+ ///
+ /// Creates a new instance with the given parameters.
+ ///
+ /// The instance to use to register the recipient.
+ /// A token used to determine the receiving channel to use.
+ public Observable(IMessenger messenger, TToken token)
+ {
+ this.messenger = messenger;
+ this.token = token;
+ }
+
+ ///
+ public IDisposable Subscribe(IObserver observer)
+ {
+ return new Recipient(this.messenger, observer, this.token);
+ }
+
+ ///
+ /// An implementation for .
+ ///
+ private sealed class Recipient : IRecipient, IDisposable
+ {
+ ///
+ /// The instance to use to register the recipient.
+ ///
+ private readonly IMessenger messenger;
+
+ ///
+ /// The target instance currently in use.
+ ///
+ private readonly IObserver observer;
+
+ ///
+ /// The token used to determine the receiving channel to use.
+ ///
+ private readonly TToken token;
+
+ ///
+ /// Creates a new instance with the specified parameters.
+ ///
+ /// The instance to use to register the recipient.
+ /// The instance to use to create the recipient for.
+ /// A token used to determine the receiving channel to use.
+ public Recipient(IMessenger messenger, IObserver observer, TToken token)
+ {
+ this.messenger = messenger;
+ this.observer = observer;
+ this.token = token;
+
+ messenger.Register(this, token);
+ }
+
+ ///
+ public void Receive(TMessage message)
+ {
+ this.observer.OnNext(message);
+ }
+
+ ///
+ public void Dispose()
+ {
+ this.messenger.Unregister(this, this.token);
+ }
+ }
+ }
+}
diff --git a/CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.cs b/CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.cs
index 2163efd86..8f9f8ee6e 100644
--- a/CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.cs
+++ b/CommunityToolkit.Mvvm/Messaging/IMessengerExtensions.cs
@@ -15,7 +15,7 @@ namespace CommunityToolkit.Mvvm.Messaging;
///
/// Extensions for the type.
///
-public static class IMessengerExtensions
+public static partial class IMessengerExtensions
{
///
/// A class that acts as a container to load the instance linked to
diff --git a/tests/CommunityToolkit.Mvvm.Roslyn401.UnitTests/CommunityToolkit.Mvvm.Roslyn401.UnitTests.csproj b/tests/CommunityToolkit.Mvvm.Roslyn401.UnitTests/CommunityToolkit.Mvvm.Roslyn401.UnitTests.csproj
index f58c5bf3e..197c23c9c 100644
--- a/tests/CommunityToolkit.Mvvm.Roslyn401.UnitTests/CommunityToolkit.Mvvm.Roslyn401.UnitTests.csproj
+++ b/tests/CommunityToolkit.Mvvm.Roslyn401.UnitTests/CommunityToolkit.Mvvm.Roslyn401.UnitTests.csproj
@@ -9,6 +9,7 @@
+
diff --git a/tests/CommunityToolkit.Mvvm.Roslyn431.UnitTests/CommunityToolkit.Mvvm.Roslyn431.UnitTests.csproj b/tests/CommunityToolkit.Mvvm.Roslyn431.UnitTests/CommunityToolkit.Mvvm.Roslyn431.UnitTests.csproj
index 89514cb03..ab4fca628 100644
--- a/tests/CommunityToolkit.Mvvm.Roslyn431.UnitTests/CommunityToolkit.Mvvm.Roslyn431.UnitTests.csproj
+++ b/tests/CommunityToolkit.Mvvm.Roslyn431.UnitTests/CommunityToolkit.Mvvm.Roslyn431.UnitTests.csproj
@@ -9,6 +9,7 @@
+
diff --git a/tests/CommunityToolkit.Mvvm.UnitTests/CommunityToolkit.Mvvm.UnitTests.projitems b/tests/CommunityToolkit.Mvvm.UnitTests/CommunityToolkit.Mvvm.UnitTests.projitems
index 276665389..3089c9db3 100644
--- a/tests/CommunityToolkit.Mvvm.UnitTests/CommunityToolkit.Mvvm.UnitTests.projitems
+++ b/tests/CommunityToolkit.Mvvm.UnitTests/CommunityToolkit.Mvvm.UnitTests.projitems
@@ -25,6 +25,7 @@
+
diff --git a/tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Observables.cs b/tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Observables.cs
new file mode 100644
index 000000000..e631eddf9
--- /dev/null
+++ b/tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Observables.cs
@@ -0,0 +1,102 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using CommunityToolkit.Mvvm.Messaging;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace CommunityToolkit.Mvvm.UnitTests;
+
+partial class Test_Messenger
+{
+ [TestMethod]
+ [DataRow(typeof(StrongReferenceMessenger))]
+ [DataRow(typeof(WeakReferenceMessenger))]
+ public void Test_Messenger_CreateObservable(Type type)
+ {
+ IMessenger messenger = (IMessenger)Activator.CreateInstance(type)!;
+
+ IObservable observable = messenger.CreateObservable();
+
+ Assert.IsNotNull(observable);
+
+ List messages = new();
+
+ IDisposable disposable = observable.Subscribe(messages.Add);
+
+ MessageA message1 = new();
+ MessageA message2 = new();
+
+ _ = messenger.Send(message1);
+ _ = messenger.Send(message2);
+
+ // The expected messages have been observed
+ CollectionAssert.AreEqual(messages, new[] { message1, message2 });
+
+ disposable.Dispose();
+
+ _ = messenger.Send();
+
+ // No messages are sent after unsubscribing the observable
+ CollectionAssert.AreEqual(messages, new[] { message1, message2 });
+ }
+
+ [TestMethod]
+ [DataRow(typeof(StrongReferenceMessenger))]
+ [DataRow(typeof(WeakReferenceMessenger))]
+ public void Test_Messenger_CreateObservable_WithToken(Type type)
+ {
+ IMessenger messenger = (IMessenger)Activator.CreateInstance(type)!;
+
+ IObservable observable = messenger.CreateObservable(42);
+
+ Assert.IsNotNull(observable);
+
+ List messages = new();
+
+ IDisposable disposable = observable.Subscribe(messages.Add);
+
+ MessageA message1 = new();
+ MessageA message2 = new();
+
+ _ = messenger.Send(message1, 42);
+ _ = messenger.Send(message2, 42);
+
+ _ = messenger.Send(new MessageA(), 1);
+ _ = messenger.Send(new MessageA(), 999);
+
+ // The expected messages have been observed (only for matching tokens)
+ CollectionAssert.AreEqual(messages, new[] { message1, message2 });
+
+ disposable.Dispose();
+
+ _ = messenger.Send(new MessageA(), 42);
+ _ = messenger.Send(new MessageA(), 1);
+
+ // No messages are sent after unsubscribing the observable (regardless of token)
+ CollectionAssert.AreEqual(messages, new[] { message1, message2 });
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentNullException))]
+ public void Test_Messenger_CreateObservable_NullMessenger()
+ {
+ _ = IMessengerExtensions.CreateObservable(null!);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentNullException))]
+ public void Test_Messenger_CreateObservable_WithToken_NullMessenger()
+ {
+ _ = IMessengerExtensions.CreateObservable(null!, "Hello");
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentNullException))]
+ public void Test_Messenger_CreateObservable_WithToken_NullToken()
+ {
+ _ = IMessengerExtensions.CreateObservable(new WeakReferenceMessenger(), null!);
+ }
+}
diff --git a/tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Request.cs b/tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Request.cs
index 876dc7c62..b6c6f3ac1 100644
--- a/tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Request.cs
+++ b/tests/CommunityToolkit.Mvvm.UnitTests/Test_Messenger.Request.cs
@@ -12,7 +12,7 @@
namespace CommunityToolkit.Mvvm.UnitTests;
-public partial class Test_Messenger
+partial class Test_Messenger
{
[TestMethod]
[DataRow(typeof(StrongReferenceMessenger))]