Skip to content

Commit 5423a2a

Browse files
committed
Adding async subjects implementation
1 parent ca81db0 commit 5423a2a

File tree

5 files changed

+238
-0
lines changed

5 files changed

+238
-0
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Reactive.Subjects;
2+
3+
namespace WampSharp.V2
4+
{
5+
/// <summary>
6+
/// Represents a <see cref="IAsyncSubject{TSource,TResult}"/> that publishes/receives messages
7+
/// via a WAMP topic.
8+
/// </summary>
9+
public interface IWampAsyncSubject : IAsyncSubject<IWampEvent, IWampSerializedEvent>
10+
{
11+
}
12+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using WampSharp.Core.Serialization;
4+
using WampSharp.V2.Core.Contracts;
5+
using WampSharp.V2.PubSub;
6+
7+
namespace WampSharp.V2
8+
{
9+
internal class RawTopicClientAsyncSubscriber : IWampRawTopicClientSubscriber
10+
{
11+
private readonly IAsyncObserver<IWampSerializedEvent> mObserver;
12+
13+
public RawTopicClientAsyncSubscriber(IAsyncObserver<IWampSerializedEvent> observer)
14+
{
15+
mObserver = observer;
16+
}
17+
18+
public void Event<TMessage>(IWampFormatter<TMessage> formatter, long publicationId, EventDetails details)
19+
{
20+
mObserver.OnNextAsync(new WampSerializedEvent<TMessage>(formatter, publicationId, details));
21+
}
22+
23+
public void Event<TMessage>(IWampFormatter<TMessage> formatter, long publicationId, EventDetails details, TMessage[] arguments)
24+
{
25+
mObserver.OnNextAsync(new WampSerializedEvent<TMessage>(formatter, publicationId, details, arguments));
26+
}
27+
28+
public void Event<TMessage>(IWampFormatter<TMessage> formatter, long publicationId, EventDetails details, TMessage[] arguments, IDictionary<string, TMessage> argumentsKeywords)
29+
{
30+
mObserver.OnNextAsync(new WampSerializedEvent<TMessage>(formatter, publicationId, details, arguments, argumentsKeywords));
31+
}
32+
}
33+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using WampSharp.V2.Core.Contracts;
5+
6+
namespace WampSharp.V2
7+
{
8+
internal abstract class WampAsyncSubject : IWampAsyncSubject
9+
{
10+
private static readonly PublishOptions EmptyOptions =
11+
new PublishOptions();
12+
13+
public virtual Task OnNextAsync(IWampEvent value)
14+
{
15+
PublishOptions options = value.Options ?? EmptyOptions;
16+
object[] arguments = value.Arguments;
17+
IDictionary<string, object> argumentsKeywords = value.ArgumentsKeywords;
18+
19+
Task result;
20+
21+
if (argumentsKeywords != null)
22+
{
23+
result = Publish(options, arguments, argumentsKeywords);
24+
}
25+
else if (arguments != null)
26+
{
27+
result = Publish(options, arguments);
28+
}
29+
else
30+
{
31+
result = Publish(options);
32+
}
33+
34+
return result;
35+
}
36+
37+
public virtual Task OnErrorAsync(Exception error)
38+
{
39+
throw new NotImplementedException();
40+
}
41+
42+
public virtual Task OnCompletedAsync()
43+
{
44+
throw new NotImplementedException();
45+
}
46+
47+
protected abstract Task Publish(PublishOptions options);
48+
protected abstract Task Publish(PublishOptions options, object[] arguments);
49+
protected abstract Task Publish(PublishOptions options, object[] arguments,
50+
IDictionary<string, object> argumentsKeywords);
51+
52+
public abstract Task<IAsyncDisposable> SubscribeAsync(IAsyncObserver<IWampSerializedEvent> observer);
53+
}
54+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Reactive.Linq;
4+
using System.Threading.Tasks;
5+
using WampSharp.Core.Listener;
6+
using WampSharp.V2.Client;
7+
using WampSharp.V2.Core.Contracts;
8+
using WampSharp.V2.Realm;
9+
10+
namespace WampSharp.V2
11+
{
12+
internal class WampClientAsyncSubject : WampAsyncSubject
13+
{
14+
private readonly IWampTopicProxy mTopic;
15+
private readonly IAsyncObservable<IWampSerializedEvent> mObservable;
16+
17+
public WampClientAsyncSubject(IWampTopicProxy topic, IWampClientConnectionMonitor monitor)
18+
{
19+
mTopic = topic;
20+
21+
mObservable = CreateObservable(topic, monitor);
22+
}
23+
24+
private static IAsyncObservable<IWampSerializedEvent> CreateObservable(IWampTopicProxy topic, IWampClientConnectionMonitor monitor)
25+
{
26+
IObservable<IWampSerializedEvent> connectionError =
27+
Observable.FromEventPattern<WampConnectionErrorEventArgs>
28+
(x => monitor.ConnectionError += x,
29+
x => monitor.ConnectionError -= x)
30+
.SelectMany(x => Observable.Throw<IWampSerializedEvent>(x.EventArgs.Exception));
31+
32+
IObservable<IWampSerializedEvent> connectionComplete =
33+
Observable.FromEventPattern<WampSessionCloseEventArgs>
34+
(x => monitor.ConnectionBroken += x,
35+
x => monitor.ConnectionBroken -= x)
36+
.SelectMany(x => Observable.Throw<IWampSerializedEvent>(new WampConnectionBrokenException(x.EventArgs)));
37+
38+
ClientAsyncObservable messages = new ClientAsyncObservable(topic);
39+
40+
IObservable<IWampSerializedEvent> connectionNotifications =
41+
Observable.Merge(connectionError, connectionComplete);
42+
43+
IAsyncObservable<IWampSerializedEvent> asyncNotifications =
44+
AsyncObservable.ToAsyncObservable(connectionNotifications);
45+
46+
//IAsyncObservable<IWampSerializedEvent> result =
47+
// AsyncObservable.Merge(asyncNotifications, messages);
48+
49+
IAsyncObservable<IWampSerializedEvent> result =
50+
new[] {asyncNotifications, messages}.ToObservable()
51+
.ToAsyncObservable()
52+
.Merge();
53+
54+
return result;
55+
}
56+
57+
protected override Task Publish(PublishOptions options)
58+
{
59+
return mTopic.Publish(options);
60+
}
61+
62+
protected override Task Publish(PublishOptions options, object[] arguments)
63+
{
64+
return mTopic.Publish(options, arguments);
65+
}
66+
67+
protected override Task Publish(PublishOptions options, object[] arguments,
68+
IDictionary<string, object> argumentsKeywords)
69+
{
70+
return mTopic.Publish(options, arguments, argumentsKeywords);
71+
}
72+
73+
public override Task<IAsyncDisposable> SubscribeAsync(IAsyncObserver<IWampSerializedEvent> observer)
74+
{
75+
return mObservable.SubscribeAsync(observer);
76+
}
77+
78+
private class ClientAsyncObservable : IAsyncObservable<IWampSerializedEvent>
79+
{
80+
private readonly IWampTopicProxy mTopic;
81+
82+
public ClientAsyncObservable(IWampTopicProxy topic)
83+
{
84+
mTopic = topic;
85+
}
86+
87+
public Task<IAsyncDisposable> SubscribeAsync(IAsyncObserver<IWampSerializedEvent> observer)
88+
{
89+
Task<IAsyncDisposable>
90+
task = mTopic.Subscribe(new RawTopicClientAsyncSubscriber(observer), new SubscribeOptions());
91+
92+
return task;
93+
}
94+
}
95+
}
96+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Linq;
3+
using System.Reactive.Linq;
4+
using System.Reactive.Subjects;
5+
using System.Threading.Tasks;
6+
7+
namespace WampSharp.V2
8+
{
9+
internal class WampAsyncTopicSubject<T> : IAsyncSubject<T>
10+
{
11+
private readonly IWampAsyncSubject mSubject;
12+
private readonly IAsyncObservable<T> mObservable;
13+
14+
public WampAsyncTopicSubject(IWampAsyncSubject subject)
15+
{
16+
mSubject = subject;
17+
18+
mObservable =
19+
subject.Where(x => x.Arguments != null && x.Arguments.Any())
20+
.Select(x => x.Arguments[0].Deserialize<T>());
21+
}
22+
23+
public Task OnNextAsync(T value)
24+
{
25+
return mSubject.OnNextAsync(new WampEvent {Arguments = new object[] {value}});
26+
}
27+
28+
public Task OnErrorAsync(Exception error)
29+
{
30+
throw new NotImplementedException();
31+
}
32+
33+
public Task OnCompletedAsync()
34+
{
35+
throw new NotImplementedException();
36+
}
37+
38+
public Task<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer)
39+
{
40+
return mObservable.SubscribeAsync(observer);
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)