Skip to content

Commit b607989

Browse files
darklJohan 't Hart
andauthored
Observable progressive caller/callee (#355)
Caller/Callee observable support Co-authored-by: Johan 't Hart <[email protected]>
1 parent a491add commit b607989

File tree

11 files changed

+372
-51
lines changed

11 files changed

+372
-51
lines changed

src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs

Lines changed: 175 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Reactive.Disposables;
5+
using System.Reactive.Linq;
46
using System.Threading.Tasks;
57
using NUnit.Framework;
68
using WampSharp.Core.Serialization;
@@ -30,13 +32,65 @@ public async Task ProgressiveCallsCallerProgress()
3032
callerChannel.RealmProxy.RpcCatalog.Invoke
3133
(callback,
3234
new CallOptions() {ReceiveProgress = true},
33-
"com.myapp.longop",
35+
MyOperation.ProcedureUri,
3436
new object[] {10});
3537

36-
callback.Task.Wait(2000);
38+
int? result = await callback.Task;
3739

3840
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults);
39-
Assert.That(callback.Task.Result, Is.EqualTo(10));
41+
Assert.That(result, Is.EqualTo(10));
42+
}
43+
44+
[Test]
45+
public async Task ProgressiveCallsCallerProgressObservable()
46+
{
47+
WampPlayground playground = new WampPlayground();
48+
49+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
50+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
51+
IWampChannel callerChannel = dualChannel.CallerChannel;
52+
53+
var service = new LongOpObservableService();
54+
await calleeChannel.RealmProxy.Services.RegisterCallee(service);
55+
56+
MyCallback callback = new MyCallback();
57+
58+
callerChannel.RealmProxy.RpcCatalog.Invoke
59+
(callback,
60+
new CallOptions() { ReceiveProgress = true },
61+
MyOperation.ProcedureUri,
62+
new object[] { 10 });
63+
64+
Assert.That(service.State, Is.EqualTo(LongOpObservableService.OperationState.Called));
65+
int? result = await callback.Task;
66+
Assert.That(result, Is.EqualTo(null));
67+
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults);
68+
Assert.That(service.State, Is.EqualTo(LongOpObservableService.OperationState.Completed));
69+
}
70+
71+
[Test]
72+
public async Task ProgressiveCallsCallerProgressCancelObservable()
73+
{
74+
WampPlayground playground = new WampPlayground();
75+
76+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
77+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
78+
IWampChannel callerChannel = dualChannel.CallerChannel;
79+
80+
var service = new LongOpObservableService();
81+
await calleeChannel.RealmProxy.Services.RegisterCallee(service);
82+
83+
MyCallback callback = new MyCallback();
84+
85+
var invocation = callerChannel.RealmProxy.RpcCatalog.Invoke
86+
(callback,
87+
new CallOptions() { ReceiveProgress = true },
88+
MyOperation.ProcedureUri,
89+
new object[] { 10 });
90+
91+
Assert.That(service.State, Is.EqualTo(LongOpObservableService.OperationState.Called));
92+
invocation.Cancel(new CancelOptions());
93+
Assert.That(service.State, Is.EqualTo(LongOpObservableService.OperationState.Cancelled));
4094
}
4195

4296
[Test]
@@ -56,17 +110,56 @@ public async Task ProgressiveCallsCalleeProxyProgress()
56110
List<int> results = new List<int>();
57111
MyProgress<int> progress = new MyProgress<int>(i => results.Add(i));
58112

59-
Task<int> result = proxy.LongOp(10, progress);
60-
result.Wait();
113+
int result = await proxy.LongOp(10, progress);
61114

62115
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), results);
63116

64-
Assert.That(result.Result, Is.EqualTo(10));
117+
Assert.That(result, Is.EqualTo(10));
118+
}
119+
120+
[Test]
121+
public async Task ProgressiveCallsCalleeProxyObservable()
122+
{
123+
WampPlayground playground = new WampPlayground();
124+
125+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
126+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
127+
IWampChannel callerChannel = dualChannel.CallerChannel;
128+
129+
MyOperation myOperation = new MyOperation();
130+
131+
await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
132+
ILongOpObservableService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObservableService>();
133+
134+
IObservable<int> proxyResult = proxy.LongOp(9); // it will emit one more than asked
135+
136+
IEnumerable<int> results = proxyResult.ToEnumerable();
137+
138+
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), results);
139+
}
140+
141+
[Test]
142+
public async Task ProgressiveCallsCalleeProxyObservableError()
143+
{
144+
WampPlayground playground = new WampPlayground();
145+
146+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
147+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
148+
IWampChannel callerChannel = dualChannel.CallerChannel;
149+
150+
MyOperation myOperation = new MyOperation {EndWithError = true};
151+
152+
await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
153+
ILongOpObservableService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObservableService>();
154+
155+
Assert.Throws(typeof(WampException), () => proxy.LongOp(9).ToEnumerable().Count());
65156
}
66157

