|
| 1 | +From 2921f133676149aae8a6029dae3fd2ccfe705ed6 Mon Sep 17 00:00:00 2001 |
| 2 | +From: Artur Souza < [email protected]> |
| 3 | +Date: Wed, 27 May 2020 18:54:47 -0700 |
| 4 | +Subject: [PATCH] Auto-activate actors. |
| 5 | + |
| 6 | +--- |
| 7 | + .../io/dapr/actors/runtime/ActorManager.java | 9 +- |
| 8 | + .../io/dapr/actors/runtime/ActorRuntime.java | 24 ++---- |
| 9 | + .../dapr/actors/runtime/ActorRuntimeTest.java | 83 ++++++++++++------- |
| 10 | + .../io/dapr/springboot/DaprController.java | 12 --- |
| 11 | + 4 files changed, 67 insertions(+), 61 deletions(-) |
| 12 | + |
| 13 | +diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java |
| 14 | +index e249585..e0a2ec6 100644 |
| 15 | +--- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java |
| 16 | ++++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java |
| 17 | +@@ -58,8 +58,13 @@ class ActorManager<T extends AbstractActor> { |
| 18 | + * @return Asynchronous void response. |
| 19 | + */ |
| 20 | + Mono<Void> activateActor(ActorId actorId) { |
| 21 | +- return Mono.fromSupplier(() -> this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId)) |
| 22 | +- .flatMap(actor -> actor.onActivateInternal().then(this.onActivatedActor(actorId, actor))); |
| 23 | ++ return Mono.fromSupplier(() -> { |
| 24 | ++ if (this.activeActors.containsKey(actorId)) { |
| 25 | ++ return null; |
| 26 | ++ } |
| 27 | ++ |
| 28 | ++ return this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId); |
| 29 | ++ }).flatMap(actor -> actor.onActivateInternal().then(this.onActivatedActor(actorId, actor))); |
| 30 | + } |
| 31 | + |
| 32 | + /** |
| 33 | +diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java |
| 34 | +index 8925672..9404efe 100644 |
| 35 | +--- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java |
| 36 | ++++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java |
| 37 | +@@ -198,18 +198,6 @@ public class ActorRuntime { |
| 38 | + this.config.addRegisteredActorType(actorTypeInfo.getName()); |
| 39 | + } |
| 40 | + |
| 41 | +- /** |
| 42 | +- * Activates an actor for an actor type with given actor id. |
| 43 | +- * |
| 44 | +- * @param actorTypeName Actor type name to activate the actor for. |
| 45 | +- * @param actorId Actor id for the actor to be activated. |
| 46 | +- * @return Async void task. |
| 47 | +- */ |
| 48 | +- public Mono<Void> activate(String actorTypeName, String actorId) { |
| 49 | +- return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) |
| 50 | +- .flatMap(m -> m.activateActor(new ActorId(actorId))); |
| 51 | +- } |
| 52 | +- |
| 53 | + /** |
| 54 | + * Deactivates an actor for an actor type with given actor id. |
| 55 | + * |
| 56 | +@@ -233,8 +221,10 @@ public class ActorRuntime { |
| 57 | + * @return Response for the actor method. |
| 58 | + */ |
| 59 | + public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMethodName, byte[] payload) { |
| 60 | ++ ActorId id = new ActorId(actorId); |
| 61 | + return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) |
| 62 | +- .flatMap(m -> m.invokeMethod(new ActorId(actorId), actorMethodName, payload)); |
| 63 | ++ .flatMap(m -> m.activateActor(id).thenReturn(m)) |
| 64 | ++ .flatMap(m -> ((ActorManager)m).invokeMethod(id, actorMethodName, payload)); |
| 65 | + } |
| 66 | + |
| 67 | + /** |
| 68 | +@@ -247,8 +237,10 @@ public class ActorRuntime { |
| 69 | + * @return Async void task. |
| 70 | + */ |
| 71 | + public Mono<Void> invokeReminder(String actorTypeName, String actorId, String reminderName, byte[] params) { |
| 72 | ++ ActorId id = new ActorId(actorId); |
| 73 | + return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) |
| 74 | +- .flatMap(m -> m.invokeReminder(new ActorId(actorId), reminderName, params)); |
| 75 | ++ .flatMap(m -> m.activateActor(id).thenReturn(m)) |
| 76 | ++ .flatMap(m -> ((ActorManager)m).invokeReminder(new ActorId(actorId), reminderName, params)); |
| 77 | + } |
| 78 | + |
| 79 | + /** |
| 80 | +@@ -260,8 +252,10 @@ public class ActorRuntime { |
| 81 | + * @return Async void task. |
| 82 | + */ |
| 83 | + public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName) { |
| 84 | ++ ActorId id = new ActorId(actorId); |
| 85 | + return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) |
| 86 | +- .flatMap(m -> m.invokeTimer(new ActorId(actorId), timerName)); |
| 87 | ++ .flatMap(m -> m.activateActor(id).thenReturn(m)) |
| 88 | ++ .flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName)); |
| 89 | + } |
| 90 | + |
| 91 | + /** |
| 92 | +diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java |
| 93 | +index 34b4605..986cac5 100644 |
| 94 | +--- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java |
| 95 | ++++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java |
| 96 | +@@ -25,18 +25,60 @@ public class ActorRuntimeTest { |
| 97 | + |
| 98 | + public interface MyActor { |
| 99 | + String say(); |
| 100 | ++ int count(); |
| 101 | + } |
| 102 | + |
| 103 | + @ActorType(name = ACTOR_NAME) |
| 104 | + public static class MyActorImpl extends AbstractActor implements MyActor { |
| 105 | + |
| 106 | ++ private int count = 0; |
| 107 | ++ |
| 108 | ++ private Boolean activated; |
| 109 | ++ |
| 110 | + public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { |
| 111 | + super(runtimeContext, id); |
| 112 | + } |
| 113 | + |
| 114 | ++ public Mono<Void> onActivate() { |
| 115 | ++ return Mono.fromRunnable(() -> { |
| 116 | ++ if (this.activated != null) { |
| 117 | ++ throw new IllegalStateException("already activated once"); |
| 118 | ++ } |
| 119 | ++ |
| 120 | ++ this.activated = true; |
| 121 | ++ }); |
| 122 | ++ } |
| 123 | ++ |
| 124 | ++ public Mono<Void> onDeactivate() { |
| 125 | ++ return Mono.fromRunnable(() -> { |
| 126 | ++ if (this.activated == null) { |
| 127 | ++ throw new IllegalStateException("never activated"); |
| 128 | ++ } |
| 129 | ++ |
| 130 | ++ if (this.activated == false) { |
| 131 | ++ throw new IllegalStateException("already deactivated"); |
| 132 | ++ } |
| 133 | ++ |
| 134 | ++ if (this.count == 0) { |
| 135 | ++ throw new IllegalStateException("test expects a call before deactivate"); |
| 136 | ++ } |
| 137 | ++ |
| 138 | ++ this.activated = false; |
| 139 | ++ }); |
| 140 | ++ } |
| 141 | ++ |
| 142 | + public String say() { |
| 143 | ++ if (!this.activated) { |
| 144 | ++ throw new IllegalStateException("not activated"); |
| 145 | ++ } |
| 146 | ++ |
| 147 | ++ this.count++; |
| 148 | + return "Nothing to say."; |
| 149 | + } |
| 150 | ++ |
| 151 | ++ public int count() { |
| 152 | ++ return this.count; |
| 153 | ++ } |
| 154 | + } |
| 155 | + |
| 156 | + private static final ActorObjectSerializer ACTOR_STATE_SERIALIZER = new ActorObjectSerializer(); |
| 157 | +@@ -71,32 +113,17 @@ public class ActorRuntimeTest { |
| 158 | + Assert.assertTrue(new String(this.runtime.serializeConfig()).contains(ACTOR_NAME)); |
| 159 | + } |
| 160 | + |
| 161 | +- @Test |
| 162 | +- public void activateActor() throws Exception { |
| 163 | +- String actorId = UUID.randomUUID().toString(); |
| 164 | +- this.runtime.registerActor(MyActorImpl.class); |
| 165 | +- this.runtime.activate(ACTOR_NAME, actorId).block(); |
| 166 | +- } |
| 167 | + |
| 168 | + @Test |
| 169 | + public void invokeActor() throws Exception { |
| 170 | + String actorId = UUID.randomUUID().toString(); |
| 171 | + this.runtime.registerActor(MyActorImpl.class); |
| 172 | +- this.runtime.activate(ACTOR_NAME, actorId).block(); |
| 173 | + |
| 174 | + byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block(); |
| 175 | + String message = ACTOR_STATE_SERIALIZER.deserialize(response, String.class); |
| 176 | + Assert.assertEquals("Nothing to say.", message); |
| 177 | + } |
| 178 | + |
| 179 | +- @Test |
| 180 | +- public void activateThendeactivateActor() throws Exception { |
| 181 | +- String actorId = UUID.randomUUID().toString(); |
| 182 | +- this.runtime.registerActor(MyActorImpl.class); |
| 183 | +- this.runtime.activate(ACTOR_NAME, actorId).block(); |
| 184 | +- this.runtime.deactivate(ACTOR_NAME, actorId).block(); |
| 185 | +- } |
| 186 | +- |
| 187 | + @Test |
| 188 | + public void deactivateActor() throws Exception { |
| 189 | + String actorId = UUID.randomUUID().toString(); |
| 190 | +@@ -104,30 +131,16 @@ public class ActorRuntimeTest { |
| 191 | + this.runtime.deactivate(ACTOR_NAME, actorId).block(); |
| 192 | + } |
| 193 | + |
| 194 | +- @Test |
| 195 | +- public void lazyActivate() throws Exception { |
| 196 | +- String actorId = UUID.randomUUID().toString(); |
| 197 | +- this.runtime.registerActor(MyActorImpl.class); |
| 198 | +- this.runtime.activate(ACTOR_NAME, actorId).block(); |
| 199 | +- |
| 200 | +- this.runtime.invoke(ACTOR_NAME, actorId, "say", null) |
| 201 | +- .doOnError(e -> Assert.assertTrue(e.getMessage().contains("Could not find actor"))) |
| 202 | +- .doOnSuccess(s -> Assert.fail()) |
| 203 | +- .onErrorReturn("".getBytes()) |
| 204 | +- .block(); |
| 205 | +- } |
| 206 | +- |
| 207 | + @Test |
| 208 | + public void lazyDeactivate() throws Exception { |
| 209 | + String actorId = UUID.randomUUID().toString(); |
| 210 | + this.runtime.registerActor(MyActorImpl.class); |
| 211 | +- this.runtime.activate(ACTOR_NAME, actorId).block(); |
| 212 | + |
| 213 | +- Mono<Void> deacticateCall = this.runtime.deactivate(ACTOR_NAME, actorId); |
| 214 | ++ Mono<Void> deactivateCall = this.runtime.deactivate(ACTOR_NAME, actorId); |
| 215 | + |
| 216 | + this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block(); |
| 217 | + |
| 218 | +- deacticateCall.block(); |
| 219 | ++ deactivateCall.block(); |
| 220 | + |
| 221 | + this.runtime.invoke(ACTOR_NAME, actorId, "say", null) |
| 222 | + .doOnError(e -> Assert.assertTrue(e.getMessage().contains("Could not find actor"))) |
| 223 | +@@ -143,9 +156,15 @@ public class ActorRuntimeTest { |
| 224 | + |
| 225 | + Mono<byte[]> invokeCall = this.runtime.invoke(ACTOR_NAME, actorId, "say", null); |
| 226 | + |
| 227 | +- this.runtime.activate(ACTOR_NAME, actorId).block(); |
| 228 | ++ byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block(); |
| 229 | ++ int count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class); |
| 230 | ++ Assert.assertEquals(0, count); |
| 231 | + |
| 232 | + invokeCall.block(); |
| 233 | ++ |
| 234 | ++ response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block(); |
| 235 | ++ count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class); |
| 236 | ++ Assert.assertEquals(1, count); |
| 237 | + } |
| 238 | + |
| 239 | + } |
| 240 | +diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java |
| 241 | +index 96e6925..0b8ebbb 100644 |
| 242 | +--- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java |
| 243 | ++++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java |
| 244 | +@@ -56,18 +56,6 @@ public class DaprController { |
| 245 | + return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics()); |
| 246 | + } |
| 247 | + |
| 248 | +- /** |
| 249 | +- * Handles API to activate an actor. |
| 250 | +- * @param type Actor type. |
| 251 | +- * @param id Actor Id. |
| 252 | +- * @return Void. |
| 253 | +- */ |
| 254 | +- @PostMapping(path = "/actors/{type}/{id}") |
| 255 | +- public Mono<Void> activateActor(@PathVariable("type") String type, |
| 256 | +- @PathVariable("id") String id) { |
| 257 | +- return ActorRuntime.getInstance().activate(type, id); |
| 258 | +- } |
| 259 | +- |
| 260 | + /** |
| 261 | + * Handles API to deactivate an actor. |
| 262 | + * @param type Actor type. |
| 263 | +-- |
| 264 | +2.21.1 (Apple Git-122.3) |
| 265 | + |
0 commit comments