Skip to content

Commit 98ddaf5

Browse files
authored
Fix hot monos in GRPC + unit tests. (dapr#251)
1 parent eb27994 commit 98ddaf5

File tree

4 files changed

+189
-10
lines changed

4 files changed

+189
-10
lines changed

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,7 @@ public Mono<Void> saveStates(String stateStoreName, List<State<?>> states) {
287287
}
288288
DaprProtos.SaveStateEnvelope envelope = builder.build();
289289

290-
ListenableFuture<Empty> futureEmpty = client.saveState(envelope);
291-
return Mono.just(futureEmpty).flatMap(f -> {
290+
return Mono.fromCallable(() -> client.saveState(envelope)).flatMap(f -> {
292291
try {
293292
f.get();
294293
} catch (Exception ex) {
@@ -432,8 +431,7 @@ public Mono<Void> deleteState(String stateStoreName, String key, String etag, St
432431
}
433432

434433
DaprProtos.DeleteStateEnvelope envelope = builder.build();
435-
ListenableFuture<Empty> futureEmpty = client.deleteState(envelope);
436-
return Mono.just(futureEmpty).flatMap(f -> {
434+
return Mono.fromCallable(() -> client.deleteState(envelope)).flatMap(f -> {
437435
try {
438436
f.get();
439437
} catch (Exception ex) {

sdk/src/main/java/io/dapr/client/DaprHttp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public Mono<Response> invokeApi(
221221
* @return DaprError or null if could not parse.
222222
*/
223223
private static DaprError parseDaprError(byte[] json) throws IOException {
224-
if (json == null) {
224+
if ((json == null) || (json.length == 0)) {
225225
return null;
226226
}
227227
return OBJECT_MAPPER.readValue(json, DaprError.class);

sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java

Lines changed: 115 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,21 @@ public void publishEventTest() {
8282
assertTrue(callback.wasCalled);
8383
}
8484

85+
@Test
86+
public void publishEventNoHotMono() {
87+
SettableFuture<Empty> settableFuture = SettableFuture.create();
88+
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
89+
addCallback(settableFuture, callback, directExecutor());
90+
when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class)))
91+
.thenAnswer(c -> {
92+
settableFuture.set(Empty.newBuilder().build());
93+
return settableFuture;
94+
});
95+
adapter.publishEvent("topic", "object");
96+
// Do not call block() on the mono above, so nothing should happen.
97+
assertFalse(callback.wasCalled);
98+
}
99+
85100
@Test
86101
public void publishEventObjectTest() {
87102
SettableFuture<Empty> settableFuture = SettableFuture.create();
@@ -145,6 +160,22 @@ public void invokeBindingObjectTest() {
145160
assertTrue(callback.wasCalled);
146161
}
147162

163+
@Test
164+
public void invokeBindingObjectNoHotMono() {
165+
SettableFuture<Empty> settableFuture = SettableFuture.create();
166+
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
167+
addCallback(settableFuture, callback, directExecutor());
168+
when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class)))
169+
.thenAnswer(c -> {
170+
settableFuture.set(Empty.newBuilder().build());
171+
return settableFuture;
172+
});
173+
MyObject event = new MyObject(1, "Event");
174+
adapter.invokeBinding("BindingName", event);
175+
// Do not call block() on mono above, so nothing should happen.
176+
assertFalse(callback.wasCalled);
177+
}
178+
148179
@Test(expected = RuntimeException.class)
149180
public void invokeServiceVoidExceptionThrownTest() {
150181
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
@@ -415,6 +446,24 @@ public void invokeServiceNoRequestNoClassBodyTest() throws Exception {
415446
assertTrue(callback.wasCalled);
416447
}
417448

449+
@Test
450+
public void invokeServiceNoRequestNoHotMono() throws Exception {
451+
String expected = "Value";
452+
SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create();
453+
MockCallback<DaprProtos.InvokeServiceResponseEnvelope> callback =
454+
new MockCallback<DaprProtos.InvokeServiceResponseEnvelope>(DaprProtos.InvokeServiceResponseEnvelope.newBuilder()
455+
.setData(getAny(expected)).build());
456+
addCallback(settableFuture, callback, directExecutor());
457+
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
458+
.thenAnswer(c -> {
459+
settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(expected)).build());
460+
return settableFuture;
461+
});
462+
adapter.invokeService(Verb.GET, "appId", "method", null);
463+
// Do not call block() on mono above, so nothing should happen.
464+
assertFalse(callback.wasCalled);
465+
}
466+
418467
@Test
419468
public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception {
420469
MyObject resultObj = new MyObject(1, "Value");
@@ -466,13 +515,34 @@ public void getStateStringValueNoOptionsTest() throws IOException {
466515
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
467516
addCallback(settableFuture, callback, directExecutor());
468517
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
469-
.thenReturn(settableFuture);
518+
.thenReturn(settableFuture);
470519
State<String> keyRequest = buildStateKey(null, key, etag, null);
471520
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class);
472521
settableFuture.set(responseEnvelope);
473522
assertEquals(expectedState, result.block());
474523
}
475524

525+
@Test
526+
public void getStateStringValueNoHotMono() throws IOException {
527+
String etag = "ETag1";
528+
String key = "key1";
529+
String expectedValue = "Expected state";
530+
State<String> expectedState = buildStateKey(expectedValue, key, etag, null);
531+
DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag);
532+
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
533+
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
534+
addCallback(settableFuture, callback, directExecutor());
535+
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
536+
.thenAnswer(c -> {
537+
settableFuture.set(responseEnvelope);
538+
return settableFuture;
539+
});
540+
State<String> keyRequest = buildStateKey(null, key, etag, null);
541+
adapter.getState(STATE_STORE_NAME, keyRequest, String.class);
542+
// block() on the mono above is not called, so nothing should happen.
543+
assertFalse(callback.wasCalled);
544+
}
545+
476546
@Test
477547
public void getStateObjectValueWithOptionsTest() throws IOException {
478548
String etag = "ETag1";
@@ -564,20 +634,41 @@ public void deleteStateTest() {
564634
String etag = "ETag1";
565635
String key = "key1";
566636
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
567-
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
637+
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
568638
SettableFuture<Empty> settableFuture = SettableFuture.create();
569639
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
570640
addCallback(settableFuture, callback, directExecutor());
571641
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
572-
.thenReturn(settableFuture);
642+
.thenReturn(settableFuture);
573643
State<String> stateKey = buildStateKey(null, key, etag, options);
574644
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
575-
stateKey.getOptions());
645+
stateKey.getOptions());
576646
settableFuture.set(Empty.newBuilder().build());
577647
result.block();
578648
assertTrue(callback.wasCalled);
579649
}
580650

651+
@Test
652+
public void deleteStateTestNoHotMono() {
653+
String etag = "ETag1";
654+
String key = "key1";
655+
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
656+
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
657+
SettableFuture<Empty> settableFuture = SettableFuture.create();
658+
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
659+
addCallback(settableFuture, callback, directExecutor());
660+
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
661+
.thenAnswer(c -> {
662+
settableFuture.set(Empty.newBuilder().build());
663+
return settableFuture;
664+
});
665+
State<String> stateKey = buildStateKey(null, key, etag, options);
666+
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
667+
stateKey.getOptions());
668+
// Do not call result.block(), so nothing should happen.
669+
assertFalse(callback.wasCalled);
670+
}
671+
581672
@Test
582673
public void deleteStateNoConsistencyTest() {
583674
String etag = "ETag1";
@@ -742,13 +833,32 @@ public void saveStateTest() {
742833
addCallback(settableFuture, callback, directExecutor());
743834
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
744835
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
745-
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
836+
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
746837
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
747838
settableFuture.set(Empty.newBuilder().build());
748839
result.block();
749840
assertTrue(callback.wasCalled);
750841
}
751842

843+
@Test
844+
public void saveStateTestNoHotMono() {
845+
String key = "key1";
846+
String etag = "ETag1";
847+
String value = "State value";
848+
SettableFuture<Empty> settableFuture = SettableFuture.create();
849+
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
850+
addCallback(settableFuture, callback, directExecutor());
851+
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenAnswer(c -> {
852+
settableFuture.set(Empty.newBuilder().build());
853+
return settableFuture;
854+
});
855+
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
856+
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
857+
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
858+
// No call to result.block(), so nothing should happen.
859+
assertFalse(callback.wasCalled);
860+
}
861+
752862
@Test
753863
public void saveStateNoConsistencyTest() {
754864
String key = "key1";

sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ public void publishEventIfTopicIsNull() {
7777
assertNull(mono.block());
7878
}
7979

80+
@Test
81+
public void publishEventNoHotMono() {
82+
mockInterceptor.addRule()
83+
.post("http://127.0.0.1:3000/v1.0/publish/A")
84+
.respond(EXPECTED_RESULT);
85+
String event = "{ \"message\": \"This is a test\" }";
86+
daprHttp = new DaprHttp(3000, okHttpClient);
87+
daprClientHttp = new DaprClientHttp(daprHttp);
88+
daprClientHttp.publishEvent("", event);
89+
// Should not throw exception because did not call block() on mono above.
90+
}
91+
8092
@Test(expected = IllegalArgumentException.class)
8193
public void invokeServiceVerbNull() {
8294
mockInterceptor.addRule()
@@ -187,6 +199,18 @@ public void invokeServiceWithRequest() {
187199
assertNull(mono.block());
188200
}
189201

202+
@Test
203+
public void invokeServiceNoHotMono() {
204+
Map<String, String> map = new HashMap<>();
205+
mockInterceptor.addRule()
206+
.get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder")
207+
.respond(500);
208+
daprHttp = new DaprHttp(3000, okHttpClient);
209+
daprClientHttp = new DaprClientHttp(daprHttp);
210+
daprClientHttp.invokeService(Verb.GET, "41", "neworder", "", map);
211+
// No exception should be thrown because did not call block() on mono above.
212+
}
213+
190214
@Test
191215
public void invokeBinding() {
192216
Map<String, String> map = new HashMap<>();
@@ -211,6 +235,17 @@ public void invokeBindingNullName() {
211235
assertNull(mono.block());
212236
}
213237

238+
@Test
239+
public void bindingNoHotMono() {
240+
Map<String, String> map = new HashMap<>();
241+
mockInterceptor.addRule()
242+
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
243+
.respond(EXPECTED_RESULT);
244+
daprHttp = new DaprHttp(3000, okHttpClient);
245+
daprClientHttp = new DaprClientHttp(daprHttp);
246+
daprClientHttp.invokeBinding(null, "");
247+
// No exception is thrown because did not call block() on mono above.
248+
}
214249

215250
@Test
216251
public void getStates() {
@@ -253,6 +288,18 @@ public void getStatesNullEtag() {
253288
assertEquals(monoNullEtag.block().getKey(), "key");
254289
}
255290

291+
@Test
292+
public void getStatesNoHotMono() {
293+
State<String> stateNullEtag = new State("value", "key", null, null);
294+
mockInterceptor.addRule()
295+
.get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key")
296+
.respond(500);
297+
daprHttp = new DaprHttp(3000, okHttpClient);
298+
daprClientHttp = new DaprClientHttp(daprHttp);
299+
daprClientHttp.getState(STATE_STORE_NAME, stateNullEtag, String.class);
300+
// No exception should be thrown since did not call block() on mono above.
301+
}
302+
256303
@Test
257304
public void saveStates() {
258305
State<String> stateKeyValue = new State("value", "key", "etag", null);
@@ -319,6 +366,17 @@ public void simpleSaveStates() {
319366
assertNull(mono.block());
320367
}
321368

369+
@Test
370+
public void saveStatesNoHotMono() {
371+
mockInterceptor.addRule()
372+
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore")
373+
.respond(500);
374+
StateOptions stateOptions = mock(StateOptions.class);
375+
daprHttp = new DaprHttp(3000, okHttpClient);
376+
daprClientHttp = new DaprClientHttp(daprHttp);
377+
daprClientHttp.saveState(STATE_STORE_NAME, "key", "etag", "value", stateOptions);
378+
// No exception should be thrown because we did not call block() on the mono above.
379+
}
322380

323381
@Test
324382
public void deleteState() {
@@ -333,6 +391,19 @@ public void deleteState() {
333391
assertNull(mono.block());
334392
}
335393

394+
@Test
395+
public void deleteStateNoHotMono() {
396+
StateOptions stateOptions = mock(StateOptions.class);
397+
State<String> stateKeyValue = new State("value", "key", "etag", stateOptions);
398+
mockInterceptor.addRule()
399+
.delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key")
400+
.respond(500);
401+
daprHttp = new DaprHttp(3000, okHttpClient);
402+
daprClientHttp = new DaprClientHttp(daprHttp);
403+
daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions);
404+
// No exception should be thrown because we did not call block() on the mono above.
405+
}
406+
336407
@Test
337408
public void deleteStateNullEtag() {
338409
State<String> stateKeyValue = new State("value", "key", null, null);

0 commit comments

Comments
 (0)