67158
public class MyOperation : IWampRpcOperation
68159
{
69-
public string Procedure => "com.myapp.longop";
160+
public const string ProcedureUri = "com.myapp.longop";
161+
162+
public string Procedure => ProcedureUri;
70163

71164
public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter, InvocationDetails details)
72165
{
@@ -87,23 +180,35 @@ public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCal
87180
new object[] {i});
88181
}
89182

90-
caller.Result(WampObjectFormatter.Value,
91-
new YieldOptions(),
92-
new object[] {n});
183+
if (EndWithError)
184+
{
185+
caller.Error(WampObjectFormatter.Value,
186+
new Dictionary<string, string>(),
187+
"longop.error",
188+
new object[] {"Something bad happened"});
189+
}
190+
else
191+
{
192+
caller.Result(WampObjectFormatter.Value,
193+
new YieldOptions(),
194+
new object[] { n });
195+
}
93196

94197
return null;
95198
}
96199

200+
public bool EndWithError { get; set; }
201+
97202
public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter, InvocationDetails details,
98-
TMessage[] arguments, IDictionary<string, TMessage> argumentsKeywords)
203+
TMessage[] arguments, IDictionary<string, TMessage> argumentsKeywords)
99204
{
100205
return null;
101206
}
102207
}
103208

104209
public interface ILongOpService
105210
{
106-
[WampProcedure("com.myapp.longop")]
211+
[WampProcedure(MyOperation.ProcedureUri)]
107212
[WampProgressiveResultProcedure]
108213
Task<int> LongOp(int n, IProgress<int> progress);
109214
}
@@ -122,17 +227,71 @@ public async Task<int> LongOp(int n, IProgress<int> progress)
122227
}
123228
}
124229

230+
public interface ILongOpObservableService
231+
{
232+
[WampProcedure(MyOperation.ProcedureUri)]
233+
[WampProgressiveResultProcedure]
234+
IObservable<int> LongOp(int n);
235+
}
236+
237+
public class LongOpObservableService : ILongOpObservableService
238+
{
239+
public enum OperationState
240+
{
241+
Nothing,
242+
Called,
243+
Completed,
244+
Cancelled
245+
}
246+
247+
public OperationState State { get; set; } = OperationState.Nothing;
248+
249+
public IObservable<int> LongOp(int n) => Observable.Create<int>(async (observer, cancellationToken) =>
250+
{
251+
State = OperationState.Called;
252+
253+
cancellationToken.Register(() =>
254+
{
255+
if (State == OperationState.Called)
256+
{
257+
State = OperationState.Cancelled;
258+
}
259+
});
260+
261+
for (int i = 0; i < n; i++)
262+
{
263+
observer.OnNext(i);
264+
await Task.Delay(100, cancellationToken);
265+
cancellationToken.ThrowIfCancellationRequested();
266+
}
267+
268+
State = OperationState.Completed;
269+
270+
if (EndWithError)
271+
{
272+
observer.OnError(new WampException("longop.error", "Something bad happened"));
273+
}
274+
else
275+
{
276+
observer.OnCompleted();
277+
}
278+
});
279+
280+
public bool EndWithError { get; set; }
281+
}
282+
125283
public class MyCallback : IWampRawRpcOperationClientCallback
126284
{
127-
private readonly TaskCompletionSource<int> mTask = new TaskCompletionSource<int>();
285+
private readonly TaskCompletionSource<int?> mTask = new TaskCompletionSource<int?>();
128286

129287
public List<int> ProgressiveResults { get; } = new List<int>();
130288

131-
public Task<int> Task => mTask.Task;
289+
public Task<int?> Task => mTask.Task;
132290

133291
public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details)
134292
{
135-
throw new NotImplementedException();
293+
mTask.SetResult(null);
294+
// null indicates no final return value
136295
}
137296

138297
public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details, TMessage[] arguments)
@@ -187,4 +346,4 @@ public void Report(T value)
187346
mAction(value);
188347
}
189348
}
190-
}
349+
}

