Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix double remove in OnErrorAsync
  • Loading branch information
idg10 committed May 1, 2025
commit b108795154947c285023e6047159782003ef41a9
78 changes: 43 additions & 35 deletions AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -616,22 +616,8 @@ public partial class AsyncObserver

async ValueTask OnErrorAsync(Exception ex)
{
var nullGroupLocal = default(IAsyncSubject<TElement>);

lock (nullGate)
{
nullGroupLocal = nullGroup;
}

if (nullGroupLocal != null)
{
await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false);
}

foreach (var group in groups.Values)
{
await group.OnErrorAsync(ex).ConfigureAwait(false);
}
await ErrorAndRemoveNullGroupIfPresentAsync(ex);
await ErrorAndRemoveAllGroupsIfPresentAsync(ex);

using (await gate.LockAsync().ConfigureAwait(false))
{
Expand Down Expand Up @@ -770,23 +756,7 @@ async ValueTask Expire()
await CompleteAndRemoveNullGroupIfPresentAsync();
}

foreach (var key in groups.Keys)
{
// The ConcurrentDictionary's Keys property is a snapshot, so
// although this TryRemove should always succeed for the first
// key in the dictionary (as long as our upstream observable is
// obeying the rules, and not making multiple concurrent calls
// to our observer) each await in this loop offers an opportunity
// for one of the group duration observables to complete, which
// will cause the Expire method above to run, meaning that an
// entry that was present when we retrieved Keys at the start of
// this loop might already have been completed and removed by the
// time this loop reaches it.
if (groups.TryRemove(key, out var group))
{
await group.OnCompletedAsync().ConfigureAwait(false);
}
}
await CompleteAndRemoveAllGroupsIfPresentAsync();

using (await gate.LockAsync().ConfigureAwait(false))
{
Expand All @@ -797,7 +767,9 @@ async ValueTask Expire()
refCount
);

async ValueTask CompleteAndRemoveNullGroupIfPresentAsync()
ValueTask CompleteAndRemoveNullGroupIfPresentAsync() => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(null);
ValueTask ErrorAndRemoveNullGroupIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(x);
async ValueTask CompleteOrErrorAndRemoveNullGroupIfPresentAsync(Exception x)
{
var oldNullGroup = default(IAsyncSubject<TElement>);

Expand All @@ -809,9 +781,45 @@ async ValueTask CompleteAndRemoveNullGroupIfPresentAsync()

if (oldNullGroup != null)
{
await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
if (x is null)
{
await oldNullGroup.OnCompletedAsync().ConfigureAwait(false);
}
else
{
await oldNullGroup.OnErrorAsync(x).ConfigureAwait(false);
}
}
}

ValueTask CompleteAndRemoveAllGroupsIfPresentAsync() => CompleteOrErrorAndRemoveAllGroupsAsync(null);
ValueTask ErrorAndRemoveAllGroupsIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveAllGroupsAsync(x);
async ValueTask CompleteOrErrorAndRemoveAllGroupsAsync(Exception x)
{
foreach (var key in groups.Keys)
{
// The ConcurrentDictionary's Keys property is a snapshot, so
// although this TryRemove should always succeed for the first
// key in the dictionary (as long as our upstream observable is
// obeying the rules, and not making multiple concurrent calls
// to our observer) each await in this loop offers an opportunity
// for one of the group duration observables to complete, which
// will cause the Expire method above to run, meaning that an
// entry that was present when we retrieved Keys at the start of
// this loop might already have been completed and removed by the
// time this loop reaches it.
if (groups.TryRemove(key, out var group))
{
if (x is null)
{
await group.OnCompletedAsync().ConfigureAwait(false);
}
else
{
await group.OnErrorAsync(x).ConfigureAwait(false);
}
}
}
}
}
}
Expand Down