src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptor.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Reflection;
23
using WampSharp.V2.Core.Contracts;
34
using WampSharp.V2.Rpc;
@@ -37,7 +38,7 @@ public virtual CallOptions GetCallOptions(MethodInfo method)
3738

3839
public virtual string GetProcedureUri(MethodInfo method)
3940
{
40-
WampProcedureAttribute attribute =
41+
WampProcedureAttribute attribute =
4142
method.GetCustomAttribute<WampProcedureAttribute>();
4243

4344
if (attribute == null)
@@ -48,4 +49,4 @@ public virtual string GetProcedureUri(MethodInfo method)
4849
return attribute.Procedure;
4950
}
5051
}
51-
}
52+
}

src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive;
23
using System.Reflection;
34
using System.Threading.Tasks;
45
using WampSharp.V2.Rpc;
@@ -28,7 +29,14 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
2829
Type genericArgument;
2930
Type interceptorType;
3031

31-
if (!typeof(Task).IsAssignableFrom(returnType))
32+
if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IObservable<>))
33+
{
34+
MethodInfoValidation.ValidateProgressiveObservableMethod(method);
35+
36+
genericArgument = returnType.GetGenericArguments()[0];
37+
interceptorType = typeof(ObservableCalleeProxyInterceptor<>);
38+
}
39+
else if (!typeof(Task).IsAssignableFrom(returnType))
3240
{
3341
MethodInfoValidation.ValidateSyncMethod(method);
3442

@@ -55,4 +63,4 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
5563
return closedGenericType;
5664
}
5765
}
58-
}
66+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System.Reactive;
2+
using System.Reactive.Linq;
3+
using System.Reflection;
4+
5+
namespace WampSharp.V2.CalleeProxy
6+
{
7+
internal class ObservableCalleeProxyInterceptor<T> : CalleeProxyInterceptorBase<T>
8+
{
9+
public ObservableCalleeProxyInterceptor(MethodInfo method, IWampCalleeProxyInvocationHandler handler, ICalleeProxyInterceptor interceptor) : base(method, handler, interceptor)
10+
{
11+
}
12+
13+
// We can't use Observable.FromAsync, as there is no overload that receives IObserver, and therefore we can't convert it into
14+
// an IProgress instance and forward it to as the IProgress argument of InvokeProgressiveAsync.
15+
// However, Observable.FromAsync and Observable.Create both use TaskObservableExtensions.Subscribe, which
16+
// eventually uses EmitTaskResult so this is not a big deal.
17+
public override object Invoke(MethodInfo method, object[] arguments)
18+
{
19+
return Observable.Create<T>(async (observer, cancellationToken) =>
20+
{
21+
T last = await Handler.InvokeProgressiveAsync
22+
(Interceptor, method, Extractor, arguments, observer.ToProgress(),
23+
cancellationToken);
24+
25+
if (last != null)
26+
{
27+
observer.OnNext(last);
28+
}
29+
30+
observer.OnCompleted();
31+
});
32+
}
33+
}
34+
}

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/AsyncLocalRpcOperation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected override IWampCancellableInvocation InnerInvoke<TMessage>(IWampRawRpcO
4040

4141
if (SupportsCancellation)
4242
{
43-
result = new CancellationTokenSourceInvocation(task ,cancellationTokenSource);
43+
result = new CancellationTokenSourceInvocation(cancellationTokenSource);
4444
}
4545

4646
return result;

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/CancellationTokenSourceInvocation.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
using System.Threading;
2-
using System.Threading.Tasks;
32
using WampSharp.V2.Core.Contracts;
43

54
namespace WampSharp.V2.Rpc
65
{
76
internal class CancellationTokenSourceInvocation : IWampCancellableInvocation
87
{
9-
private readonly Task mTask;
108
private readonly CancellationTokenSource mCancellationTokenSource;
119

12-
public CancellationTokenSourceInvocation(Task task,
13-
CancellationTokenSource cancellationTokenSource)
10+
public CancellationTokenSourceInvocation(CancellationTokenSource cancellationTokenSource)
1411
{
15-
mTask = task;
1612
mCancellationTokenSource = cancellationTokenSource;
1713
}
1814

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected IEnumerable<object> UnpackParameters<TMessage>(IWampFormatter<TMessage
9393
{
9494
ArgumentUnpacker unpacker = new ArgumentUnpacker(Parameters);
9595

96-
IEnumerable<object> result =
96+
IEnumerable<object> result =
9797
unpacker.UnpackParameters(formatter, arguments, argumentsKeywords);
9898

9999
return result;

0 commit comments

Comments
 (0)